# Copyright (C) 2013, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA.

import unittest
import os
import logging

from tests import utils

from virtinst import Cloner

# Guest names for the original and the cloned VM (presumably matching the
# names used in the XML fixtures -- confirm against tests/clone-xml).
ORIG_NAME = "clone-orig"
CLONE_NAME = "clone-new"

# Create some files to use as test images
FILE1 = "/tmp/virtinst-test1.img"
FILE2 = "/tmp/virtinst-test2.img"

# Volume paths located under POOL1 and POOL2 respectively.
P1_VOL1 = "/dev/default-pool/testvol1.img"
P1_VOL2 = "/dev/default-pool/testvol2.img"
P2_VOL1 = "/dev/cross-pool/testvol1.img"
P2_VOL2 = "/dev/cross-pool/testvol2.img"

# Storage pool target directories used to build clone destination paths.
POOL1 = "/dev/default-pool"
POOL2 = "/dev/cross-pool"
DISKPOOL = "/dev/disk-pool"

# Files created in TestClone.setUp() and removed again in tearDown().
local_files = [FILE1, FILE2]

# Directory holding the "<base>-in.xml" / "<base>-out.xml" fixtures.  It is
# resolved against the current working directory, so the tests have to be
# started from the directory that contains "tests/".
clonexml_dir = os.path.join(os.getcwd(), "tests/clone-xml")


class TestClone(unittest.TestCase):
    """Tests for virtinst.Cloner driven by the XML files in clonexml_dir.

    Most tests load "<base>-in.xml" into a Cloner, apply a fixed set of
    clone values and compare the generated clone XML with "<base>-out.xml".
    """

    def setUp(self):
        """Create (or touch) every scratch image listed in local_files."""
        for path in local_files:
            # Append mode creates a missing file without truncating an
            # existing one; utime() refreshes the timestamps like touch(1).
            with open(path, "a"):
                os.utime(path, None)

    def tearDown(self):
        """Remove the scratch images created by setUp()."""
        for path in local_files:
            # Skip files that are already gone so a cleanup error cannot
            # mask the result of the test that just ran.
            if os.path.exists(path):
                os.unlink(path)

    def _clone_helper(self, filebase, disks=None, force_list=None,
                      skip_list=None, compare=True, conn=None,
                      clone_disks_file=None):
        """Helper for comparing clone input/output from 2 xml files.

        filebase: base name of the fixture; "<filebase>-in.xml" inside
            clonexml_dir is used as the original guest XML.
        disks: clone destination paths; None selects the defaults from
            _default_clone_values().
        force_list / skip_list: values assigned, one at a time, to the
            Cloner's force_target / skip_target attributes.
        compare: when true, diff the clone XML against
            "<filebase>-out.xml" and then define that XML on a test
            connection; when false, only run the Cloner setup steps.
        conn: connection handed to Cloner; a falsy value means
            utils.open_testdriver() is used.
        clone_disks_file: optional file the XML of the cloned disks is
            compared with (only used when compare is true).
        """
        infile = os.path.join(clonexml_dir, filebase + "-in.xml")
        in_content = utils.read_file(infile)

        if not conn:
            conn = utils.open_testdriver()
        cloneobj = Cloner(conn)
        cloneobj.original_xml = in_content

        # NOTE(review): one assignment per entry only makes sense if these
        # attributes accumulate values rather than overwrite them --
        # confirm against the Cloner implementation.
        for force in force_list or []:
            cloneobj.force_target = force
        for skip in skip_list or []:
            cloneobj.skip_target = skip

        cloneobj = self._default_clone_values(cloneobj, disks)

        if compare:
            self._clone_compare(cloneobj, filebase,
                                clone_disks_file=clone_disks_file)
            self._clone_define(filebase)
        else:
            cloneobj.setup_original()
            cloneobj.setup_clone()

    def _default_clone_values(self, cloneobj, disks=None):
        """Sets default values for the cloned VM.

        Assigns the clone name, UUID, MAC addresses and destination paths
        on cloneobj and returns the same object.  A disks value of None
        selects a built-in list of paths.
        """
        cloneobj.clone_name = CLONE_NAME
        cloneobj.clone_uuid = "12345678-1234-1234-1234-123456789012"
        cloneobj.clone_macs = ["22:23:45:67:89:00", "22:23:45:67:89:01"]

        # Compare against None explicitly: callers pass an empty list to
        # request "no clone paths", which must not trigger the defaults.
        if disks is None:
            disks = ["/dev/disk-pool/disk-vol1", "/tmp/clone2.img",
                     "/clone3", "/tmp/clone4.img",
                     "/tmp/clone5.img", None]

        cloneobj.clone_paths = disks
        return cloneobj

    def _clone_compare(self, cloneobj, outbase, clone_disks_file=None):
        """Helps compare output from passed clone instance with an xml file.

        Runs the Cloner setup steps, then diffs the generated clone XML
        against "<outbase>-out.xml".  When clone_disks_file is given, the
        concatenated XML of the clone disks' volume installs is diffed
        against that file as well.
        """
        outfile = os.path.join(clonexml_dir, outbase + "-out.xml")

        cloneobj.setup_original()
        cloneobj.setup_clone()

        utils.diff_compare(cloneobj.clone_xml, outfile)
        if clone_disks_file:
            xml_clone_disks = "".join(
                disk.get_vol_install().get_xml_config()
                for disk in cloneobj.get_clone_disks())
            utils.diff_compare(xml_clone_disks, clone_disks_file)

    def _clone_define(self, filebase):
        """Take the valid output xml and attempt to define it on the
           connection to ensure we don't get any errors"""
        outfile = os.path.join(clonexml_dir, filebase + "-out.xml")
        outxml = utils.read_file(outfile)
        conn = utils.open_testdriver()
        utils.test_create(conn, outxml)

    def testRemoteNoStorage(self):
        """Test remote clone where VM has no storage that needs cloning"""
        conn = utils.open_test_remote()
        for base in ["nostorage", "noclone-storage"]:
            self._clone_helper(base, disks=[], conn=conn)

    def testRemoteWithStorage(self):
        """
        Test remote clone with storage needing cloning. Should fail,
        since libvirt has no storage clone api.
        """
        conn = utils.open_test_remote()
        for base in ["general-cfg"]:
            try:
                self._clone_helper(base,
                                   disks=["%s/1.img" % POOL1,
                                          "%s/2.img" % POOL1],
                                   conn=conn)
            except (ValueError, RuntimeError) as e:
                # Exception expected
                logging.debug("Received expected exception: %s", str(e))
            else:
                # We shouldn't succeed, so test fails
                raise AssertionError("Remote clone with storage passed "
                                     "when it shouldn't.")

    def testCloneStorageManaged(self):
        """Clone "managed-storage" onto new paths in POOL1 and DISKPOOL."""
        base = "managed-storage"
        self._clone_helper(base, ["%s/new1.img" % POOL1,
                                  "%s/new2.img" % DISKPOOL])

    def testCloneStorageCrossPool(self):
        """Clone "cross-pool" over the remote test connection and also
        compare the clone disk XML with "cross-pool-disks-out.xml"."""
        base = "cross-pool"
        conn = utils.open_test_remote()
        clone_disks_file = os.path.join(clonexml_dir, base + "-disks-out.xml")
        self._clone_helper(base, ["%s/new1.img" % POOL2,
                                  "%s/new2.img" % POOL1],
                           clone_disks_file=clone_disks_file,
                           conn=conn)

    def testCloneStorageForce(self):
        """Clone "force" with hda, fdb and sdb given as force targets."""
        base = "force"
        self._clone_helper(base,
                           disks=["/dev/default-pool/1234.img",
                                  None, "/clone2.img"],
                           force_list=["hda", "fdb", "sdb"])

    def testCloneStorageSkip(self):
        """Clone "skip" with hda and fdb given as skip targets."""
        base = "skip"
        self._clone_helper(base,
                           disks=["/dev/default-pool/1234.img",
                                  None, "/tmp/clone2.img"],
                           skip_list=["hda", "fdb"])

    def testCloneFullPool(self):
        """Setting up a clone of "fullpool" onto /full-pool must fail."""
        base = "fullpool"
        try:
            self._clone_helper(base, disks=["/full-pool/test.img"],
                               compare=False)
        except Exception as e:
            # Any failure counts as success here; record it so that an
            # unrelated error is at least visible in the debug log.
            logging.debug("Received expected exception: %s", str(e))
        else:
            raise AssertionError("Expected exception, but none raised.")

    def testCloneNvramAuto(self):
        """Clone "nvram-auto" with the default clone values."""
        base = "nvram-auto"
        self._clone_helper(base)

    def testCloneNvramNewpool(self):
        """Clone "nvram-newpool" with the default clone values."""
        base = "nvram-newpool"
        self._clone_helper(base)

    def testCloneGraphicsPassword(self):
        """Clone "graphics-password" with the default clone values."""
        base = "graphics-password"
        self._clone_helper(base)