__init__.py 10.6 KB
Newer Older
1
2
3
4
5
#   test_upload.py — UploadController test cases
#
#   This file is part of debexpo
#   https://salsa.debian.org/mentors.debian.net-team/debexpo
#
6
#   Copyright © 2018-2020 Baptiste Beauplat <lyknode@cilg.org>
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#
#   Permission is hereby granted, free of charge, to any person
#   obtaining a copy of this software and associated documentation
#   files (the "Software"), to deal in the Software without
#   restriction, including without limitation the rights to use,
#   copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the
#   Software is furnished to do so, subject to the following
#   conditions:
#
#   The above copyright notice and this permission notice shall be
#   included in all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
#   OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
#   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#   HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
#   WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#   FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
#   OTHER DEALINGS IN THE SOFTWARE.

29
import logging
30
import socket
31

32
33
from os import walk
from os.path import isdir, join, dirname
34
from shutil import rmtree, copytree
35
from tempfile import TemporaryDirectory
36

37
from django.core import mail
38
from django.urls import reverse
39
from django_redis.pool import ConnectionFactory
40
41
42

from debexpo.importer.models import Importer, Spool
from debexpo.packages.models import Package, PackageUpload
43
from debexpo.packages.tasks import remove_uploads
44
45
from debexpo.accounts.models import User
from debexpo.comments.models import PackageSubscription
46
from debexpo.plugins.models import PluginResults
47
48
from tests import TestController
from tests.functional.importer.source_package import TestSourcePackage
49
50
51

log = logging.getLogger(__name__)

52

53
54
55
56
57
58
59
60
61
62
63
def test_network():
    socket.setdefaulttimeout(2)

    try:
        socket.gethostbyname('bugs.debian.org')
    except socket.error as e:
        return e

    return None


64
65
66
67
68
class FakeConnectionFactory(ConnectionFactory):
    def get_connection(self, params):
        return self.redis_client_cls(**self.redis_client_cls_kwargs)


69
70
71
72
73
74
75
76
77
78
class TestImporterController(TestController):
    """
    Toolbox for importer tests

    This class setup an environment for package to be imported in and ensure
    cleanup afterward.
    """

    def __init__(self, *args, **kwargs):
        TestController.__init__(self, *args, **kwargs)
79
        self.data_dir = join(dirname(__file__), 'data')
80
81
82

    def setUp(self):
        self._setup_example_user(gpg=True)
83
        self.spool_dir = TemporaryDirectory(prefix='debexpo-test-spool')
Baptiste Beauplat's avatar
Baptiste Beauplat committed
84
85
        self.repository_dir = TemporaryDirectory(prefix='debexpo-test-repo')
        self.repository = self.repository_dir.name
86
        self.spool = Spool(self.spool_dir.name)
87
88

    def tearDown(self):
89
        self._assert_no_leftover(str(self.spool))
90
        self._remove_subscribers()
91
        self._remove_example_user()
Baptiste Beauplat's avatar
Baptiste Beauplat committed
92
        self._cleanup_package()
93

94
    def _remove_subscribers(self):
95
        PackageSubscription.objects.all().delete()
96

97
98
    def setup_subscribers(self, package, on_upload=True, on_comment=False):
        user = User.objects.get(email='email@example.com')
99

100
101
102
103
        subscription = PackageSubscription(package=package, user=user,
                                           on_upload=on_upload,
                                           on_comment=on_comment)
        subscription.save()
104

105
106
    def _cleanup_mailbox(self):
        mail.outbox = []
107

108
    def _upload_package(self, package_dir, sub_dir=None):
109
110
        """Copy a directory content to incoming queue"""
        # copytree dst dir must not exist
111
112
        upload_dir = self.spool.get_queue_dir('incoming')

113
114
115
        if sub_dir:
            upload_dir = join(upload_dir, sub_dir)

116
117
118
        if isdir(upload_dir):
            rmtree(upload_dir)
        copytree(package_dir, upload_dir)
119

120
121
    def _get_email(self):
        """Parse an email and format the body."""
122
        return mail.outbox[0].body
123

Baptiste Beauplat's avatar
Baptiste Beauplat committed
124
    def _cleanup_package(self):
125
        Package.objects.all().delete()
Baptiste Beauplat's avatar
Baptiste Beauplat committed
126

127
    def _package_in_repo(self, package_name, version):
Baptiste Beauplat's avatar
Baptiste Beauplat committed
128
129
130
131
        """Check if package is present in repo"""
        matches = self._find_file(package_name + '_' + version + '.dsc',
                                  self.repository)
        return len(matches)
132

133
    def _file_in_repo(self, filename):
Baptiste Beauplat's avatar
Baptiste Beauplat committed
134
135
136
137
        """Check if package is present in repo"""
        matches = self._find_file(filename,
                                  self.repository)
        return len(matches)
138

139
    def _find_file(self, name, path):
140
141
142
143
144
145
146
        """Find a file in a path"""
        result = []
        for root, dirs, files in walk(path):
            if name in files:
                result.append(join(root, name))
        return result

147
148
149
150
151
152
153
    def remove_package(self, package, version):
        uploads = PackageUpload.objects.filter(package__name=package,
                                               version=version)

        with self.settings(REPOSITORY=self.repository):
            remove_uploads(uploads)

154
    def import_source_package(self, package_dir, skip_gpg=False,
155
                              skip_email=False, base_dir=None, sub_dir=None):
156
        source_package = TestSourcePackage(package_dir, base_dir)
157

158
        source_package.build(not skip_gpg)
159
        self._run_importer(source_package.get_package_dir(), skip_gpg=skip_gpg,
160
                           skip_email=skip_email, sub_dir=sub_dir)
161

162
    def import_package(self, package_dir):
163
164
        self._run_importer(join(self.data_dir, package_dir))

165
166
    def _run_importer(self, package_dir, skip_gpg=False, skip_email=False,
                      sub_dir=None):
167
        """Run debexpo importer on package_dir/*.changes"""
168
        # Copy uplod files to incomming queue
169
        self.assertTrue(isdir(package_dir))
170
        self._upload_package(package_dir, sub_dir)
171
172

        # Run the importer on change file
173
        with self.settings(REPOSITORY=self.repository):
174
            importer = Importer(str(self.spool), skip_email, skip_gpg)
175
            self._status_importer = importer.process_spool()
176
177
178

    def assert_importer_failed(self):
        """Assert that the importer has failed"""
179
        self.assertFalse(self._status_importer)
180
181
182

    def assert_importer_succeeded(self):
        """Assert that the importer has succeeded"""
183
        self.assertTrue(self._status_importer)
184
185
186

    def assert_no_email(self):
        """Assert that the importer has not generated any email"""
187
        # The mailbox file has not been created
188
        self.assertFalse(mail.outbox)
189
190
191
192
193
194

    def assert_email_with(self, search_text):
        """
        Assert that the imported would have sent a email to the uploader
        containing search_text
        """
195
        # The mailbox file has been created
196
        self.assertTrue(mail.outbox)
197

198
199
        # Get the email and assert that the body contains search_text
        email = self._get_email()
200
        self.assertIn(search_text, email)
201
202

    def assert_package_count(self, package_name, version, count):
203
        """Assert that a package appears count times in debexpo"""
204
205
206
        try:
            package = Package.objects.get(name=package_name)
        except Package.DoesNotExist:
207
            count_in_db = 0
208
209
210
211
212
213
        else:
            count_in_db = PackageUpload.objects \
                .filter(package=package) \
                .filter(version=version) \
                .count()

214
        self.assertTrue(count_in_db == count)
215

216
217
218
219
    def _lookup_plugin_result(self, package_name, plugin):
        plugin_result = PluginResults.objects \
            .filter(upload__package__name=package_name, plugin=plugin) \
            .all()
220

221
        return plugin_result
222

223
224
225
    def assert_plugin_result_count(self, package_name, plugin, count):
        plugin_result = self._lookup_plugin_result(package_name, plugin)
        self.assertEquals(count, len(plugin_result))
226

227
228
229
230
231
232
    def assert_plugin_result(self, package_name, plugin, outcome):
        plugin_results = self._lookup_plugin_result(package_name, plugin)
        self.assertTrue(plugin_results)

        for result in plugin_results:
            if result.outcome == outcome:
233
                return result.data
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251

        raise Exception(f'Plugin result not found for outcome == {outcome}')

    def assert_plugin_severity(self, package_name, plugin, severity):
        plugin_results = self._lookup_plugin_result(package_name, plugin)
        self.assertTrue(plugin_results)

        for result in plugin_results:
            if result.severity == severity:
                return

        raise Exception(f'Plugin result not found for severity == {severity}')

    def assert_plugin_template(self, package_name, outcome):
        response = self.client.get(reverse('package', args=[package_name]))

        self.assertIn(outcome, str(response.content))

252
253
254
255
256
257
258
259
260
261
262
    def assert_in_plugin_data(self, package_name, plugin, data):
        plugin_results = self._lookup_plugin_result(package_name, plugin)
        self.assertTrue(plugin_results)

        for result in plugin_results:
            for key, value in data.items():
                if key not in result.data or not result.data[key] or \
                        value not in result.data[key]:
                    raise Exception('Plugin result not found for data '
                                    f'contains: {data}\ndata: {result.data}')

263
264
265
266
267
268
269
270
    def assert_plugin_data(self, package_name, plugin, data):
        plugin_results = self._lookup_plugin_result(package_name, plugin)
        self.assertTrue(plugin_results)

        for result in plugin_results:
            if data == result.data:
                return

271
272
        raise Exception(f'Plugin result not found for data contains: {data}\n'
                        f'in{result.data}')
273

274
    def assert_file_in_repo(self, filename):
Baptiste Beauplat's avatar
Baptiste Beauplat committed
275
276
277
        """Assert that a file is present in debexpo repo"""
        log.debug('Checking file in repo: {}'.format(filename))
        self.assertTrue(self._file_in_repo(filename) > 0)
278

279
    def assert_package_in_repo(self, package_name, version):
Baptiste Beauplat's avatar
Baptiste Beauplat committed
280
281
        """Assert that a package is present in debexpo repo"""
        self.assertTrue(self._package_in_repo(package_name, version) > 0)
282
283

    def assert_package_not_in_repo(self, package_name, version):
Baptiste Beauplat's avatar
Baptiste Beauplat committed
284
285
        """Assert that a package is present in debexpo repo"""
        self.assertTrue(self._package_in_repo(package_name, version) == 0)
286
287

    def assert_rfs_content(self, package, content):
288
289
290
291
        response = self.client.get(reverse('package_rfs', args=[package]))

        self.assertEquals(response.status_code, 200)
        self.assertIn(content, str(response.content))