Commit 26ea813d authored by Joshua Harlow's avatar Joshua Harlow

Fix py26 for rhel (and older versions of python)

parent 67e506a5
......@@ -20,7 +20,6 @@ import base64
import glob
import gzip
import io
import shlex
from cloudinit.net import get_devicelist
from cloudinit.net import sys_netdev_info
......@@ -34,13 +33,17 @@ def _load_shell_content(content, add_empty=False, empty_val=None):
then add entries in to the returned dictionary for 'VAR='
variables. Set their value to empty_val."""
data = {}
for line in shlex.split(content):
key, value = line.split("=", 1)
if not value:
value = empty_val
if add_empty or value:
data[key] = value
for line in util.shlex_split(content):
try:
key, value = line.split("=", 1)
except ValueError:
# Unsplittable line, skip it...
pass
else:
if not value:
value = empty_val
if add_empty or value:
data[key] = value
return data
......@@ -60,6 +63,9 @@ def _klibc_to_config_entry(content, mac_addrs=None):
if mac_addrs is None:
mac_addrs = {}
print("Reading content")
print(content)
data = _load_shell_content(content)
try:
name = data['DEVICE']
......@@ -185,7 +191,7 @@ def read_kernel_cmdline_config(files=None, mac_addrs=None, cmdline=None):
return None
if mac_addrs is None:
mac_addrs = {k: sys_netdev_info(k, 'address')
for k in get_devicelist()}
mac_addrs = dict((k, sys_netdev_info(k, 'address'))
for k in get_devicelist())
return config_from_klibc_net_cfg(files=files, mac_addrs=mac_addrs)
......@@ -421,7 +421,7 @@ def write_files(datadir, files, dirmode=None):
elem.text = DEF_PASSWD_REDACTION
return ET.tostring(root)
except Exception:
LOG.critical("failed to redact userpassword in {}".format(fname))
LOG.critical("failed to redact userpassword in %s", fname)
return cnt
if not datadir:
......
......@@ -534,13 +534,13 @@ def convert_net_json(network_json):
config = []
for link in links:
subnets = []
cfg = {k: v for k, v in link.items()
if k in valid_keys['physical']}
cfg = dict((k, v) for k, v in link.items()
if k in valid_keys['physical'])
cfg.update({'name': link['id']})
for network in [net for net in networks
if net['link'] == link['id']]:
subnet = {k: v for k, v in network.items()
if k in valid_keys['subnet']}
subnet = dict((k, v) for k, v in network.items()
if k in valid_keys['subnet'])
if 'dhcp' in network['type']:
t = 'dhcp6' if network['type'].startswith('ipv6') else 'dhcp4'
subnet.update({
......
......@@ -37,6 +37,7 @@ import pwd
import random
import re
import shutil
import shlex
import socket
import stat
import string
......@@ -81,6 +82,7 @@ CONTAINER_TESTS = (['systemd-detect-virt', '--quiet', '--container'],
['lxc-is-container'])
PROC_CMDLINE = None
PY26 = sys.version_info[0:2] == (2, 6)
def decode_binary(blob, encoding='utf-8'):
......@@ -171,7 +173,8 @@ class ProcessExecutionError(IOError):
def __init__(self, stdout=None, stderr=None,
exit_code=None, cmd=None,
description=None, reason=None):
description=None, reason=None,
errno=None):
if not cmd:
self.cmd = '-'
else:
......@@ -202,6 +205,7 @@ class ProcessExecutionError(IOError):
else:
self.reason = '-'
self.errno = errno
message = self.MESSAGE_TMPL % {
'description': self.description,
'cmd': self.cmd,
......@@ -1147,7 +1151,14 @@ def find_devs_with(criteria=None, oformat='device',
options.append(path)
cmd = blk_id_cmd + options
# See man blkid for why 2 is added
(out, _err) = subp(cmd, rcs=[0, 2])
try:
(out, _err) = subp(cmd, rcs=[0, 2])
except ProcessExecutionError as e:
if e.errno == errno.ENOENT:
# blkid not found...
out = ""
else:
raise
entries = []
for line in out.splitlines():
line = line.strip()
......@@ -1191,6 +1202,13 @@ def load_file(fname, read_cb=None, quiet=False, decode=True):
return contents
def shlex_split(blob):
if PY26 and isinstance(blob, six.text_type):
# Older versions don't support unicode input
blob = blob.encode("utf8")
return shlex.split(blob)
def get_cmdline():
if 'DEBUG_PROC_CMDLINE' in os.environ:
return os.environ["DEBUG_PROC_CMDLINE"]
......@@ -1696,7 +1714,8 @@ def subp(args, data=None, rcs=None, env=None, capture=True, shell=False,
sp = subprocess.Popen(args, **kws)
(out, err) = sp.communicate(data)
except OSError as e:
raise ProcessExecutionError(cmd=args, reason=e)
raise ProcessExecutionError(cmd=args, reason=e,
errno=e.errno)
rc = sp.returncode
if rc not in rcs:
raise ProcessExecutionError(stdout=out, stderr=err,
......
......@@ -11,8 +11,12 @@ PrettyTable
oauthlib
# This one is currently used only by the CloudSigma and SmartOS datasources.
# If these datasources are removed, this is no longer needed
pyserial
# If these datasources are removed, this is no longer needed.
#
# This will not work in py2.6 so it is only optionally installed on
# python 2.7 and later.
#
# pyserial
# This is only needed for places where we need to support configs in a manner
# that the built-in config parser is not sufficent (ie
......
......@@ -197,7 +197,6 @@ requirements = read_requires()
if sys.version_info < (3,):
requirements.append('cheetah')
setuptools.setup(
name='cloud-init',
version=get_version(),
......
......@@ -7,12 +7,10 @@ import shutil
import tempfile
import unittest
import mock
import six
import unittest2
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
......@@ -21,6 +19,9 @@ except ImportError:
from cloudinit import helpers as ch
from cloudinit import util
# Used for skipping tests
SkipTest = unittest2.SkipTest
# Used for detecting different python versions
PY2 = False
PY26 = False
......@@ -44,79 +45,6 @@ else:
if _PY_MINOR == 4 and _PY_MICRO < 3:
FIX_HTTPRETTY = True
if PY26:
# For now add these on, taken from python 2.7 + slightly adjusted. Drop
# all this once Python 2.6 is dropped as a minimum requirement.
class TestCase(unittest.TestCase):
def setUp(self):
super(TestCase, self).setUp()
self.__all_cleanups = ExitStack()
def tearDown(self):
self.__all_cleanups.close()
unittest.TestCase.tearDown(self)
def addCleanup(self, function, *args, **kws):
self.__all_cleanups.callback(function, *args, **kws)
def assertIs(self, expr1, expr2, msg=None):
if expr1 is not expr2:
standardMsg = '%r is not %r' % (expr1, expr2)
self.fail(self._formatMessage(msg, standardMsg))
def assertIn(self, member, container, msg=None):
if member not in container:
standardMsg = '%r not found in %r' % (member, container)
self.fail(self._formatMessage(msg, standardMsg))
def assertNotIn(self, member, container, msg=None):
if member in container:
standardMsg = '%r unexpectedly found in %r'
standardMsg = standardMsg % (member, container)
self.fail(self._formatMessage(msg, standardMsg))
def assertIsNone(self, value, msg=None):
if value is not None:
standardMsg = '%r is not None'
standardMsg = standardMsg % (value)
self.fail(self._formatMessage(msg, standardMsg))
def assertIsInstance(self, obj, cls, msg=None):
"""Same as self.assertTrue(isinstance(obj, cls)), with a nicer
default message."""
if not isinstance(obj, cls):
standardMsg = '%s is not an instance of %r' % (repr(obj), cls)
self.fail(self._formatMessage(msg, standardMsg))
def assertDictContainsSubset(self, expected, actual, msg=None):
missing = []
mismatched = []
for k, v in expected.items():
if k not in actual:
missing.append(k)
elif actual[k] != v:
mismatched.append('%r, expected: %r, actual: %r'
% (k, v, actual[k]))
if len(missing) == 0 and len(mismatched) == 0:
return
standardMsg = ''
if missing:
standardMsg = 'Missing: %r' % ','.join(m for m in missing)
if mismatched:
if standardMsg:
standardMsg += '; '
standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
self.fail(self._formatMessage(msg, standardMsg))
else:
class TestCase(unittest.TestCase):
pass
# Makes the old path start
# with new base instead of whatever
# it previously had
......@@ -151,6 +79,10 @@ def retarget_many_wrapper(new_base, am, old_func):
return wrapper
class TestCase(unittest2.TestCase):
pass
class ResourceUsingTestCase(TestCase):
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
......
import os
import shutil
import tempfile
import unittest
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
import unittest2
from cloudinit import handlers
from cloudinit import helpers
......@@ -18,7 +9,7 @@ from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
from .helpers import TestCase
from .helpers import TestCase, ExitStack, mock
class FakeModule(handlers.Handler):
......@@ -99,9 +90,10 @@ class TestWalkerHandleHandler(TestCase):
self.assertEqual(self.data['handlercount'], 0)
class TestHandlerHandlePart(unittest.TestCase):
class TestHandlerHandlePart(TestCase):
def setUp(self):
super(TestHandlerHandlePart, self).setUp()
self.data = "fake data"
self.ctype = "fake ctype"
self.filename = "fake filename"
......@@ -177,7 +169,7 @@ class TestHandlerHandlePart(unittest.TestCase):
self.data, self.ctype, self.filename, self.payload)
class TestCmdlineUrl(unittest.TestCase):
class TestCmdlineUrl(TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
......
......@@ -4,12 +4,7 @@ import sys
import six
from . import helpers as test_helpers
try:
from unittest import mock
except ImportError:
import mock
mock = test_helpers.mock
BIN_CLOUDINIT = "bin/cloud-init"
......
from __future__ import print_function
import sys
import unittest
from . import helpers as test_helpers
from cloudinit.cs_utils import Cepko
import unittest2
try:
skip = unittest.skip
except AttributeError:
# Python 2.6. Doesn't have to be high fidelity.
def skip(reason):
def decorator(func):
def wrapper(*args, **kws):
print(reason, file=sys.stderr)
return wrapper
return decorator
from cloudinit.cs_utils import Cepko
WILL_WORK = True
except ImportError:
WILL_WORK = False
SERVER_CONTEXT = {
......@@ -32,29 +26,21 @@ SERVER_CONTEXT = {
}
class CepkoMock(Cepko):
def all(self):
return SERVER_CONTEXT
if WILL_WORK:
class CepkoMock(Cepko):
def all(self):
return SERVER_CONTEXT
def get(self, key="", request_pattern=None):
return SERVER_CONTEXT['tags']
def get(self, key="", request_pattern=None):
return SERVER_CONTEXT['tags']
# 2015-01-22 BAW: This test is completely useless because it only ever tests
# the CepkoMock object. Even in its original form, I don't think it ever
# touched the underlying Cepko class methods.
@skip('This test is completely useless')
class CepkoResultTests(unittest.TestCase):
class CepkoResultTests(test_helpers.TestCase):
def setUp(self):
pass
# self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
# spec=CepkoMock,
# count=False,
# passthrough=False)
# self.mocked()
# self.mocker.result(CepkoMock())
# self.mocker.replay()
# self.c = Cepko()
raise unittest2.SkipTest('This test is completely useless')
def test_getitem(self):
result = self.c.all()
......
from cloudinit import helpers
from cloudinit.util import b64e, decode_binary, load_file
from cloudinit.sources import DataSourceAzure
from ..helpers import TestCase, populate_dir
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
import crypt
import os
......@@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase):
def setUp(self):
super(TestAzureDataSource, self).setUp()
if PY26:
raise SkipTest("Does not work on python 2.6")
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
......
import os
from cloudinit.sources.helpers import azure as azure_helper
from ..helpers import TestCase
try:
from unittest import mock
except ImportError:
import mock
from ..helpers import ExitStack
from ..helpers import TestCase
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
from ..helpers import mock
GOAL_STATE_TEMPLATE = """\
......
# coding: utf-8
import copy
from cloudinit.cs_utils import Cepko
from cloudinit.sources import DataSourceCloudSigma
try:
# Serial does not work on py2.6 (anymore)
import pyserial
from cloudinit.cs_utils import Cepko
from cloudinit.sources import DataSourceCloudSigma
WILL_WORK = True
except ImportError:
WILL_WORK = False
from .. import helpers as test_helpers
from ..helpers import SkipTest
SERVER_CONTEXT = {
"cpu": 1000,
......@@ -29,17 +36,20 @@ SERVER_CONTEXT = {
}
class CepkoMock(Cepko):
def __init__(self, mocked_context):
self.result = mocked_context
if WILL_WORK:
class CepkoMock(Cepko):
def __init__(self, mocked_context):
self.result = mocked_context
def all(self):
return self
def all(self):
return self
class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
if not WILL_WORK:
raise SkipTest("Datasource testing not supported")
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
......
from cloudinit import helpers
from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
from ..helpers import TestCase
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
from ..helpers import TestCase, mock, ExitStack
class TestCloudStackPasswordFetching(TestCase):
......
......@@ -5,22 +5,13 @@ import shutil
import six
import tempfile
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
from ..helpers import TestCase
from ..helpers import TestCase, ExitStack, mock
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
......
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
from ..helpers import TestCase, populate_dir
from ..helpers import TestCase, populate_dir, mock, ExitStack
import os
import yaml
import shutil
import tempfile
import unittest
try:
from unittest import mock
except ImportError:
import mock
try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
class TestNoCloudDataSource(TestCase):
......@@ -139,7 +129,7 @@ class TestNoCloudDataSource(TestCase):
self.assertTrue(ret)
class TestParseCommandLineData(unittest.TestCase):
class TestParseCommandLineData(TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
......
......@@ -33,19 +33,21 @@ import tempfile
import uuid
from binascii import crc32
import serial
try:
# Serial does not work on py2.6 (anymore)
import serial
from cloudinit.sources import DataSourceSmartOS
WILL_WORK = True
except ImportError:
WILL_WORK = False
import six
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
from .. import helpers
try:
from unittest import mock
except ImportError:
import mock
from ..helpers import mock, SkipTest
MOCK_RETURNS = {
'hostname': 'test-host',
......@@ -79,7 +81,8 @@ def get_mock_client(mockdata):
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
if not WILL_WORK:
raise SkipTest("This test will not work")
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
self.legacy_user_d = tempfile.mkdtemp()
......@@ -445,6 +448,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
if not WILL_WORK:
raise SkipTest("This test will not work")
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
......
......@@ -78,8 +78,9 @@ class TestEniNetRendering(TestCase):
@mock.patch("cloudinit.net.sys_dev_path")
@mock.patch("cloudinit.net.sys_netdev_info")
@mock.patch("cloudinit.net.get_devicelist")
def test_generation(self, mock_get_devicelist, mock_sys_netdev_info,
mock_sys_dev_path):
def test_default_generation(self, mock_get_devicelist,
mock_sys_netdev_info,
mock_sys_dev_path):
mock_get_devicelist.return_value = ['eth1000', 'lo']
dev_characteristics = {
......@@ -138,7 +139,7 @@ iface eth1000 inet dhcp
self.assertEqual(expected.lstrip(), contents.lstrip())
class TestNetConfigParsing(TestCase):
class TestCmdlineConfigParsing(TestCase):
simple_cfg = {
'config': [{"type": "physical", "name": "eth0",
"mac_address": "c0:d6:9f:2c:e8:80",
......
......@@ -7,7 +7,9 @@ from cloudinit import reporting
from cloudinit.reporting import handlers
from cloudinit.reporting import events
from .helpers import (mock, TestCase)
import mock
from .helpers import TestCase
def _fake_registry():
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from cloudinit import util
from cloudinit.config import cc_rh_subscription
import logging
import mock
import unittest
from .helpers import TestCase, mock
class GoodTests(unittest.TestCase):
class GoodTests(TestCase):
def setUp(self):
super(GoodTests, self).setUp()
self.name = "cc_rh_subscription"
......@@ -92,7 +105,7 @@ class GoodTests(unittest.TestCase):
self.assertEqual(self.SM._sub_man_cli.call_count, 9)
class TestBadInput(unittest.TestCase):
class TestBadInput(TestCase):
name = "cc_rh_subscription"
cloud_init = None
log = logging.getLogger("bad_tests")
......
[tox]
envlist = py27,py3,pyflakes
envlist = py27,py26,py3,pyflakes
recreate = True
usedevelop = True
[testenv]
commands = python -m nose {posargs:tests}
deps = -r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt
[testenv:py3]
basepython = python3
-r{toxinidir}/requirements.txt
setenv =
LC_ALL = en_US.utf-8
[testenv:pyflakes]