...
 
Commits (2)
......@@ -21,6 +21,7 @@ Test the backups action script.
import json
import os
import shutil
import subprocess
import tempfile
import unittest
import uuid
......@@ -35,15 +36,23 @@ from plinth.tests import config as test_config
euid = os.geteuid()
def _borg_is_installed():
"""Return whether borg is installed on the system."""
try:
subprocess.run(['borg', '--version'], stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=True)
return True
except FileNotFoundError:
return False
class TestBackups(unittest.TestCase):
"""Test creating, reading and deleting a repository"""
# try to access a non-existing url and a URL that exists but does not
# grant access
nonexisting_repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1())
inaccessible_repo_url = "user@heise.de:~/repo"
dummy_credentials = {
'ssh_password': 'invalid_password'
}
dummy_credentials = {'ssh_password': 'invalid_password'}
repokey_encryption_passphrase = '12345'
@classmethod
......@@ -51,12 +60,12 @@ class TestBackups(unittest.TestCase):
"""Initial setup for all the classes."""
cls.action_directory = tempfile.TemporaryDirectory()
cls.backup_directory = tempfile.TemporaryDirectory()
cls.data_directory = os.path.join(os.path.dirname(
os.path.realpath(__file__)), 'backup_data')
cls.data_directory = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'backup_data')
cls.actions_dir_factory = cfg.actions_dir
cfg.actions_dir = cls.action_directory.name
actions_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..',
'..', 'actions')
actions_dir = os.path.join(
os.path.dirname(__file__), '..', '..', '..', '..', 'actions')
shutil.copy(os.path.join(actions_dir, 'backups'), cfg.actions_dir)
shutil.copy(os.path.join(actions_dir, 'sshfs'), cfg.actions_dir)
......@@ -67,7 +76,8 @@ class TestBackups(unittest.TestCase):
cls.backup_directory.cleanup()
cfg.actions_dir = cls.actions_dir_factory
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_nonexisting_repository(self):
nonexisting_dir = os.path.join(self.backup_directory.name,
'does_not_exist')
......@@ -75,7 +85,8 @@ class TestBackups(unittest.TestCase):
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_empty_dir(self):
empty_dir = os.path.join(self.backup_directory.name, 'empty_dir')
os.mkdir(empty_dir)
......@@ -83,7 +94,8 @@ class TestBackups(unittest.TestCase):
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_create_unencrypted_repository(self):
repo_path = os.path.join(self.backup_directory.name, 'borgbackup')
repository = BorgRepository(repo_path)
......@@ -91,7 +103,8 @@ class TestBackups(unittest.TestCase):
info = repository.get_info()
self.assertTrue('encryption' in info)
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_create_export_delete_archive(self):
"""
- Create a repo
......@@ -106,9 +119,10 @@ class TestBackups(unittest.TestCase):
repository = BorgRepository(repo_path)
repository.create_repository()
archive_path = "::".join([repo_path, archive_name])
actions.superuser_run(
'backups', ['create-archive', '--path', archive_path, '--paths',
self.data_directory])
actions.superuser_run('backups', [
'create-archive', '--path', archive_path, '--paths',
self.data_directory
])
archive = repository.list_archives()[0]
self.assertEquals(archive['name'], archive_name)
......@@ -117,7 +131,8 @@ class TestBackups(unittest.TestCase):
content = repository.list_archives()
self.assertEquals(len(content), 0)
@unittest.skipUnless(euid == 0 and test_config.backups_ssh_path,
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_remote_backup_actions(self):
"""
......@@ -150,37 +165,38 @@ class TestBackups(unittest.TestCase):
arguments += ['--ssh-keyfile', credentials['ssh_keyfile']]
return (arguments, kwargs)
@unittest.skipUnless(euid == 0 and test_config.backups_ssh_path,
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_path,
'Needs to be root and ssh password provided')
def test_sshfs_mount_password(self):
"""Test (un)mounting if password for a remote location is given"""
credentials = self.get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(path=ssh_path,
credentials=credentials,
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
self.assertTrue(repository.is_mounted)
repository.umount()
self.assertFalse(repository.is_mounted)
@unittest.skipUnless(euid == 0 and test_config.backups_ssh_keyfile,
@unittest.skipUnless(euid == 0 and _borg_is_installed()
and test_config.backups_ssh_keyfile,
'Needs to be root and ssh keyfile provided')
def test_sshfs_mount_keyfile(self):
"""Test (un)mounting if keyfile for a remote location is given"""
credentials = self.get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(path=ssh_path,
credentials=credentials,
repository = SshBorgRepository(path=ssh_path, credentials=credentials,
automount=False)
repository.mount()
self.assertTrue(repository.is_mounted)
repository.umount()
self.assertFalse(repository.is_mounted)
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_access_nonexisting_url(self):
repository = SshBorgRepository(path=self.nonexisting_repo_url,
credentials=self.dummy_credentials,
......@@ -188,7 +204,8 @@ class TestBackups(unittest.TestCase):
with self.assertRaises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
@unittest.skipUnless(euid == 0, 'Needs to be root')
@unittest.skipUnless(euid == 0 and _borg_is_installed(),
'Needs to be root')
def test_inaccessible_repo_url(self):
"""Test accessing an existing URL with wrong credentials"""
repository = SshBorgRepository(path=self.inaccessible_repo_url,
......