unittest.py 18.9 KB
Newer Older
1
from __future__ import annotations
Enrico Zini's avatar
Enrico Zini committed
2
import backend.models as bmodels
3
import legacy.models as lmodels
4
from backend.models import Person, AM, Fingerprint, _build_fullname
Enrico Zini's avatar
Enrico Zini committed
5
from backend import const
6
from dsa.models import LDAPFields
7
from signon.models import Identity
8
from django.conf import settings
Enrico Zini's avatar
Enrico Zini committed
9
from django.utils.timezone import now
10
from django.contrib.auth.backends import ModelBackend
11
from django.test import Client
Enrico Zini's avatar
Enrico Zini committed
12
from rest_framework.test import APIClient
13
from collections import defaultdict
Enrico Zini's avatar
Enrico Zini committed
14
import nm2.lib.unittest
Enrico Zini's avatar
Enrico Zini committed
15
import contextlib
Enrico Zini's avatar
Enrico Zini committed
16
17
18
19
import datetime
import re


Enrico Zini's avatar
Enrico Zini committed
20
class NamedObjects(nm2.lib.unittest.NamedObjects):
    """
    Collection of test objects indexed by name, creating its members through
    the default manager of the collection's model.
    """

    def create(self, _name, **kw):
        """
        Create an object of the collection's model and store it as
        ``self[_name]``.

        ``kw`` is first passed through ``_update_kwargs_with_defaults``
        (provided by the parent class) and then handed to
        ``self._model.objects.create()``.

        Returns the created object.
        """
        self._update_kwargs_with_defaults(_name, kw)
        o = self._model.objects.create(**kw)
        self[_name] = o
        return o


class TestPersons(NamedObjects):
    """
    Collection of test Person objects indexed by name.

    Each Person is created together with its LDAPFields record.
    """

    def __init__(self, **defaults):
        """
        Set up the collection for the Person model.

        ``defaults`` are default creation arguments; ``cn``, ``email`` and
        ``email_ldap`` get a value here unless the caller provides one.
        """
        defaults.setdefault("cn", lambda name, **kw: name.capitalize())
        defaults.setdefault("email", "{_name}@example.org")
        defaults.setdefault("email_ldap", "{_name}@example.org")
        super().__init__(Person, **defaults)

    def create(self, _name, **kw):
        """
        Create a Person and its LDAPFields, and store the Person as
        ``self[_name]``.

        ``cn``, ``mn``, ``sn``, ``uid`` and ``email_ldap`` are taken out of
        ``kw`` and used for the LDAPFields record (``email_ldap`` becomes its
        ``email``); ``uid`` defaults to ``_name``. Everything else is passed
        to ``create_user()``. If ``fullname`` is not given, it is built from
        cn, mn and sn.

        Returns the created Person.
        """
        self._update_kwargs_with_defaults(_name, kw)

        # Allow to create Person and LDAPFields in one shot
        ldap_fields = {}
        for field in ("cn", "mn", "sn", "uid"):
            if field in kw:
                ldap_fields[field] = kw.pop(field)
        if "email_ldap" in kw:
            ldap_fields["email"] = kw.pop("email_ldap")
        ldap_fields.setdefault("uid", _name)

        if "fullname" not in kw:
            kw["fullname"] = _build_fullname(
                ldap_fields.get("cn"), ldap_fields.get("mn"), ldap_fields.get("sn"))

        self[_name] = o = self._model.objects.create_user(audit_skip=True, **kw)
        LDAPFields.objects.create(person=o, audit_skip=True, **ldap_fields)

        return o


Enrico Zini's avatar
Enrico Zini committed
55
56
57
58
59
60
61
62
63
64
65
66
class TestKeys(NamedObjects):
    """
    Collection of test keyring Key objects indexed by name.
    """

    def __init__(self, **defaults):
        """
        Set up the collection for the keyring Key model, with ``defaults`` as
        default creation arguments.
        """
        # The model is imported here rather than at module level
        from keyring.models import Key
        super().__init__(Key, **defaults)

    def create(self, _name, **kw):
        """
        Preload the key called ``_name``, fetch it with ``get_or_download()``
        and store it as ``self[_name]``.

        Returns the resulting key object.
        """
        self._update_kwargs_with_defaults(_name, kw)
        manager = self._model.objects
        manager.test_preload(_name)
        key = manager.get_or_download(_name, **kw)
        self[_name] = key
        return key


67
68
69
70
class TestAuthenticationBackend(ModelBackend):
    """
    Authentication backend that test clients are logged in with.

    It adds nothing to ModelBackend: it only provides a dotted path that
    TestBase can list in AUTHENTICATION_BACKENDS and pass to force_login().
    """


Enrico Zini's avatar
Enrico Zini committed
71
class TestBase(nm2.lib.unittest.TestBase):
    """
    Base class for test cases, providing logged-in test clients and
    assertions on view responses.
    """
    # Dotted path of the backend used to force-login test visitors
    TEST_AUTH_BACKEND = "backend.unittest.TestAuthenticationBackend"

    @classmethod
    def setUpClass(cls):
        """
        Make sure the test authentication backend is enabled in settings.
        """
        super().setUpClass()
        if cls.TEST_AUTH_BACKEND not in settings.AUTHENTICATION_BACKENDS:
            settings.AUTHENTICATION_BACKENDS.append(cls.TEST_AUTH_BACKEND)

    @classmethod
    def tearDownClass(cls):
        """
        Remove the test authentication backend from settings.
        """
        if cls.TEST_AUTH_BACKEND in settings.AUTHENTICATION_BACKENDS:
            settings.AUTHENTICATION_BACKENDS.remove(cls.TEST_AUTH_BACKEND)
        super().tearDownClass()

    def _setup_test_client(self, client, person, signon_identities):
        """
        Activate the given signon identities on client and log person in.

        Shared implementation of make_test_client and make_test_apiclient.
        Returns client.
        """
        for name in signon_identities:
            identity = self.identities[name]
            if identity.issuer == "salsa":
                # client.session creates a new SessionStore every time it is
                # accessed: it needs to be stored in a variable, or the
                # change is lost before save() is called
                session = client.session
                session["signon_identity_salsa"] = identity.pk
                session.save()
            elif identity.issuer == "debsso":
                client.defaults["SSL_CLIENT_S_DN_CN"] = identity.subject
            else:
                raise NotImplementedError(f"{identity.issuer} not supported as identity during testing")

        if isinstance(person, str):
            person = self.persons[person]
        if person is not None:
            client.force_login(person, backend=self.TEST_AUTH_BACKEND)
        client.visitor = person
        return client

    def make_test_client(self, person, signon_identities=()):
        """
        Instantiate a Django test client, logging in the given person.

        person can be a Person, the name of an entry in self.persons, or None
        to visit anonymously.

        signon_identities is a sequence of names of entries in
        self.identities to activate on the client: identities issued by
        "salsa" are stored in the session, identities issued by "debsso" are
        set as SSL_CLIENT_S_DN_CN in the client defaults. Any other issuer
        raises NotImplementedError.

        The person logged in (or None) is available as client.visitor.
        """
        return self._setup_test_client(Client(), person, signon_identities)

    def make_test_apiclient(self, person, signon_identities=()):
        """
        Instantiate a REST framework APIClient, logging in the given person.

        Arguments and behaviour are the same as in make_test_client.
        """
        return self._setup_test_client(APIClient(), person, signon_identities)

    def assertPermissionDenied(self, response):
        """
        Fail unless the response has status code 403.
        """
        if response.status_code != 403:
            self.fail("response has status code {} instead of a 403 Forbidden".format(response.status_code))

    def assertFormRedirectMatches(self, response, target, form_name="form"):
        """
        Fail unless the response is a 302 redirect to a location matching
        target.

        target is a regular expression searched in the Location header; if
        it is empty or None, the location is not checked.

        If the response has status 200, the errors of the form found as
        form_name in the response context are reported in the failure
        message.
        """
        if response.status_code == 200:
            form = response.context[form_name]
            if form.errors:
                self.fail("{} did not validate. Errors: {}".format(form_name, repr(form.errors)))
            self.fail("response has status code 200 instead of redirecting, and did not find any form errors")
        if response.status_code != 302:
            self.fail("response has status code {} instead of a Redirect".format(response.status_code))
        if target and not re.search(target, response["Location"]):
            self.fail("response redirects to {} which does not match {}".format(response["Location"], target))

    def assertContainsElements(self, response, elements, *names):
        """
        Check that the response contains only the elements in `names` from PageElements `elements`
        """
        want = set(names)
        extras = want - set(elements)
        if extras:
            # Sorted so that the message does not depend on set ordering
            raise RuntimeError(
                "Wanted elements not found in the list of possible ones: {}".format(", ".join(sorted(extras))))

        should_have = []
        should_not_have = []
        content = response.content.decode("utf-8")
        for name, regex in elements.items():
            found = regex.search(content)
            if name in want:
                if not found:
                    should_have.append(name)
            elif found:
                should_not_have.append(name)

        if should_have or should_not_have:
            msg = []
            if should_have:
                msg.append("should have element(s) {}".format(", ".join(should_have)))
            if should_not_have:
                msg.append("should not have element(s) {}".format(", ".join(should_not_have)))
            self.fail("page " + " and ".join(msg))
Enrico Zini's avatar
Enrico Zini committed
181

Enrico Zini's avatar
Enrico Zini committed
182

183
class BaseFixtureMixin(TestBase):
    """
    Test case base that sets up the named object collections (persons,
    identities, fingerprints, ams, processes, keys) and preloads some keys.
    """

    @classmethod
    def get_persons_defaults(cls):
        """
        Get default arguments for test persons
        """
        return {}

    @classmethod
    def setUpClass(cls):
        """
        Register the named object collections on the class and preload the
        test keys.
        """
        # Imported here rather than at module level
        import process.models as pmodels
        super().setUpClass()
        cls.add_named_objects(
            persons=TestPersons(**cls.get_persons_defaults()),
            identities=NamedObjects(Identity),
            fingerprints=NamedObjects(Fingerprint),
            ams=NamedObjects(AM),
            processes=NamedObjects(pmodels.Process),
            keys=TestKeys(),
        )

        # Preload keys
        cls.keys.create("66B4DFB68CB24EBBD8650BC4F4B4B0CC797EBFAB")
        cls.keys.create("1793D6AB75663E6BF104953A634F4BD1E7AD5568")
        cls.keys.create("0EED77DC41D760FDE44035FF5556A34E04A3610B")

    @classmethod
    def create_person(cls, name, *args, alioth=False, username=None, **kw):
        """
        Create a test person, stored as cls.persons[name].

        Extra positional and keyword arguments are passed to
        cls.persons.create().

        NOTE(review): alioth and username are only used to compute the
        username for the debsso identity, whose creation is commented out
        below: at the moment they have no effect on the result.

        Returns the created person.
        """
        if username is None:
            if alioth:
                username = name + "-guest@users.alioth.debian.org"
            else:
                username = name + "@debian.org"
        p = cls.persons.create(name, *args, **kw)
        # cls.identities.create(
        #         f"{name}_debsso", person=p, issuer="debsso", subject=username, username=username,
        #         audit_skip=True)
        return p

222

Enrico Zini's avatar
Enrico Zini committed
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class OpFixtureMixin(BaseFixtureMixin):
    """
    Test case base with helpers to test backend operations.
    """

    def check_op(self, o, check_contents):
        """
        Compatibility alias for assertOperationSerializes.
        """
        # FIXME: compatibility, remove when code has been ported
        return self.assertOperationSerializes(o, check_contents)

    def assertOperationSerializes(self, o, check_contents=None):
        """
        When run with check_contents set to a function, it
        serializes/deserializes o and checks its contents with the
        check_contents function.

        check_contents is called on the original operation, on a copy
        rebuilt from to_dict(), and on a copy rebuilt from to_json().

        When run without check_contents, it acts as a decorator, allowing a
        shorter invocation, like this:

           @self.assertOperationSerializes(operation_to_check)
           def _(o):
               self.assertEqual(o.audit_author, self.persons.fd)
               # ...more checks on o...
        """
        if check_contents is None:
            # Act as a decorator: the checks run as soon as the function is
            # decorated
            def run_test(check_contents):
                self.assertOperationSerializes(o, check_contents)
            return run_test

        # Actually run the test
        from backend import ops
        cls = o.__class__

        self.assertIsInstance(o.audit_time, datetime.datetime)
        check_contents(o)

        # Round trip through a dict
        d = o.to_dict()
        o = cls(**d)
        self.assertIsInstance(o.audit_time, datetime.datetime)
        check_contents(o)

        # Round trip through JSON
        j = o.to_json()
        o = ops.Operation.from_json(j)
        self.assertIsInstance(o, cls)
        self.assertIsInstance(o.audit_time, datetime.datetime)
        check_contents(o)

    @contextlib.contextmanager
    def collect_operations(self):
        """
        Context manager yielding what Operation.test_collect() provides while
        the body of the with statement runs.
        """
        from backend import ops
        # Use a separate name for the result, to avoid rebinding the module
        with ops.Operation.test_collect() as collected:
            yield collected


class PersonFixtureMixin(OpFixtureMixin):
    """
    Pre-create some persons
    """
    @classmethod
    def setUpClass(cls):
        """
        Create one test person for each status used in tests, and AM records
        for the persons acting as application managers, front desk and DAM.
        """
        super().setUpClass()
        # pending account
        cls.create_person(
                "pending", status=const.STATUS_DC, expires=now() +
                datetime.timedelta(days=1), pending="12345", alioth=True)
        # debian contributor
        cls.create_person("dc", status=const.STATUS_DC, alioth=True)
        # debian contributor with guest account
        cls.create_person("dc_ga", status=const.STATUS_DC_GA, alioth=True)
        # dm
        cls.create_person("dm", status=const.STATUS_DM, alioth=True)
        # dm with guest account
        cls.create_person("dm_ga", status=const.STATUS_DM_GA, alioth=True)
        # dd, nonuploading
        cls.create_person("dd_nu", status=const.STATUS_DD_NU)
        # dd, uploading
        cls.create_person("dd_u", status=const.STATUS_DD_U)
        # dd, emeritus
        cls.create_person("dd_e", status=const.STATUS_EMERITUS_DD)
        # dd, removed
        cls.create_person("dd_r", status=const.STATUS_REMOVED_DD)
        # unrelated active am
        activeam = cls.create_person("activeam", status=const.STATUS_DD_NU)
        cls.ams.create("activeam", person=activeam)
        # inactive am
        oldam = cls.create_person("oldam", status=const.STATUS_DD_NU)
        cls.ams.create("oldam", person=oldam, is_am=False)
        # fd
        fd = cls.create_person("fd", status=const.STATUS_DD_NU)
        cls.ams.create("fd", person=fd, is_fd=True)
        # dam
        dam = cls.create_person("dam", status=const.STATUS_DD_U)
        cls.ams.create("dam", person=dam, is_fd=True, is_dam=True)
312
313


314
315
316
317
318
319
class TestSet(set):
    """
    Set of strings that can be initialized from space-separated strings, and
    changed with simple text patches.
    """
    def __init__(self, initial=""):
        """
        Initialize the set with the whitespace-separated items in initial.
        """
        super().__init__()
        if initial:
            self.update(initial.split())

    def set(self, vals):
        """
        Replace the contents with the whitespace-separated items in vals.
        """
        self.clear()
        self.update(vals.split())

    def patch(self, diff):
        """
        Apply a whitespace-separated list of changes: ``+item`` adds item,
        ``-item`` removes it if present.

        Raises RuntimeError if a change does not start with + or -.
        """
        for change in diff.split():
            op, item = change[0], change[1:]
            if op == "+":
                self.add(item)
            elif op == "-":
                self.discard(item)
            else:
                raise RuntimeError("Changes {} contain {} that is neither an add nor a remove".format(
                    repr(diff), repr(change)))

    def clone(self):
        """
        Return a new TestSet with the same items.
        """
        res = TestSet()
        res.update(self)
        return res


343
344
345
346
347
348
349
350
class PatchExact:
    """
    Change that replaces a set with exactly the given items.
    """
    def __init__(self, text):
        """
        text is a whitespace-separated list of items; it can be empty or
        None for no items.
        """
        self.items = set(text.split()) if text else set()

    def apply(self, cur):
        """
        Return a new set with the items of this patch, or None if there are
        none.

        The current value cur is ignored.
        """
        if self.items:
            # Return a copy, so callers cannot change the patch by accident
            return set(self.items)
        return None


class PatchDiff:
    """
    Change that adds items to and removes items from a set.
    """
    def __init__(self, text):
        """
        text is a whitespace-separated list of changes: ``+item`` to add
        item, ``-item`` to remove it.

        Raises RuntimeError if a change does not start with + or -.
        """
        self.added = set()
        self.removed = set()
        for change in text.split():
            op, item = change[0], change[1:]
            if op == "+":
                self.added.add(item)
            elif op == "-":
                self.removed.add(item)
            else:
                raise RuntimeError(
                    "Changes {} contain {} that is neither an add nor a remove".format(text, change))

    def apply(self, cur):
        """
        Return a new set with the changes applied to cur, or None if the
        result is empty.

        cur can be None, which is treated as an empty set.
        """
        if cur is None:
            cur = set(self.added)
        else:
            cur = (cur - self.removed) | self.added
        if not cur:
            return None
        return cur


378
class ExpectedSets(defaultdict):
    """
    Store the permissions expected out of a *VisitorPermissions object

    Keys are visitor names, with None standing for the anonymous visitor;
    values are TestSet instances, created on first access.
    """
    def __init__(self, testcase, action_msg="{visitor}", issue_msg="{problem} {mismatch}"):
        """
        testcase is the test case used to report failures and to look up
        persons.

        action_msg is the format string for the first part of failure
        messages, and can use {visitor}; issue_msg is the format string
        describing each mismatch, and can use {problem} and {mismatch}.
        """
        super().__init__(TestSet)
        self.testcase = testcase
        self.action_msg = action_msg
        self.issue_msg = issue_msg

    @property
    def visitors(self):
        """
        List of the visitors for which something is expected.
        """
        return list(self.keys())

    def __getitem__(self, key):
        # "-" is the way to name the anonymous visitor in the space-separated
        # visitor lists given to set() and patch()
        if key == "-":
            key = None
        return super().__getitem__(key)

    def set(self, visitors, text):
        """
        Set the expected items of each of the space-separated visitors to the
        space-separated items in text.
        """
        for v in visitors.split():
            self[v].set(text)

    def patch(self, visitors, text):
        """
        Apply the space-separated +item/-item changes in text to the expected
        items of each of the space-separated visitors.
        """
        for v in visitors.split():
            self[v].patch(text)

    def select_others(self, persons):
        """
        Return the set of visitors in persons, plus None for the anonymous
        visitor, for which nothing is expected here.
        """
        other_visitors = set(persons.keys())
        other_visitors.add(None)
        other_visitors -= set(self.keys())
        return other_visitors

    def combine(self, other):
        """
        Return a new ExpectedSets with, for each visitor, the union of what
        is expected in self and in other.
        """
        res = ExpectedSets(self.testcase, action_msg=self.action_msg, issue_msg=self.issue_msg)
        for k, v in self.items():
            res[k] = v.clone()
        for k, v in other.items():
            res[k].update(v)
        return res

    def assertEqual(self, visitor, got):
        """
        Fail the test case unless got has exactly the items expected for
        visitor.
        """
        got = set(got)
        # get() does not create an entry for visitors without expectations
        wanted = self.get(visitor, set())
        if got == wanted:
            return
        extra = got - wanted
        missing = wanted - got
        msgs = []
        if missing:
            msgs.append(self.issue_msg.format(problem="misses", mismatch=", ".join(sorted(missing))))
        if extra:
            msgs.append(self.issue_msg.format(problem="has extra", mismatch=", ".join(sorted(extra))))
        self.testcase.fail(self.action_msg.format(visitor=visitor) + " " + " and ".join(msgs))

    def assertEmpty(self, visitor, got):
        """
        Fail the test case unless got is empty.
        """
        extra = set(got)
        if not extra:
            return
        self.testcase.fail(
                self.action_msg.format(visitor=visitor) + " " +
                self.issue_msg.format(problem="has", mismatch=", ".join(sorted(extra))))

    def assertMatches(self, visited):
        """
        Check visited.permissions_of() for every known visitor.

        Visitors with expectations must get exactly the expected items; all
        the other persons of the test case, and the anonymous visitor, must
        get nothing.
        """
        persons = self.testcase.persons
        for visitor in self.visitors:
            # The anonymous visitor is stored as None and has no entry in
            # persons: pass None, as done for the other visitors below
            visit_perms = visited.permissions_of(persons[visitor] if visitor else None)
            self.assertEqual(visitor, visit_perms)
        for visitor in self.select_others(persons):
            visit_perms = visited.permissions_of(persons[visitor] if visitor else None)
            self.assertEmpty(visitor, visit_perms)

449

450
451
452
453
class ExpectedPerms:
    """
    Store the permissions expected out of a *VisitorPermissions object
    """
    def __init__(self, perms=None):
        """
        perms optionally maps space-separated visitor names to
        space-separated permissions expected for each of them.
        """
        if perms is None:
            perms = {}
        # Maps each visitor name to its own set of expected permissions
        self.perms = {}
        for visitors, expected_perms in perms.items():
            for visitor in visitors.split():
                self.perms[visitor] = set(expected_perms.split())

    def _apply_diff(self, d, diff):
        """
        Update the dict d with the changes in diff.

        diff maps space-separated visitor names to objects with an
        apply(current) method returning the new value for the visitor, where
        current is None for visitors missing from d. Visitors whose new value
        is empty or None are removed from d.
        """
        for visitors, change in diff.items():
            for visitor in visitors.split():
                cur = change.apply(d.get(visitor, None))
                if not cur:
                    d.pop(visitor, None)
                else:
                    d[visitor] = cur

    def update_perms(self, diff):
        """
        Apply diff, in the format described in _apply_diff, to the expected
        permissions.
        """
        self._apply_diff(self.perms, diff)

    def set_perms(self, visitors, text):
        """
        Set the permissions expected for the space-separated visitors to
        exactly the space-separated permissions in text.
        """
        self.update_perms({visitors: PatchExact(text)})

    def patch_perms(self, visitors, text):
        """
        Apply the space-separated +perm/-perm changes in text to the
        permissions expected for the space-separated visitors.
        """
        self.update_perms({visitors: PatchDiff(text)})
479
480


Enrico Zini's avatar
Enrico Zini committed
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
class PageElements(dict):
    """
    List of all page elements possibly expected in the results of a view.

    dict matching name used to refer to the element with regexp matching the
    element.
    """
    @staticmethod
    def _attribute_regex(attribute, value):
        """
        Compile a regexp matching the HTML attribute set to exactly value,
        quoted with either single or double quotes.
        """
        return re.compile(r"""{}\s*=\s*["']{}["']""".format(attribute, re.escape(value)))

    def add_id(self, id):
        """
        Add an element called id, found as an ``id`` attribute with that
        value.
        """
        self[id] = self._attribute_regex("id", id)

    def add_class(self, cls):
        """
        Add an element called cls, found as a ``class`` attribute with
        exactly that value.
        """
        self[cls] = self._attribute_regex("class", cls)

    def add_href(self, name, url):
        """
        Add an element called name, found as an ``href`` attribute set to
        url.
        """
        self[name] = self._attribute_regex("href", url)

    def add_th(self, name, title):
        """
        Add an element called name, found as a ``<th>`` tag without
        attributes containing title.
        """
        self[name] = re.compile(r"""<th>\s*{}\s*</th>""".format(re.escape(title)))

    def add_string(self, name, term):
        """
        Add an element called name, found as the literal string term.
        """
        self[name] = re.compile(re.escape(term))

    def clone(self):
        """
        Return a new PageElements with the same elements.
        """
        res = PageElements()
        res.update(self)
        return res

Enrico Zini's avatar
Enrico Zini committed
508
509
510

class TestOldProcesses(NamedObjects):
    """
    Collection of test legacy Process objects indexed by name.
    """
    def __init__(self, **defaults):
        """
        Set up the collection for the legacy Process model.

        ``defaults`` are default creation arguments; ``progress`` defaults to
        PROGRESS_APP_NEW unless the caller provides it.
        """
        # This needs to happen before calling the parent constructor:
        # **defaults passes a copy, and later changes would not be seen
        defaults.setdefault("progress", const.PROGRESS_APP_NEW)
        super().__init__(lmodels.Process, **defaults)

    def create(self, _name, advocates=(), **kw):
        """
        Create a legacy Process and store it as ``self[_name]``.

        ``advocates`` is a sequence of objects to add to the advocates of the
        process. If ``manager`` is given, it is replaced with its AM record,
        which is created if missing. ``is_active`` defaults to True.

        Returns the created process.
        """
        self._update_kwargs_with_defaults(_name, kw)

        # NOTE(review): this looks up "process" but compares its value with
        # PROGRESS_* constants: it was possibly meant to be "progress".
        # Left as it is, since changing it would change the default of
        # is_active for existing tests - confirm before fixing.
        if "process" in kw:
            kw.setdefault("is_active", kw["process"] not in (const.PROGRESS_DONE, const.PROGRESS_CANCELLED))
        else:
            kw.setdefault("is_active", True)

        if "manager" in kw:
            try:
                am = kw["manager"].am
            except bmodels.AM.DoesNotExist:
                am = bmodels.AM.objects.create(person=kw["manager"])
            kw["manager"] = am

        self[_name] = o = self._model.objects.create(**kw)
        for a in advocates:
            o.advocates.add(a)
        return o


class OldProcessFixtureMixin(PersonFixtureMixin):
    """
    Test case base whose ``processes`` collection holds legacy processes.
    """

    @classmethod
    def get_processes_defaults(cls):
        """
        Return the default creation arguments for test processes.

        Subclasses can override this; by default there are none.
        """
        return {}

    @classmethod
    def setUpClass(cls):
        """
        Run the parent setup, then replace ``cls.processes`` with a
        TestOldProcesses collection built with the defaults from
        get_processes_defaults().
        """
        super().setUpClass()
        process_defaults = cls.get_processes_defaults()
        cls.processes = TestOldProcesses(**process_defaults)