models.py 6.66 KB
Newer Older
1
from __future__ import annotations
2
3
import json
import datetime
4
from django.utils.translation import gettext_lazy as _
5
from django.db import models
6
from django.forms.models import model_to_dict
7
from django.contrib.auth import get_user_model
8
from . import providers
9
10


11
12
13
14
15
16
17
18
19
20
21
22
23
24
class IdentityManager(models.Manager):
    """
    Manager for the model it is attached to, whose create() forwards audit
    information to the model's save() method.
    """

    def create(self, **kw):
        """
        Create and save a new model instance, forwarding audit information.

        Besides the model field values, these keyword arguments are accepted
        and passed on to the instance's save():

            audit_author: user model instance of the person doing the change
            audit_notes: free form text annotations for this change
            audit_skip: skip audit logging, used only for tests

        Returns the saved instance.
        """
        # Extract our own arguments, so that they are not passed to the model
        # constructor
        audit_author = kw.pop("audit_author", None)
        # Notes end up in a NOT NULL text column: never forward None, or
        # writing the audit log entry fails when the caller gave no notes
        audit_notes = kw.pop("audit_notes", None) or ""
        audit_skip = kw.pop("audit_skip", False)

        obj = self.model(**kw)
        obj.save(using=self._db, audit_author=audit_author, audit_notes=audit_notes, audit_skip=audit_skip)
        return obj


25
26
27
28
29
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An identity is identified by the (issuer, subject) pair, which is unique,
    and can optionally be linked to a local user via ``person``.

    Changes saved through save() are recorded in IdentityAuditLog, unless
    audit logging is explicitly skipped.
    """
    class Meta:
        unique_together = ["issuer", "subject"]

    person = models.ForeignKey(get_user_model(), related_name="identities", null=True, on_delete=models.CASCADE)
    issuer = models.CharField(max_length=512, help_text=_("identifier of the user database that manages this identity"))
    subject = models.CharField(max_length=512, help_text=_("identifier of the person in the user database"))
    last_used = models.DateField(auto_now=True, help_text=_("last time this identity has been used"))
    profile = models.CharField(max_length=1024, blank=True, help_text=_("URL profile information"))
    picture = models.CharField(max_length=1024, blank=True, help_text=_("URL to profile picture"))
    fullname = models.CharField(_("full name"), max_length=255, blank=True)
    username = models.CharField(max_length=512, blank=True, help_text=_("username in the remote user database"))

    objects = IdentityManager()

    def __str__(self):
        person = self.person if self.person else "-"
        if self.username:
            remote = f"aka {self.username}"
        else:
            remote = "no remote username"
        return f"{person}:{self.issuer}:{self.subject} ({remote})"

    def get_provider(self) -> providers.Provider:
        """
        Return the Provider information for this Identity
        """
        return providers.get(self.issuer)

    def update(
            self,
            profile=None, picture=None, fullname=None, username=None,
            audit_author=None, audit_notes="", audit_skip=False):
        """
        Update profile information with extra data that may be provided in
        import / login flows. Save only if needed.

        Arguments left to None are ignored. The audit_* arguments are passed
        on to save() if at least one field changed.

        Returns True if something changed and the identity was saved, False
        otherwise.
        """
        candidates = {
            "profile": profile,
            "picture": picture,
            "fullname": fullname,
            "username": username,
        }

        changed = False
        for field, value in candidates.items():
            if value is not None and getattr(self, field) != value:
                setattr(self, field, value)
                changed = True

        if changed:
            self.save(
                audit_author=audit_author, audit_notes=audit_notes,
                audit_skip=audit_skip)

        return changed

    def save(self, *args, **kw):
        """
        Save, and add an entry to the Identity audit log.

        Extra arguments that can be passed:

            audit_author: user model instance of the person doing the change
            audit_notes: free form text annotations for this change
            audit_skip: skip audit logging, used only for tests

        Raises RuntimeError if there are changes to log and no audit_author
        was provided.
        """
        # Extract our own arguments, so that they are not passed to django
        author = kw.pop("audit_author", None)
        # Callers may pass audit_notes=None explicitly: the audit log notes
        # column is NOT NULL, so normalise a missing value to ""
        notes = kw.pop("audit_notes", None) or ""
        audit_skip = kw.pop("audit_skip", False)

        changes = None
        if not audit_skip:
            # Get the previous version of the object, so that diff() can
            # compute differences. An instance with a preset primary key and
            # no row in the database yet is treated as a new identity.
            old = None
            if self.pk is not None:
                old = Identity.objects.filter(pk=self.pk).first()

            changes = Identity.diff(old, self)
            if changes and not author:
                raise RuntimeError("Cannot modify an Identity instance without providing Author information")

        # Perform the save; if we are creating a new identity, this will also
        # fill in the id/pk field, so that IdentityAuditLog can link to us
        super().save(*args, **kw)

        # Finally, create the identity audit log entry
        if changes:
            IdentityAuditLog.objects.create(
                    identity=self, author=author, notes=notes, changes=IdentityAuditLog.serialize_changes(changes))

    @classmethod
    def diff(cls, old_obj: "Identity | None", new_obj: "Identity"):
        """
        Compute the changes between two different instances of an Identity
        model.

        old_obj can be None, in which case every field of new_obj is reported
        as changed from None.

        Returns a dict mapping field names to [old_value, new_value] lists.
        The id and last_used fields are not compared, and person is compared
        by its string representation.
        """
        def to_dict(obj):
            res = model_to_dict(obj, exclude=["id", "last_used", "person"])
            res["person"] = None if obj.person is None else str(obj.person)
            return res

        new = to_dict(new_obj)
        if old_obj is None:
            return {k: [None, nv] for k, nv in new.items()}

        old = to_dict(old_obj)
        changes = {}
        for k, nv in new.items():
            ov = old.get(k)
            # Also ignore changes like None -> ""
            if ov != nv and (ov or nv):
                changes[k] = [ov, nv]
        return changes


class IdentityAuditLog(models.Model):
    """
    Audit log entry describing one change made to an Identity.

    The changes are stored in ``changes`` as a JSON object, mapping field
    names to [old_value, new_value] pairs.
    """
    identity = models.ForeignKey(Identity, related_name="audit_log", on_delete=models.CASCADE)
    logdate = models.DateTimeField(null=False, auto_now_add=True)
    author = models.ForeignKey(get_user_model(), related_name="+", null=False, on_delete=models.CASCADE)
    notes = models.TextField(null=False, default="")
    changes = models.TextField(null=False, default="{}")

    def __str__(self):
        # The time used to be formatted as %H:%S, which printed seconds in
        # place of minutes
        return "{:%Y-%m-%d %H:%M:%S}:{}: {}:{}".format(
                self.logdate, self.identity, self.author, self.notes)

    @classmethod
    def serialize_changes(cls, changes):
        """
        Serialize a dict of changes to a JSON string suitable for storing in
        the ``changes`` field.

        datetime values are serialized as "YYYY-mm-dd HH:MM:SS" strings and
        date values as "YYYY-mm-dd" strings. Other values that json cannot
        encode raise TypeError.
        """
        class Serializer(json.JSONEncoder):
            def default(self, o):
                # datetime must be checked first, as it is a subclass of date
                if isinstance(o, datetime.datetime):
                    return o.strftime("%Y-%m-%d %H:%M:%S")
                if isinstance(o, datetime.date):
                    return o.strftime("%Y-%m-%d")
                return super().default(o)
        return json.dumps(changes, cls=Serializer)

    def get_changes_list(self):
        """
        Return the stored changes as a list of (field, old_value, new_value)
        tuples, sorted by field name.
        """
        return sorted((k, v[0], v[1]) for k, v in json.loads(self.changes).items())