providers.py 12.3 KB
Newer Older
1
2
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Sequence, Union, List, Type
Enrico Zini's avatar
Enrico Zini committed
3
import json
4
5
6

if TYPE_CHECKING:
    import django.http
7
    from .models import Identity
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# Note: this module is supposed to be imported from settings.py.
# The imports needed to define providers should therefore be kept to a
# minimum.

# Cache of the configured providers indexed by name. It starts as None and is
# built from settings.SIGNON_PROVIDERS the first time get() needs it.
providers_by_name = None


def get(name: str, *args):
    """
    Look up a configured signon provider by its name.

    Providers are read from the ``SIGNON_PROVIDERS`` setting and indexed by
    name the first time the index is needed; later calls reuse that index.

    If no provider is called ``name``, the first extra positional argument, if
    any, is returned as a default. Without a default, ImproperlyConfigured is
    raised. ImproperlyConfigured is also raised when ``SIGNON_PROVIDERS`` is
    not defined at all, even if a default was given.
    """
    from django.conf import settings
    from django.core.exceptions import ImproperlyConfigured

    global providers_by_name

    configured = getattr(settings, "SIGNON_PROVIDERS", None)
    if configured is None:
        raise ImproperlyConfigured(f"signon provider {name} requested, but SIGNON_PROVIDERS is not defined in settings")

    # Build the name -> provider index on first use, and publish it only once
    # it is complete
    if providers_by_name is None:
        index = {}
        for entry in configured:
            index[entry.name] = entry
        providers_by_name = index

    found = providers_by_name.get(name)
    if found is not None:
        return found

    # Not found: fall back to the caller's default, if one was given
    if args:
        return args[0]
    raise ImproperlyConfigured(f"signon provider {name} requested, but not found in SIGNON_PROVIDERS setting")


class BoundProvider:
    """
    Request-aware proxy for Provider

    Wraps a provider definition together with the request being processed.
    Attributes that are not found on the bound object itself are looked up on
    the wrapped provider.
    """
    def __init__(self, provider: "Provider", request: django.http.HttpRequest):
        self.provider = provider
        self.request = request

    def __getattr__(self, name):
        """
        Proxy attribute access to the provider definition

        Raises AttributeError if the attribute is not found on the provider
        either.
        """
        # __getattr__ is only invoked when normal lookup fails. If "provider"
        # itself is missing (for example on an instance created without
        # running __init__), reading self.provider below would call back into
        # this method and recurse until RecursionError.
        if name == "provider":
            raise AttributeError(name)
        return getattr(self.provider, name)

    def get_active_identity(self):
        """
        Return the entry of request.signon_identities for this provider, or
        None if there is none
        """
        return self.request.signon_identities.get(self.provider.name)

    def get_actions(self):
        """
        Return a list of dicts describing the actions that can be taken for
        this provider in the current request

        Each dict has "label", "icon" and "url" entries. There is one "link"
        action for each person found on the identities of the other providers
        in the request (excluding the person of this provider's identity),
        followed by an "unlink" action if this provider's identity has a
        person.
        """
        from django.utils.translation import gettext as _
        from django.urls import reverse
        my_user = None
        other_users = set()
        for name, identity in self.request.signon_identities.items():
            if name == self.provider.name:
                my_user = identity.person
            elif identity.person:
                other_users.add(identity.person)
        if my_user is not None:
            # Never offer to link to the person we are already linked to
            other_users.discard(my_user)

        res = []
        # NOTE(review): sorting requires the person objects to be orderable
        for u in sorted(other_users):
            res.append({
                "label": _("Link to {}").format(u),
                "icon": "link",
                "url": reverse("signon:link", args=[self.name, u.pk]),
            })
        if my_user:
            res.append({
                "label": _("Unlink"),
                "icon": "unlink",
                "url": reverse("signon:unlink", args=[self.name]),
            })
        return res

    def logout(self, identity):
        """
        Forget the active identity of this provider, removing it both from
        request.signon_identities and from the session

        The identity argument is not used by this implementation.
        """
        self.request.signon_identities.pop(self.provider.name, None)
        self.request.session.pop(f"signon_identity_{self.provider.name}", None)

92
93
94
95
96
97
98
99

class Provider:
    """
    Definition of a signon identity provider

    This base class stores the provider's name, label, icon and single_bind
    flag. Its logout and identity lookup hooks do nothing, and are meant to be
    overridden by subclasses.
    """
    # Class used to create a request-bound version
    bound_class: Type["BoundProvider"] = BoundProvider

    def __init__(self, name: str, label: str, icon: Optional[str] = None, single_bind: bool = False):
        self.name = name
        self.label = label
        self.icon = icon
        self.single_bind = single_bind

    def bind(self, request: django.http.HttpRequest) -> "BoundProvider":
        """
        Create the request-bound version of this provider, as an instance of
        bound_class
        """
        bound = self.bound_class(self, request)
        return bound

    def logout(self, request: django.http.HttpRequest):
        """
        Logout the identity for this provider

        The base implementation does nothing.
        """

    def preserve_active_identifier_after_logout(self, request: django.http.HttpRequest):
        """
        Hook that can be called after django's auth.logout() has flushed the
        session, to preserve the active identities if needed.

        The base implementation does nothing.
        """

    def identity_for_request(self, request: django.http.HttpRequest) -> Optional[Identity]:
        """
        Return the identity currently active for this provider in the given
        request, or None if there is none.

        The base implementation never finds an identity.
        """
        return None

131

132
133
class BoundBaseSessionProvider(BoundProvider):
    """
    Request-bound version of a session-based provider
    """
    def get_actions(self):
        """
        Return the actions of the parent class, followed by a "Logout" action
        marked with method "post"
        """
        from django.utils.translation import gettext as _
        from django.urls import reverse
        actions = super().get_actions()
        logout_action = {
            "method": "post",
            "label": _("Logout"),
            "icon": "sign-out",
            "url": reverse("signon:logout_identity", args=[self.name]),
        }
        actions.append(logout_action)
        return actions


class BaseSessionProvider(Provider):
    """
    Provider that keeps track of its active identity by storing the
    identity's ID in the session, under the key "signon_identity_<name>"
    """
    bound_class = BoundBaseSessionProvider

    def identity_for_request(self, request: django.http.HttpRequest) -> Optional[Identity]:
        """
        Return the Identity whose ID is stored in the session for this
        provider, or None if there is none
        """
        from .models import Identity
        session_key = f"signon_identity_{self.name}"
        pk = request.session.get(session_key)
        if pk is None:
            return None

        try:
            identity = Identity.objects.get(pk=pk, issuer=self.name)
        except Identity.DoesNotExist:
            # The session points to an Identity that does not exist for this
            # issuer: drop the stale ID
            del request.session[session_key]
            return None
        return identity

    def logout(self, request: django.http.HttpRequest):
        """
        Forget the active identity of this provider, both in the session and
        in request.signon_identities
        """
        session_key = f"signon_identity_{self.name}"
        request.session.pop(session_key, None)
        request.signon_identities.pop(self.name, None)

    def preserve_active_identifier_after_logout(self, request: django.http.HttpRequest):
        """
        Store again in the session the ID of the identity found in
        request.signon_identities for this provider, if there is one
        """
        identity = request.signon_identities.get(self.name)
        if identity is None:
            return
        request.session[f"signon_identity_{self.name}"] = identity.pk


class BoundOIDCProvider(BoundBaseSessionProvider):
    """
    Request-bound version of an OpenID Connect provider

    Holds the OAuth2 session used to talk to the provider, and the tokens and
    signing keys fetched from it.
    """
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        from requests_oauthlib import OAuth2Session
        from django.urls import reverse
        self.oauth = OAuth2Session(
                self.provider.client_id, scope=self.provider.scope,
                redirect_uri=self.request.build_absolute_uri(
                    reverse("signon:oidc_callback", args=(self.provider.name,))))
        # Filled by load_tokens()
        self.tokens = None
        # Filled by load_keys()
        self.keyset = None

    def get_authorization_url(self) -> str:
        """
        Return an authorization URL for this provider
        """
        # NOTE(review): the state value generated here is discarded. Confirm
        # that the callback is protected against CSRF elsewhere.
        url, state = self.oauth.authorization_url(self.provider.url_authorize)
        return url

    def load_keys(self):
        """
        Fetch the provider's key set from its JWKS URL and store it in
        self.keyset

        Raises an exception if the HTTP request fails.
        """
        # TODO: pre-cache this in a discovery method, run once, that saves
        # provider information somewhere
        import jwcrypto.jwk
        key_response = self.oauth.get(self.provider.url_jwks)
        key_response.raise_for_status()
        self.keyset = jwcrypto.jwk.JWKSet.from_json(key_response.text)

    def load_tokens(self):
        """
        Fetch and validate access_token and id_token from OIDC provider

        The token response is stored in self.tokens, and the decoded claims of
        the ID token in self.id_token_claims.

        Raises AssertionError if the issuer or the audience of the ID token do
        not match the provider configuration.
        """
        import jwcrypto.jwt

        if self.keyset is None:
            self.load_keys()

        self.tokens = self.oauth.fetch_token(
                self.provider.url_token,
                authorization_response=self.request.build_absolute_uri(),
                client_secret=self.provider.client_secret)

        id_token = self.tokens["id_token"]

        tok = jwcrypto.jwt.JWT(key=self.keyset, jwt=id_token)
        self.id_token_claims = json.loads(tok.claims)

        # These checks are raised explicitly instead of using assert
        # statements, which are removed when Python runs with -O and would
        # then silently accept tokens issued by, or for, someone else.
        # AssertionError is kept as the exception type for compatibility.
        if self.id_token_claims["iss"] != self.provider.url_issuer:
            raise AssertionError("id_token issuer does not match the configured issuer")
        if self.id_token_claims["aud"] != self.provider.client_id:
            raise AssertionError("id_token audience does not match the configured client_id")
        # TODO: assert tok["iat"] to be not too old?
        # TODO: honor tok["auth_time"]?

    def get_userinfo(self):
        """
        Fetch user information from OIDC provider

        self.tokens must have been filled by load_tokens() first. Returns the
        decoded JSON body of the userinfo response, and raises an exception if
        the HTTP request fails.
        """
        user_response = self.oauth.get(
            self.provider.url_userinfo,
            headers={
                'Authorization': f'Bearer {self.tokens["access_token"]}',
            })
        user_response.raise_for_status()
        return user_response.json()

244

245
class OIDCProvider(BaseSessionProvider):
    """
    Identity provider that speaks OpenID Connect

    Stores the client credentials, the endpoint URLs and the scopes that the
    request-bound version uses to talk to the provider.
    """
    bound_class = BoundOIDCProvider

    def __init__(
            self, name: str, label: str,
            client_id: str,
            client_secret: str,
            url_issuer: str,
            url_authorize: str,
            url_token: str,
            url_userinfo: str,
            url_jwks: str,
            scope: Union[str, Sequence[str]],
            icon: Optional[str] = None,
            single_bind: bool = False):
        super().__init__(name=name, label=label, icon=icon, single_bind=single_bind)

        # Client credentials
        self.client_id = client_id
        self.client_secret = client_secret

        # Provider endpoints
        self.url_issuer = url_issuer
        self.url_authorize = url_authorize
        self.url_token = url_token
        self.url_userinfo = url_userinfo
        self.url_jwks = url_jwks

        # Scopes are always stored as a list: a single string becomes a
        # one-element list
        self.scope: List[str] = [scope] if isinstance(scope, str) else list(scope)


class GitlabProvider(OIDCProvider):
    """
    OIDC identity provider backed by a Gitlab instance

    All endpoint URLs are derived from the base url of the instance, which is
    also used as the issuer. The scope defaults to "openid".
    """
    def __init__(
            self, name: str, label: str,
            client_id: str,
            client_secret: str,
            url: str,
            scope: Optional[Union[str, Sequence[str]]] = None,
            icon: Optional[str] = None,
            single_bind: bool = False):
        if scope is None:
            scope = "openid"

        super().__init__(
                name=name, label=label, icon=icon, single_bind=single_bind,
                client_id=client_id, client_secret=client_secret,
                scope=scope,
                url_issuer=url,
                url_authorize=f"{url}/oauth/authorize",
                url_token=f"{url}/oauth/token",
                url_userinfo=f"{url}/oauth/userinfo",
                url_jwks=f"{url}/oauth/discovery/keys")

        # NOTE(review): this overrides whatever was passed as single_bind.
        # Confirm that forcing it to True is intended.
        self.single_bind = True
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318


class BoundDebssoProvider(BoundProvider):
    """
    Request-bound version of the sso.debian.org provider
    """
    def get_authorization_url(self) -> str:
        """
        Return the URL of the client setup instructions, which is all that can
        be offered for sso.debian.org
        """
        documentation_url = "https://wiki.debian.org/DebianSingleSignOn#Debian_SSO_documentation"
        return documentation_url


class DebssoProvider(Provider):
    """
    Client certificate based provider from sso.debian.org

    The user name is read from an entry of request.META, whose name can be
    configured with env_name and defaults to "SSL_CLIENT_S_DN_CN".
    """
    bound_class = BoundDebssoProvider

    def __init__(
            self, name: str, label: str,
            icon: Optional[str] = None,
            single_bind: bool = False,
            env_name: Optional[str] = None):
        super().__init__(name=name, label=label, icon=icon, single_bind=single_bind)
        self.env_name = env_name if env_name is not None else "SSL_CLIENT_S_DN_CN"

    def _get_remote_user(self, request: django.http.HttpRequest) -> Optional[str]:
        """
        Return the user name found for this request, or None if there is none

        A TEST_USER value in settings, if present, takes precedence over the
        request.
        """
        from django.conf import settings

        # Allow to override the current user via settings for tests
        remote_user = getattr(settings, "TEST_USER", None)
        if remote_user is not None:
            return remote_user

        # Get user from SSO certificates, honoring the configured entry name
        # instead of a hard-coded one
        return request.META.get(self.env_name, None)

    def identity_for_request(self, request: django.http.HttpRequest) -> Optional[Identity]:
        """
        Look up an identity based on request.META entries from sso.debian.org

        request.META["REMOTE_USER"] is set to the accepted user name, or
        removed if there is none. If the user name is accepted and has no
        Identity for this provider yet, one is created.

        Returns None if there is no user name, or if it is not accepted.
        """
        from .models import Identity
        remote_user = self._get_remote_user(request)

        # Accept only @debian.org and @users.alioth.debian.org values. An
        # empty value is rejected as well, so that no Identity with an empty
        # subject can ever be created.
        if not remote_user or not remote_user.endswith(("@debian.org", "@users.alioth.debian.org")):
            remote_user = None

        if remote_user is None:
            request.META.pop("REMOTE_USER", None)
            return None

        request.META["REMOTE_USER"] = remote_user
        try:
            return Identity.objects.get(issuer=self.name, subject=remote_user)
        except Identity.DoesNotExist:
            from django.contrib.auth import get_user_model
            return Identity.objects.create(
                    issuer=self.name, subject=remote_user, username=remote_user,
                    audit_author=get_user_model().objects.get_housekeeper(),
                    audit_notes="Identity created automatically from valid user in environment"
            )