Commit 94f24ae1 authored by Jan Vcelak's avatar Jan Vcelak

profiles.loader: new code, unit tests

parent 29f7357f
......@@ -94,5 +94,8 @@ clean:
find -name "*.pyc" | xargs rm -f
rm -rf $(VERSIONED_NAME) rpm-build-dir
.PHONY: clean archive srpm tag
test:
python -m unittest discover tests
.PHONY: clean archive srpm tag test
import unittest
import tempfile
import shutil
import os.path
import tuned.profiles.loader
# DI: return config itself instead of the profile
class TestLoader(tuned.profiles.loader.Loader):
def _create_profile(self, config):
return config
class LoaderTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
tmpdir1 = tempfile.mkdtemp()
tmpdir2 = tempfile.mkdtemp()
cls._tmp_load_dirs = [tmpdir1, tmpdir2]
cls._create_profile(tmpdir1, "default", "[main]\n\n[network]\ntype=net\ndevices=em*\n\n[disk]\nenabled=false\n")
cls._create_profile(tmpdir1, "invalid", "INVALID")
cls._create_profile(tmpdir2, "empty", "")
cls._create_profile(tmpdir1, "custom", "[custom]\ntype=one\n")
cls._create_profile(tmpdir2, "custom", "[custom]\ntype=two\n")
@classmethod
def tearDownClass(cls):
for tmp_dir in cls._tmp_load_dirs:
shutil.rmtree(tmp_dir, True)
@classmethod
def _create_profile(cls, load_dir, profile_name, tuned_conf_content):
profile_dir = os.path.join(load_dir, profile_name)
conf_name = os.path.join(profile_dir, "tuned.conf")
os.mkdir(profile_dir)
with open(conf_name, "w") as conf_file:
conf_file.write(tuned_conf_content)
def test_init(self):
tuned.profiles.loader.Loader()
tuned.profiles.loader.Loader([])
tuned.profiles.loader.Loader(["/tmp"])
tuned.profiles.loader.Loader(["/foo", "/bar"])
def test_init_wrong_type(self):
with self.assertRaises(TypeError):
tuned.profiles.loader.Loader(False)
def test_init_extra_params(self):
with self.assertRaises(TypeError):
tuned.profiles.loader.Loader([], "extra")
def test_default_load_directories(self):
loader = tuned.profiles.loader.Loader()
# order is important
self.assertEqual(len(loader.load_directories), 2)
self.assertEqual(loader.load_directories[0], "/var/lib/tuned")
self.assertEqual(loader.load_directories[1], "/etc/tuned")
def test_add_directory(self):
loader = tuned.profiles.loader.Loader([])
self.assertEqual(len(loader.load_directories), 0);
loader.add_directory("/a")
self.assertEqual(len(loader.load_directories), 1);
loader.add_directory("/b")
self.assertEqual(len(loader.load_directories), 2);
self.assertEqual(loader.load_directories[0], "/a")
self.assertEqual(loader.load_directories[1], "/b")
def test_load(self):
loader = TestLoader(self._tmp_load_dirs)
default_config = loader.load("default")
self.assertIn("main", default_config)
self.assertIn("network", default_config)
self.assertIn("disk", default_config)
self.assertNotIn("type", default_config["main"])
self.assertEquals(default_config["network"]["type"], "net")
self.assertEquals(default_config["disk"]["type"], "disk")
self.assertEquals(len(default_config["main"]), 0);
self.assertEquals(len(default_config["network"]), 2);
self.assertEquals(len(default_config["disk"]), 2);
self.assertEquals(default_config["network"]["devices"], "em*")
self.assertEquals(default_config["disk"]["enabled"], "false") # TODO: improve parser
def test_load_empty(self):
loader = TestLoader(self._tmp_load_dirs)
empty_config = loader.load("empty")
self.assertEquals(empty_config, {})
def test_load_invalid(self):
loader = TestLoader(self._tmp_load_dirs)
with self.assertRaises(tuned.profiles.exceptions.InvalidProfileException):
invalid_config = loader.load("invalid")
def test_load_nonexistent(self):
loader = TestLoader(self._tmp_load_dirs)
with self.assertRaises(tuned.profiles.exceptions.InvalidProfileException):
config = loader.load("nonexistent")
def test_load_order(self):
loader = TestLoader(self._tmp_load_dirs)
custom_config = loader.load("custom")
self.assertEquals(custom_config["custom"]["type"], "two")
def test_default_load(self):
loader = tuned.profiles.loader.Loader(self._tmp_load_dirs)
config = loader.load("empty")
self.assertIs(type(config), tuned.profiles.profile.Profile)
import unittest
import tuned.profiles
class ProfileTestCase(unittest.TestCase):
def test_init(self):
tuned.profiles.Profile({})
def test_init_missing_params(self):
with self.assertRaises(TypeError):
tuned.profiles.Profile()
def test_init_extra_params(self):
with self.assertRaises(TypeError):
tuned.profiles.Profile({}, "extra")
def test_create_units(self):
profile = tuned.profiles.Profile({
"main": { "anything": 10 },
"network" : { "type": "net", "devices": "*" },
"storage" : { "type": "disk" },
})
self.assertIs(type(profile.units), list)
self.assertEqual(len(profile.units), 2)
for name in ["network", "storage"]:
for unit in profile.units:
if unit.name == name:
break
else:
self.assertTrue(False)
def test_create_units_empty(self):
profile = tuned.profiles.Profile({"main":{}})
self.assertIs(type(profile.units), list)
self.assertEqual(len(profile.units), 0)
def test_sets_options(self):
profile = tuned.profiles.Profile({
"main": { "anything": 10 },
"network" : { "type": "net", "devices": "*" },
})
self.assertIs(type(profile.options), dict)
self.assertEquals(profile.options["anything"], 10)
def test_sets_options_empty(self):
profile = tuned.profiles.Profile({
"storage" : { "type": "disk" },
})
self.assertIs(type(profile.options), dict)
self.assertEquals(len(profile.options), 0)
import unittest
import tuned.profiles
class UnitTestCase(unittest.TestCase):
def test_init(self):
unit = tuned.profiles.Unit("network", "net", {})
def test_init_missing_params(self):
with self.assertRaises(TypeError):
tuned.profiles.Unit()
def test_init_extra_params(self):
with self.assertRaises(TypeError):
tuned.profiles.Unit("a", "b", {}, "c")
def test_attributes(self):
unit = tuned.profiles.Unit("network", "net", {"devices": "em1"})
self.assertEqual(unit.name, "network")
self.assertEqual(unit.plugin, "net")
self.assertEqual(unit.options["devices"], "em1")
unit2 = tuned.profiles.Unit("test", "disk", {"enabled": True})
self.assertEqual(unit2.name, "test")
self.assertEqual(unit2.plugin, "disk")
self.assertTrue(unit2.options["enabled"])
# Copyright (C) 2008-2011 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
import exceptions
import logs
import ConfigParser
import fnmatch
import os
import pprint
log = logs.get()
__all__ = ["Profile"]
class Profile(object):
def __init__(self, manager, config_file):
self._manager = manager
self._config_file = config_file
self._plugin_configs = {}
@classmethod
def find_profile(cls, name):
profile = "/etc/tuned/%s/tuned.conf" % (name)
if os.path.exists(profile):
return profile
profile = "/usr/lib/tuned/%s/tuned.conf" % (name)
if os.path.exists(profile):
return profile
return name
def _apply_config(self):
pp = pprint.PrettyPrinter(indent=4)
log.debug("Loaded config: %s" % (pp.pformat(self._plugin_configs)))
for name, cfg in self._plugin_configs.iteritems():
plugin = cfg["type"]
del cfg["type"]
p = self._manager.create(name, plugin, cfg)
def _disable_plugin(self, name, plugin_cfg):
# Iterates over already loaded plugins.
# If the already loaded plugin contains the same device as the newly
# loaded one, remove the device from the already loaded one.
plugins_to_remove = []
for plugin, cfg in self._plugin_configs.iteritems():
if cfg["type"] != plugin_cfg["type"]:
continue
if cfg.has_key("devices") and cfg["devices"] != None:
for device in plugin_cfg["devices"]:
if device in cfg["devices"]:
cfg["devices"].remove(device)
log.debug("Disabling plugin %s device %s" % (plugin, device))
# If we removed all devices, this plugin is not useful anymore,
# so we should remove it too:
if (len(cfg["devices"]) == 0):
plugins_to_remove.append(plugin)
else:
# If the plugin does not have any device set, it can be
# run only once, so remove previous occurence
log.debug("Disabling plugin %s" % (plugin))
plugins_to_remove.append(plugin)
for plugin in plugins_to_remove:
log.debug("Removing plugin %s because it is useless after disabling" % (plugin))
del self._plugin_configs[plugin]
def _merge_plugin(self, name, plugin_cfg):
# Iterates over already loaded plugins.
# Merges the option of two plugins with the same types together
plugins_to_remove = []
for plugin, cfg in self._plugin_configs.iteritems():
if cfg["type"] != plugin_cfg["type"]:
continue
if cfg.has_key("devices") and cfg["devices"] != None:
for device in plugin_cfg["devices"]:
if device in cfg["devices"]:
log.debug("Merging plugin %s with %s" % (name, plugin))
plugin_cfg.update(cfg)
plugins_to_remove.append(plugin)
break
else:
log.debug("Merging plugin %s with %s" % (name, plugin))
tmp_cfg = cfg
tmp_cfg.update(plugin_cfg)
plugin_cfg = tmp_cfg
plugins_to_remove.append(plugin)
for plugin in plugins_to_remove:
log.debug("Removing plugin %s because it is useless after merge" % (plugin))
del self._plugin_configs[plugin]
return plugin_cfg
def _fnmatch_list(self, tunable, devs):
for dev in devs:
if fnmatch.fnmatch(tunable, dev):
return True
return False
def _store_plugin_config(self, name, plugin_cfg):
plugin = plugin_cfg["type"]
# Check if the plugin is supported on this HW
try:
if not self._manager.plugins_repository.is_supported(plugin):
log.info("Plugin %s is not supported on this HW" % (plugin))
return
except exceptions.TunedException as e:
e.log()
log.error("unable to create unit %s" % plugin)
return
# If there are no devices set, set all tunable_devices as default
if not plugin_cfg.has_key("devices"):
try:
plugin_cfg["devices"] = self._manager.plugins_repository.tunable_devices(plugin)
except exceptions.TunedException as e:
e.log()
log.error("unable to create unit %s" % plugin)
return
else:
devs = plugin_cfg["devices"].split(",") # [sd*, dm*, sda1, sda2]
tunable_devices = self._manager.plugins_repository.tunable_devices(plugin)
# Filter out devices which are not mentioned in 'devs'. This is
# here because of wildcard-matching support.
plugin_cfg["devices"] = [tunable for tunable in tunable_devices if self._fnmatch_list(tunable, devs)]
if plugin_cfg.has_key("disable"):
self._disable_plugin(name, plugin_cfg)
return
if plugin_cfg.has_key("replace"):
self._disable_plugin(name, plugin_cfg)
del plugin_cfg["replace"]
else:
plugin_cfg = self._merge_plugin(name, plugin_cfg)
self._plugin_configs[name] = plugin_cfg
def _get_unique_plugin_name(self, name):
i = 1
n = name
while n in self._plugin_configs.keys():
n = name + "_" + str(i)
i += 1
return n
def _load_plugins(self, cfg, load_path):
for section in cfg.sections():
if section == "main":
continue
if not cfg.has_option(section, "type"):
log.info("No 'type' option for %s plugin, will treat '%s' as a plugin type" % (section, section))
cfg.set(section, "type", section)
cfg.set(section, "_load_path", load_path)
self._store_plugin_config(self._get_unique_plugin_name(section), dict(cfg.items(section)))
return True
def _load_config(self, config):
if not os.path.exists(config):
log.error("Config file %s does not exist" % (config))
return False
cfg = ConfigParser.SafeConfigParser()
cfg.read(config)
if cfg.has_option("main", "include"):
included_cfg = self.find_profile(cfg.get("main", "include"))
if not self._load_config(included_cfg):
return False
return self._load_plugins(cfg, os.path.dirname(config))
def load(self):
parsed = False
# Load more config files stored as list
if isinstance(self._config_file, list):
for cfg in self._config_file:
parsed = self._load_config(cfg)
if not parsed:
break
else:
parsed = self._load_config(self._config_file)
return (parsed and self._apply_config())
def cleanup(self):
pass
from tuned.profiles.loader import *
from tuned.profiles.profile import *
from tuned.profiles.unit import *
from tuned.profiles.exceptions import *
import tuned.exceptions
class InvalidProfileException(tuned.exceptions.TunedException):
pass
import tuned.profiles.profile
import ConfigParser
import os.path
from tuned.profiles.exceptions import InvalidProfileException
class Loader(object):
"""
Profiles loader.
"""
__slots__ = [ "_load_directories" ]
def __init__(self, load_directories = None):
if load_directories is None:
load_directories = [ "/var/lib/tuned", "/etc/tuned" ]
elif type(load_directories) is not list:
raise TypeError("load_directories parameter is not a list")
self._load_directories = load_directories
def _create_profile(self, config):
return tuned.profiles.profile.Profile(config)
@property
def load_directories(self):
return self._load_directories
def add_directory(self, new_dir):
self._load_directories.append(new_dir)
def load(self, profile_name):
file_name = self._find_config(profile_name)
if file_name is None:
raise InvalidProfileException("Profile '%s' not found." % profile_name)
config = self._load_config(file_name)
self._clean_config(config, file_name)
# merging is removed temporarily
return self._create_profile(config)
def _find_config(self, profile_name, skip_files=None):
for dir_name in reversed(self._load_directories):
config_file = os.path.join(dir_name, profile_name, "tuned.conf")
config_file = os.path.normpath(config_file)
if skip_files is not None and config_file in skip_files:
continue
if os.path.exists(config_file):
return config_file
def _load_config(self, file_name):
parser = ConfigParser.SafeConfigParser(allow_no_value=True)
try:
parser.read(file_name)
except ConfigParser.Error as e:
raise InvalidProfileException("Cannot load profile.", e)
data = {}
for section in parser.sections():
data[section] = {}
for option, value in parser.items(section):
data[section][option] = value
return data
def _clean_config(self, config, file_name):
for unit_name in config:
# nothing special for global options
if unit_name == "main":
continue
# no plugin type specified, assume it matches the unit name
if not "type" in config[unit_name]:
config[unit_name]["type"] = unit_name
# special case: script names have to be expanded
if config[unit_name]["type"] == "script" and "script" in config[unit_name]:
dir_name = os.path.dirname(file_name)
script_path = os.path.join(dir_name, config[unit_name]["script"])
config[unit_name]["script"] = os.path.normpath(script_path)
import tuned.profiles.unit
class Profile(object):
"""
Representation of a tuning profile.
"""
__slots__ = ["_config", "_options", "_units"]
def __init__(self, config):
self._config = config
self._init_options(config)
self._init_units(config)
def _init_options(self, config):
self._options = {}
if "main" in config:
self._options = config["main"].copy()
def _init_units(self, config):
self._units = []
for unit_name in config:
if unit_name != "main":
new_unit = self._create_unit(unit_name, config[unit_name])
self._units.append(new_unit)
def _create_unit(self, name, config_options):
options = config_options.copy()
plugin = options.pop("type")
return tuned.profiles.unit.Unit(name, plugin, options)
@property
def units(self):
"""
Units included in the profile.
"""
return self._units
@property
def options(self):
"""
Profile global options.
"""
return self._options
class Unit(object):
"""
Unit description.
"""
__slots__ = [ "_name", "_plugin", "_options" ]
def __init__(self, name, plugin, options):
self._name = name
self._plugin = plugin
self._options = options
@property
def name(self):
return self._name
@property
def plugin(self):
return self._plugin
@property
def options(self):
return self._options
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment