test_merging.py 7.46 KB
Newer Older
1 2
# This file is part of cloud-init. See LICENSE file for license information.

3
from cloudinit.tests import helpers
4

5 6 7 8 9 10 11 12 13
from cloudinit.handlers import cloud_config
from cloudinit.handlers import (CONTENT_START, CONTENT_END)

from cloudinit import helpers as c_helpers
from cloudinit import util

import collections
import glob
import os
14
import random
15
import re
16
import six
17
import string
18

19
# Glob pattern used to find the merge source samples.
SOURCE_PAT = "source*.*yaml"
# Template for the expected-result file name; filled in with the numeric
# identifier parsed from the source file name.
EXPECTED_PAT = "expected%s.yaml"
# Kinds of value the random generator may choose from below the top level.
TYPES = [dict, str, list, tuple, None] + list(six.integer_types)
23 24 25 26 27 28 29 30 31


def _old_mergedict(src, cand):
    """
    Merge values from C{cand} into C{src}.
    If C{src} has a key C{cand} will not override.
    Nested dictionaries are merged recursively.
    """
    if isinstance(src, dict) and isinstance(cand, dict):
32
        for (k, v) in cand.items():
33 34 35 36 37 38 39 40 41 42 43 44 45
            if k not in src:
                src[k] = v
            else:
                src[k] = _old_mergedict(src[k], v)
    return src


def _old_mergemanydict(*args):
    """
    Merge every positional argument, left to right, into a fresh dict.

    Each argument is folded in with C{_old_mergedict}, so keys set by an
    earlier argument are not overridden by a later one.
    """
    merged = {}
    for candidate in args:
        merged = _old_mergedict(merged, candidate)
    return merged

46

47 48
def _random_str(rand):
    base = ''
49 50
    for _i in range(rand.randint(1, 2 ** 8)):
        base += rand.choice(string.ascii_letters + string.digits)
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
    return base


class _NoMoreException(Exception):
    pass


def _make_dict(current_depth, max_depth, rand):
    """
    Recursively build one pseudo-random value for the merge tests.

    The top-level call (C{current_depth} of 0) always produces a dict;
    deeper levels pick a kind at random from C{TYPES} and produce a dict,
    list, tuple, integer, string or None.  Raises C{_NoMoreException} when
    C{current_depth} has reached C{max_depth}; containers silently skip
    children that hit that limit.
    """
    if current_depth >= max_depth:
        raise _NoMoreException()

    kind = dict if current_depth == 0 else rand.choice(TYPES)
    if kind is None:
        return None

    if kind is dict:
        count = rand.randint(0, 5)
        # All keys are drawn before any value is generated; keep that order
        # so a given seed keeps producing the same structure.
        names = [_random_str(rand) for _i in range(count)]
        result = {}
        for name in names:
            try:
                result[name] = _make_dict(current_depth + 1, max_depth, rand)
            except _NoMoreException:
                # Depth exhausted: leave this key out.
                pass
        return result

    if kind in (list, tuple):
        items = []
        count = rand.randint(0, 5)
        for _i in range(count):
            try:
                items.append(_make_dict(current_depth + 1, max_depth, rand))
            except _NoMoreException:
                # Depth exhausted: leave this element out.
                pass
        return tuple(items) if kind is tuple else items

    if kind in six.integer_types:
        return rand.randint(0, 2 ** 8)
    if kind is str:
        return _random_str(rand)
    return None


def make_dict(max_depth, seed=None):
    """
    Build a pseudo-random nested dictionary.

    C{max_depth} is raised to at least 1 so that the top-level dict is
    always produced.  C{seed} is handed to C{random.Random}, which makes
    the result repeatable for a given seed.
    """
    generator = random.Random(seed)
    depth_limit = max(1, max_depth)
    return _make_dict(0, depth_limit, generator)


101
class TestSimpleRun(helpers.ResourceUsingTestCase):
    """Compare C{util.mergemanydict} with the legacy merge helpers."""

    def _assert_compat(self, *cfgs):
        """
        Assert that the legacy and the current merge agree on C{cfgs}.

        C{_old_mergedict} modifies its first argument in place and stores
        references to nested values, so each implementation is handed its
        own deep copy.  Sharing the objects would feed already-merged data
        to the second implementation and make the comparison meaningless.
        """
        # Function-scope import: only this helper needs it.
        import copy

        legacy = _old_mergemanydict(*copy.deepcopy(cfgs))
        current = util.mergemanydict(copy.deepcopy(list(cfgs)))
        self.assertEqual(legacy, current)

    def _load_merge_files(self):
        """
        Collect the sample merge inputs together with their expected output.

        Returns a list with one entry per numeric sample identifier, each
        shaped as C{[[[fn, contents], ...], [expected, expected_fn]]}.
        Raises C{IOError} when a source file name does not match the
        expected pattern or when its expected-result file is missing.
        """
        merge_root = helpers.resourceLocation('merge_sources')
        tests = []
        source_ids = collections.defaultdict(list)
        expected_files = {}
        for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)):
            base_fn = os.path.basename(fn)
            found = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
            if not found:
                raise IOError("File %s does not have a numeric identifier"
                              % (fn))
            file_id = int(found.group(1))
            part_id = int(found.group(2))
            # Remember the numeric part id so the parts can be ordered
            # numerically (a plain path sort puts part 10 before part 2).
            source_ids[file_id].append((part_id, fn))
            expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id))
            if not os.path.isfile(expected_fn):
                raise IOError("No expected file found at %s" % (expected_fn))
            expected_files[file_id] = expected_fn
        for i in sorted(source_ids):
            source_file_contents = []
            for (_part_id, fn) in sorted(source_ids[i]):
                source_file_contents.append([fn, util.load_file(fn)])
            expected = util.load_yaml(util.load_file(expected_files[i]))
            entry = [source_file_contents, [expected, expected_files[i]]]
            tests.append(entry)
        return tests

    def test_seed_runs(self):
        """Both merges agree on seeded, randomly generated dictionaries."""
        for i in range(1, 10):
            base_dicts = [make_dict(5, i * j) for j in range(1, 10)]
            self._assert_compat(*base_dicts)

    def test_merge_cc_samples(self):
        """The cloud-config handler merges each sample set as expected."""
        tests = self._load_merge_files()
        paths = c_helpers.Paths({})
        cc_handler = cloud_config.CloudConfigPartHandler(paths)
        # NOTE(review): presumably this stops the handler from writing the
        # merged config to a file -- confirm against CloudConfigPartHandler.
        cc_handler.cloud_fn = None
        for (payloads, (expected_merge, expected_fn)) in tests:
            cc_handler.handle_part(None, CONTENT_START, None,
                                   None, None, None)
            merging_fns = []
            for (fn, contents) in payloads:
                cc_handler.handle_part(None, None, "%s.yaml" % (fn),
                                       contents, None, {})
                merging_fns.append(fn)
            # Grab the buffer before CONTENT_END is delivered.
            merged_buf = cc_handler.cloud_buf
            cc_handler.handle_part(None, CONTENT_END, None,
                                   None, None, None)
            fail_msg = "Equality failure on checking %s with %s: %s != %s"
            fail_msg = fail_msg % (expected_fn,
                                   ",".join(merging_fns), merged_buf,
                                   expected_merge)
            self.assertEqual(expected_merge, merged_buf, msg=fail_msg)

    def test_compat_merges_dict(self):
        """A key present on both sides keeps the first value."""
        a = {
            '1': '2',
            'b': 'c',
        }
        b = {
            'b': 'e',
        }
        self._assert_compat(a, b)

    def test_compat_merges_dict2(self):
        """A scalar is not replaced by a list under the same key."""
        a = {
            'Blah': 1,
            'Blah2': 2,
            'Blah3': 3,
        }
        b = {
            'Blah': 1,
            'Blah2': 2,
            'Blah3': [1],
        }
        self._assert_compat(a, b)

    def test_compat_merges_list(self):
        """List values under the same key, across three dictionaries."""
        a = {'b': [1, 2, 3]}
        b = {'b': [4, 5]}
        c = {'b': [6, 7]}
        self._assert_compat(a, b, c)

    def test_compat_merges_str(self):
        """String values under the same key, across three dictionaries."""
        a = {'b': "hi"}
        b = {'b': "howdy"}
        c = {'b': "hallo"}
        self._assert_compat(a, b, c)

    def test_compat_merge_sub_dict(self):
        """Nested dictionaries two levels deep."""
        a = {
            '1': '2',
            'b': {
                'f': 'g',
                'e': 'c',
                'h': 'd',
                'hh': {
                    '1': 2,
                },
            }
        }
        b = {
            'b': {
                'e': 'c',
                'hh': {
                    '3': 4,
                }
            }
        }
        self._assert_compat(a, b)

    def test_compat_merge_sub_dict2(self):
        """Nested dictionaries with disjoint keys."""
        a = {
            '1': '2',
            'b': {
                'f': 'g',
            }
        }
        b = {
            'b': {
                'e': 'c',
            }
        }
        self._assert_compat(a, b)

    def test_compat_merge_sub_list(self):
        """A nested list meets an empty list under the same key."""
        a = {
            '1': '2',
            'b': {
                'f': ['1'],
            }
        }
        b = {
            'b': {
                'f': [],
            }
        }
        self._assert_compat(a, b)
260 261

# vi: ts=4 expandtab