Commits on Source (3)
@@ -5,6 +5,123 @@ History
.. to_doc
---------------------
18.5.13 (2018-05-23)
---------------------
* Small updates to test parsing to support Galaxy workflow testing.
---------------------
18.5.12 (2018-05-22)
---------------------
* Update test data processing to allow URIs in Galaxy workflow tests.
---------------------
18.5.11 (2018-05-16)
---------------------
* Parse CWL SoftwareRequirements to Galaxy requirements (required to fix various Planemo functionality
for CWL tools).
---------------------
18.5.10 (2018-05-10)
---------------------
* Docker logging API fix for Planemo.
---------------------
18.5.9 (2018-05-07)
---------------------
* Update CWL linting to target CWL 1.0.
---------------------
18.5.8 (2018-05-06)
---------------------
* Better error handling for Conda searching (thanks to @bgruening).
* Update against the latest Galaxy codebase.
* Add Galaxy tool linting to ensure versions are PEP 440 compliant (thanks to @davebx).
---------------------
18.5.7 (2018-03-12)
---------------------
* More tool testing client fixes, this time for ephemeris.
---------------------
18.5.6 (2018-03-12)
---------------------
* Bring in the latest Galaxy dev branch - includes code cleanup and many Python 3 fixes from
@nsoranzo as well as client code for executing tool tests against external Galaxy instances.
* Extend tool testing client from Galaxy's dev branch with even more data collection for compatibility
with Planemo.
---------------------
18.5.5 (2018-03-06)
---------------------
* Fix mulled to use shlex.quote to escape single quotes in test command
(thanks to @mbargull).
* Make markupsafe a dependency since it is imported unconditionally in galaxy.tools.toolbox
(thanks to @mbargull).
* Python 3 fix for assertion testing.
---------------------
18.5.4 (2018-03-01)
---------------------
* Make conda image for mulled builds configurable via an environment variable
(thanks to @mbargull).
---------------------
18.5.3 (2018-02-28)
---------------------
* Fix path module for import on Windows for Pulsar.
---------------------
18.5.2 (2018-02-28)
---------------------
* Various fixes for library usage mostly related to Conda (with help from @nsoranzo).
---------------------
18.5.1 (2018-02-26)
---------------------
* Redo last release - pushed to PyPI without actually including the desired fix.
---------------------
18.5.0 (2018-02-26)
---------------------
* Another Python 3 fix for Planemo.
* Fix galaxy-lib version - this has actually been tracking the 18.05 release of Galaxy for the last two releases.
---------------------
18.1.0 (2018-02-26)
---------------------
* More Python 3 fixes for Planemo thanks to @nsoranzo.
* Bring in the latest Galaxy development branch.
---------------------
17.9.12 (2018-02-22)
---------------------
* Python 3 fix for Planemo thanks to @nsoranzo.
* Fix bad merge of miniconda update for mulled work.
---------------------
17.9.11 (2018-02-22)
---------------------
* Update to the latest Galaxy dev just prior to the branch of 18.01.
* Python 3 fixes.
---------------------
17.9.10 (2017-11-23)
---------------------
Metadata-Version: 1.1
Name: galaxy-lib
-Version: 17.9.10
+Version: 18.5.13
Summary: Subset of Galaxy (http://galaxyproject.org/) core code base designed to be used as a library.
Home-page: https://github.com/galaxyproject/galaxy-lib
Author: Galaxy Project and Community
Author-email: jmchilton@gmail.com
License: AFL
Description-Content-Type: UNKNOWN
Description:
.. image:: https://readthedocs.org/projects/galaxy-lib/badge/?version=latest
:target: http://galaxy-lib.readthedocs.io/en/latest/?badge=latest
@@ -47,6 +48,123 @@ Description:
.. to_doc
---------------------
18.5.13 (2018-05-23)
---------------------
* Small updates to test parsing to support Galaxy workflow testing.
---------------------
18.5.12 (2018-05-22)
---------------------
* Update test data processing to allow URIs in Galaxy workflow tests.
---------------------
18.5.11 (2018-05-16)
---------------------
* Parse CWL SoftwareRequirements to Galaxy requirements (required to fix various Planemo functionality
for CWL tools).
---------------------
18.5.10 (2018-05-10)
---------------------
* Docker logging API fix for Planemo.
---------------------
18.5.9 (2018-05-07)
---------------------
* Update CWL linting to target CWL 1.0.
---------------------
18.5.8 (2018-05-06)
---------------------
* Better error handling for Conda searching (thanks to @bgruening).
* Update against the latest Galaxy codebase.
* Add Galaxy tool linting to ensure versions are PEP 440 compliant (thanks to @davebx).
---------------------
18.5.7 (2018-03-12)
---------------------
* More tool testing client fixes, this time for ephemeris.
---------------------
18.5.6 (2018-03-12)
---------------------
* Bring in the latest Galaxy dev branch - includes code cleanup and many Python 3 fixes from
@nsoranzo as well as client code for executing tool tests against external Galaxy instances.
* Extend tool testing client from Galaxy's dev branch with even more data collection for compatibility
with Planemo.
---------------------
18.5.5 (2018-03-06)
---------------------
* Fix mulled to use shlex.quote to escape single quotes in test command
(thanks to @mbargull).
* Make markupsafe a dependency since it is imported unconditionally in galaxy.tools.toolbox
(thanks to @mbargull).
* Python 3 fix for assertion testing.
---------------------
18.5.4 (2018-03-01)
---------------------
* Make conda image for mulled builds configurable via an environment variable
(thanks to @mbargull).
---------------------
18.5.3 (2018-02-28)
---------------------
* Fix path module for import on Windows for Pulsar.
---------------------
18.5.2 (2018-02-28)
---------------------
* Various fixes for library usage mostly related to Conda (with help from @nsoranzo).
---------------------
18.5.1 (2018-02-26)
---------------------
* Redo last release - pushed to PyPI without actually including the desired fix.
---------------------
18.5.0 (2018-02-26)
---------------------
* Another Python 3 fix for Planemo.
* Fix galaxy-lib version - this has actually been tracking the 18.05 release of Galaxy for the last two releases.
---------------------
18.1.0 (2018-02-26)
---------------------
* More Python 3 fixes for Planemo thanks to @nsoranzo.
* Bring in the latest Galaxy development branch.
---------------------
17.9.12 (2018-02-22)
---------------------
* Python 3 fix for Planemo thanks to @nsoranzo.
* Fix bad merge of miniconda update for mulled work.
---------------------
17.9.11 (2018-02-22)
---------------------
* Update to the latest Galaxy dev just prior to the branch of 18.01.
* Python 3 fixes.
---------------------
17.9.10 (2017-11-23)
---------------------
-galaxy-lib (17.9.10-1) unstable; urgency=low
+galaxy-lib (18.5.13-1) unstable; urgency=low
-  * Initial Debianization (Closes: #890061).
+  * New upstream version.
+  * Another upload (Closes: #890061).
+  * Added two names to debian/copyright
+  * Added myself to uploaders, mentioned team management
+ -- Steffen Moeller <moeller@debian.org>  Wed, 13 Jun 2018 23:00:11 +0200
+galaxy-lib (17.9.10-1) UNRELEASED; urgency=low
+  * Initial Debianization.
 -- Michael R. Crusoe <michael.crusoe@gmail.com>  Sat, 10 Feb 2018 16:37:00 +0000
Source: galaxy-lib
-Maintainer: Michael R. Crusoe <michael.crusoe@gmail.com>
+Maintainer: Debian Med Packaging Team <debian-med-packaging@lists.alioth.debian.org>
+Uploaders: Michael R. Crusoe <michael.crusoe@gmail.com>,
+           Steffen Moeller <moeller@debian.org>
Section: python
Priority: optional
Build-Depends: debhelper (>= 10),
@@ -14,7 +16,7 @@ Build-Depends: debhelper (>= 10),
python3-setuptools,
python3-six (>= 1.9.0),
python3-yaml
-Standards-Version: 4.1.3
+Standards-Version: 4.1.4
Vcs-Browser: https://salsa.debian.org/med-team/galaxy-lib
Vcs-Git: https://salsa.debian.org/med-team/galaxy-lib.git
Homepage: https://github.com/galaxyproject/galaxy-lib
@@ -11,6 +11,12 @@ Comment: There are no icons in upstream's source tarball, nor are there CC 3.0
Files: galaxy/util/filelock.py
Copyright: (c) 2009, Evan Fosmark
License: BSD-3-clause
Files: galaxy/util/inflection.py
Copyright: 2006, Bermi Ferrer Martinez
License: BSD-3-clause
License: BSD-3-clause
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -34,6 +40,11 @@ License: BSD-3-clause
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Files: galaxy/util/topsort.py
Copyright: 1999, originates in email from Tim Peters
license not explicitly specified, link to 1999 usenet post stale
License: AFL
Files: debian/*
Copyright: 2018 © Michael R. Crusoe <michael.crusoe@gmail.com>
License: AFL
# -*- coding: utf-8 -*-
-__version__ = '17.9.10'
+__version__ = '18.5.13'
PROJECT_NAME = "galaxy-lib"
PROJECT_OWNER = PROJECT_USERAME = "galaxyproject"
"""
Interfaces to containerization software
"""
from __future__ import absolute_import
import errno
import inspect
import logging
import shlex
import subprocess
import sys
import uuid
from abc import (
ABCMeta,
abstractmethod,
abstractproperty
)
from collections import namedtuple
import yaml
from six import string_types, with_metaclass
from six.moves import shlex_quote
from galaxy.exceptions import ContainerCLIError
from galaxy.util.submodules import submodules
DEFAULT_CONTAINER_TYPE = 'docker'
DEFAULT_CONF = {'_default_': {'type': DEFAULT_CONTAINER_TYPE}}
log = logging.getLogger(__name__)
class ContainerPort(namedtuple('ContainerPort', ('port', 'protocol', 'hostaddr', 'hostport'))):
"""Named tuple representing ports published by a container, with attributes:
:ivar port: Port number (inside the container)
:vartype port: int
:ivar protocol: Port protocol, either ``tcp`` or ``udp``
:vartype protocol: str
:ivar hostaddr: Address or hostname where the published port can be accessed
:vartype hostaddr: str
:ivar hostport: Published port number on which the container can be accessed
:vartype hostport: int
"""
class ContainerVolume(with_metaclass(ABCMeta, object)):
valid_modes = frozenset(["ro", "rw"])
def __init__(self, path, host_path=None, mode=None):
self.path = path
self.host_path = host_path
self.mode = mode
if mode and not self.mode_is_valid:
raise ValueError("Invalid container volume mode: %s" % mode)
@abstractmethod
def from_str(cls, as_str):
"""Classmethod to convert from this container type's string representation.
:param as_str: string representation of volume
:type as_str: str
"""
@abstractmethod
def __str__(self):
"""Return this container type's string representation of the volume.
"""
@abstractmethod
def to_native(self):
"""Return this container type's native representation of the volume.
"""
@property
def mode_is_valid(self):
return self.mode in self.valid_modes
class Container(with_metaclass(ABCMeta, object)):
def __init__(self, interface, id, name=None, **kwargs):
""":param interface: Container interface for the given container type
:type interface: :class:`ContainerInterface` subclass instance
:param id: Container identifier
:type id: str
:param name: Container name
:type name: str
"""
self._interface = interface
self._id = id
self._name = name
@property
def id(self):
"""The container's id"""
return self._id
@property
def name(self):
"""The container's name"""
return self._name
@abstractmethod
def from_id(cls, interface, id):
"""Classmethod to create an instance of Container from the container system's id for the given container type.
:param interface: Container interface for the given id
:type interface: :class:`ContainerInterface` subclass instance
:param id: Container identifier
:type id: str
:returns: Container object
:rtype: :class:`Container` subclass instance
"""
@abstractproperty
def ports(self):
"""Attribute for accessing details of ports published by the container.
:returns: Port details
:rtype: list of :class:`ContainerPort`s
"""
@abstractproperty
def address(self):
"""Attribute for accessing the address or hostname where published ports can be accessed.
:returns: Hostname or IP address
:rtype: str
"""
@abstractmethod
def is_ready(self):
"""Indicate whether or not the container is "ready" (up, available, running).
:returns: True if ready, else False
:rtype: bool
"""
class ContainerInterface(with_metaclass(ABCMeta, object)):
container_type = None
container_class = None
volume_class = None
conf_defaults = {
'name_prefix': 'galaxy_',
}
option_map = {}
publish_port_list_required = False
supports_volumes = True
def __init__(self, conf, key, containers_config_file):
self._key = key
self._containers_config_file = containers_config_file
mro = reversed(self.__class__.__mro__)
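# the first item of the reversed MRO is `object`; skip it, then merge
# conf_defaults from the base classes down to this class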
next(mro)
self._conf = ContainerInterfaceConfig()
for c in mro:
self._conf.update(c.conf_defaults)
self._conf.update(conf)
self.validate_config()
def _normalize_command(self, command):
if isinstance(command, string_types):
command = shlex.split(command)
return command
def _guess_kwopt_type(self, val):
opttype = 'string'
if isinstance(val, bool):
opttype = 'boolean'
elif isinstance(val, list):
opttype = 'list'
try:
if isinstance(val[0], tuple) and len(val[0]) == 3:
opttype = 'list_of_kovtrips'
except IndexError:
pass
elif isinstance(val, dict):
opttype = 'list_of_kvpairs'
return opttype
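# Illustrative inputs and the types the heuristic above guesses for them:
#   True                                -> 'boolean'
#   ['a', 'b']                          -> 'list'
#   [('node.labels.foo', '==', 'bar')]  -> 'list_of_kovtrips'
#   {'FOO': 'bar'}                      -> 'list_of_kvpairs'
#   'anything else'                     -> 'string'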
def _guess_kwopt_flag(self, opt):
return '--%s' % opt.replace('_', '-')
def _stringify_kwopts(self, kwopts):
opts = []
for opt, val in kwopts.items():
try:
optdef = self.option_map[opt]
except KeyError:
optdef = {
'flag': self._guess_kwopt_flag(opt),
'type': self._guess_kwopt_type(val),
}
log.warning("option '%s' not in %s.option_map, guessing flag '%s' type '%s'",
opt, self.__class__.__name__, optdef['flag'], optdef['type'])
opts.append(getattr(self, '_stringify_kwopt_' + optdef['type'])(optdef['flag'], val))
return ' '.join(opts)
def _stringify_kwopt_boolean(self, flag, val):
"""
"""
return '{flag}={value}'.format(flag=flag, value=str(val).lower())
def _stringify_kwopt_string(self, flag, val):
"""
"""
return '{flag} {value}'.format(flag=flag, value=shlex_quote(str(val)))
def _stringify_kwopt_list(self, flag, val):
"""
"""
if isinstance(val, string_types):
return self._stringify_kwopt_string(flag, val)
return ' '.join(['{flag} {value}'.format(flag=flag, value=shlex_quote(str(v))) for v in val])
def _stringify_kwopt_list_of_kvpairs(self, flag, val):
"""
"""
l = []
if isinstance(val, list):
# ['foo=bar', 'baz=quux']
l = val
else:
# {'foo': 'bar', 'baz': 'quux'}
for k, v in dict(val).items():
l.append('{k}={v}'.format(k=k, v=v))
return self._stringify_kwopt_list(flag, l)
def _stringify_kwopt_list_of_kovtrips(self, flag, val):
"""
"""
if isinstance(val, string_types):
return self._stringify_kwopt_string(flag, val)
l = []
for k, o, v in val:
l.append('{k}{o}{v}'.format(k=k, o=o, v=v))
return self._stringify_kwopt_list(flag, l)
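# Putting the pieces together (hypothetical kwopts, assuming an empty
# option_map so flags and types are guessed, with a warning logged):
#   self._stringify_kwopts({'publish_all_ports': True, 'env': {'FOO': 'bar'}})
# renders, in dict iteration order, as:
#   '--publish-all-ports=true --env FOO=bar'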
def _run_command(self, command, verbose=False):
if verbose:
log.debug('running command: [%s]', command)
command_list = self._normalize_command(command)
p = subprocess.Popen(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
stdout, stderr = p.communicate()
if p.returncode == 0:
return stdout.strip()
else:
msg = "Command '{}' returned non-zero exit status {}".format(command, p.returncode)
log.error(msg + ': ' + stderr.strip())
raise ContainerCLIError(
msg,
stdout=stdout.strip(),
stderr=stderr.strip(),
returncode=p.returncode,
command=command,
subprocess_command=command_list)
@property
def key(self):
return self._key
@property
def containers_config_file(self):
return self._containers_config_file
def get_container(self, container_id):
return self.container_class.from_id(self, container_id)
def set_kwopts_name(self, kwopts):
if self._name_prefix is not None:
name = '{prefix}{name}'.format(
prefix=self._name_prefix,
name=kwopts.get('name', uuid.uuid4().hex)
)
kwopts['name'] = name
def validate_config(self):
"""
"""
self._name_prefix = self._conf.name_prefix
@abstractmethod
def run_in_container(self, command, image=None, **kwopts):
"""
"""
class ContainerInterfaceConfig(dict):
def __setattr__(self, name, value):
self[name] = value
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
def get(self, name, default=None):
try:
return self[name]
except KeyError:
return default
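# Illustrative behavior of the attribute-style config wrapper above:
#   conf = ContainerInterfaceConfig(name_prefix='galaxy_')
#   conf.name_prefix      -> 'galaxy_'
#   conf.get('image')     -> None (no KeyError)
#   conf.image            -> raises AttributeError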
def build_container_interfaces(containers_config_file, containers_conf=None):
"""Build :class:`ContainerInterface`s. Pass ``containers_conf`` to avoid rereading the config file.
:param containers_config_file: Filename of containers_conf.yml
:type containers_config_file: str
:param containers_conf: Optional containers conf (as read from containers_conf.yml), will be used in place
of containers_config_file
:type containers_conf: dict
:returns: Instantiated container interfaces with keys corresponding to ``containers`` keys
:rtype: dict of :class:`ContainerInterface` subclass instances
"""
if not containers_conf:
containers_conf = parse_containers_config(containers_config_file)
interface_classes = _get_interface_modules()
interfaces = {}
for k, conf in containers_conf.items():
container_type = conf.get('type', DEFAULT_CONTAINER_TYPE)
assert container_type in interface_classes, "unknown container interface type: %s" % container_type
interfaces[k] = interface_classes[container_type](conf, k, containers_config_file)
return interfaces
def parse_containers_config(containers_config_file):
"""Parse a ``containers_conf.yml`` and return the contents of its ``containers`` dictionary.
:param containers_config_file: Filename of containers_conf.yml
:type containers_config_file: str
:returns: Contents of the dictionary under the ``containers`` key
:rtype: dict
"""
conf = DEFAULT_CONF.copy()
try:
with open(containers_config_file) as fh:
c = yaml.safe_load(fh)
conf.update(c.get('containers', {}))
except (OSError, IOError) as exc:
if exc.errno == errno.ENOENT:
log.debug("config file '%s' does not exist, running with default config", containers_config_file)
else:
raise
return conf
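# A minimal containers_conf.yml consumed by the parser above might look like
# this (hypothetical interface keys; `type` selects the interface class):
#
#   containers:
#     local_docker:
#       type: docker
#     cluster:
#       type: docker_swarm
#       name_prefix: galaxy_swarm_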
def _get_interface_modules():
interfaces = []
modules = submodules(sys.modules[__name__])
for module in modules:
module_names = [getattr(module, _) for _ in dir(module)]
classes = [_ for _ in module_names if inspect.isclass(_) and
not _ == ContainerInterface and issubclass(_, ContainerInterface)]
interfaces.extend(classes)
return dict((x.container_type, x) for x in interfaces)
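# End-to-end usage sketch (assumes the hypothetical config sketched above and
# that the matching interface submodule is importable):
#   interfaces = build_container_interfaces('containers_conf.yml')
#   container = interfaces['local_docker'].run_in_container('echo hello', image='busybox')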
"""
Decorators for docker
"""
import json
from functools import wraps
def docker_columns(f):
@wraps(f)
def parse_docker_column_output(*args, **kwargs):
"""Many docker commands do not provide an option to format the output
or to output in a readily machine-parseable format (e.g. json). In order
to deal with such output and hopefully stay compatible with future
column order changes, key returned rows based on column headers.
An assumption is made that a single space in the header row does not
separate columns - column names can have spaces in them, and columns
are separated by at least 2 spaces. This seems to be true as of Docker
1.13.1.
"""
output = f(*args, **kwargs)
parsed = []
output = output.splitlines()
if not output:
    return parsed
header = output[0]
colstarts = [0]
spacect = 0
for i, c in enumerate(header):
    if c == ' ':
        spacect += 1
    else:
        # 2+ consecutive spaces delimit columns; resetting the count on any
        # non-space keeps single spaces inside a heading from splitting it
        if spacect > 1:
            colstarts.append(i)
        spacect = 0
colstarts.append(None)
colheadings = []
for i in range(0, len(colstarts) - 1):
colheadings.append(header[colstarts[i]:colstarts[i + 1]].strip())
for line in output[1:]:
row = {}
for i, key in enumerate(colheadings):
row[key] = line[colstarts[i]:colstarts[i + 1]].strip()
parsed.append(row)
return parsed
return parse_docker_column_output
def docker_json(f):
@wraps(f)
def json_loads(*args, **kwargs):
return json.loads(f(*args, **kwargs))
return json_loads
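# Minimal usage sketch for the decorators above (illustrative data; a real
# caller would return output captured from the docker CLI):
if __name__ == '__main__':
    @docker_columns
    def fake_docker_ps():
        return ('CONTAINER ID  IMAGE    STATUS\n'
                'd9f48ee4a9ab  busybox  Up 2 minutes')

    for row in fake_docker_ps():
        print(row['CONTAINER ID'], row['IMAGE'], row['STATUS'])
    # prints: d9f48ee4a9ab busybox Up 2 minutes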
"""
Docker Swarm mode interface
"""
from __future__ import absolute_import
import logging
import os.path
import subprocess
from functools import partial
try:
import docker
except ImportError:
from galaxy.util.bunch import Bunch
docker = Bunch(types=Bunch(
ContainerSpec=None,
RestartPolicy=None,
Resources=None,
Placement=None,
))
from galaxy.containers.docker import (
DockerAPIInterface,
DockerCLIInterface,
DockerInterface
)
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
CPUS_CONSTRAINT,
DockerNode,
DockerService,
DockerTask,
IMAGE_CONSTRAINT
)
from galaxy.exceptions import ContainerRunError
from galaxy.util.json import safe_dumps_formatted
log = logging.getLogger(__name__)
SWARM_MANAGER_PATH = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
os.path.pardir,
os.path.pardir,
os.path.pardir,
'scripts',
'docker_swarm_manager.py'))
class DockerSwarmInterface(DockerInterface):
container_class = DockerService
conf_defaults = {
'ignore_volumes': False,
'node_prefix': None,
'service_create_image_constraint': False,
'service_create_cpus_constraint': False,
'resolve_image_digest': False,
'managed': True,
'manager_autostart': True,
}
publish_port_list_required = True
supports_volumes = False
def validate_config(self):
super(DockerSwarmInterface, self).validate_config()
self._node_prefix = self._conf.node_prefix
def run_in_container(self, command, image=None, **kwopts):
"""Run a service like a detached container
"""
kwopts['replicas'] = 1
kwopts['restart_condition'] = 'none'
if kwopts.get('publish_all_ports', False):
# not supported for services
# TODO: inspect image (or query registry if possible) for port list
if kwopts.get('publish_port_random', False) or kwopts.get('ports', False):
# assume this covers for publish_all_ports
del kwopts['publish_all_ports']
else:
raise ContainerRunError(
"Publishing all ports is not supported in Docker swarm"
" mode, use `publish_port_random` or `ports`",
image=image,
command=command
)
if not kwopts.get('detach', True):
raise ContainerRunError(
"Running attached containers is not supported in Docker swarm mode",
image=image,
command=command
)
elif kwopts.get('detach', None):
del kwopts['detach']
if kwopts.get('volumes', None):
if self._conf.ignore_volumes:
log.warning(
"'volumes' kwopt is set and not supported in Docker swarm "
"mode, volumes will not be passed (set 'ignore_volumes: "
"False' in containers config to fail instead): %s" % kwopts['volumes']
)
else:
raise ContainerRunError(
"'volumes' kwopt is set and not supported in Docker swarm "
"mode (set 'ignore_volumes: True' in containers config to "
"warn instead): %s" % kwopts['volumes'],
image=image,
command=command
)
# ensure the volumes key is removed from kwopts
kwopts.pop('volumes', None)
service = self.service_create(command, image=image, **kwopts)
self._run_swarm_manager()
return service
#
# helpers
#
def _run_swarm_manager(self):
if self._conf.managed and self._conf.manager_autostart:
try:
# sys.executable would be preferable to using $PATH, but sys.executable is probably uwsgi
subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file',
self.containers_config_file, '--swarm', self.key])
except subprocess.CalledProcessError as exc:
log.error('Failed to launch swarm manager: %s', str(exc))
def _get_image(self, image):
"""Get the image string, either from the argument, or from the
configured interface default if ``image`` is ``None``. Optionally
resolve the image to its digest if ``resolve_image_digest`` is set in
the interface configuration.
If the image has not been pulled, the repo digest cannot be determined
and the image name will be returned.
:type image: str or None
:param image: image id or name
:returns: image name or image repo digest
"""
if not image:
image = self._conf.image
assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service"
if self._conf.resolve_image_digest:
image = self.image_repodigest(image)
return image
def _objects_by_attribute(self, generator, attribute_name):
rval = {}
for obj in generator:
attr = getattr(obj, attribute_name)
if attr not in rval:
rval[attr] = []
rval[attr].append(obj)
return rval
#
# docker object generators
#
def services(self, id=None, name=None):
for service_dict in self.service_ls(id=id, name=name):
service_id = service_dict['ID']
service = DockerService(self, service_id, inspect=service_dict)
if service.name.startswith(self._name_prefix):
yield service
def service(self, id=None, name=None):
try:
return next(self.services(id=id, name=name))
except StopIteration:
return None
def services_in_state(self, desired, current, tasks='any'):
for service in self.services():
if service.in_state(desired, current, tasks=tasks):
yield service
def service_tasks(self, service):
for task_dict in self.service_ps(service.id):
yield DockerTask.from_api(self, task_dict, service=service)
def nodes(self, id=None, name=None):
for node_dict in self.node_ls(id=id, name=name):
node_id = node_dict['ID']
node = DockerNode(self, node_id, inspect=node_dict)
if self._node_prefix and not node.name.startswith(self._node_prefix):
continue
yield node
def node(self, id=None, name=None):
try:
return next(self.nodes(id=id, name=name))
except StopIteration:
return None
def nodes_in_state(self, status, availability):
for node in self.nodes():
if node.in_state(status, availability):
yield node
def node_tasks(self, node):
for task_dict in self.node_ps(node.id):
yield DockerTask.from_api(self, task_dict, node=node)
#
# higher level queries
#
def services_waiting(self):
return self.services_in_state('Running', 'Pending')
def services_waiting_by_constraints(self):
return self._objects_by_attribute(self.services_waiting(), 'constraints')
def services_completed(self):
return self.services_in_state('Shutdown', 'Complete', tasks='all')
def services_terminal(self):
return [s for s in self.services() if s.terminal]
def nodes_active(self):
return self.nodes_in_state('Ready', 'Active')
def nodes_active_by_constraints(self):
return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')
#
# operations
#
def services_clean(self):
cleaned_service_ids = []
completed_services = list(self.services_completed()) # returns a generator, should probably fix this
if completed_services:
cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services]))
terminal_services = list(self.services_terminal())
for service in terminal_services:
log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state)
if terminal_services:
cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services]))
return [s for s in completed_services + terminal_services if s.id in cleaned_service_ids]
class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):
container_type = 'docker_swarm_cli'
option_map = {
# `service create` options
'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'},
'replicas': {'flag': '--replicas', 'type': 'string'},
'restart_condition': {'flag': '--restart-condition', 'type': 'string'},
'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
'name': {'flag': '--name', 'type': 'string'},
'publish_port_random': {'flag': '--publish', 'type': 'string'},
'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'},
'mem_limit': {'flag': '--limit-memory', 'type': 'string'},
'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'},
'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'},
# `service update` options
'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'},
'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'},
'availability': {'flag': '--availability', 'type': 'string'},
}
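# Illustrative: with the option map above, kwopts such as
#   {'constraint': [('node.labels.image', '==', 'busybox')], 'replicas': 1}
# stringify to:
#   --constraint node.labels.image==busybox --replicas 1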
#
# docker object generators
#
def services(self, id=None, name=None):
for service_dict in self.service_ls(id=id, name=name):
service_id = service_dict['ID']
service_name = service_dict['NAME']
if not service_name.startswith(self._name_prefix):
continue
task_list = self.service_ps(service_id)
yield DockerService.from_cli(self, service_dict, task_list)
def service_tasks(self, service):
for task_dict in self.service_ps(service.id):
if task_dict['NAME'].strip().startswith('\\_'):
continue # historical task
yield DockerTask.from_cli(self, task_dict, service=service)
def nodes(self, id=None, name=None):
for node_dict in self.node_ls(id=id, name=name):
node_id = node_dict['ID'].strip(' *')
node_name = node_dict['HOSTNAME']
if self._node_prefix and not node_name.startswith(self._node_prefix):
continue
task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id))
yield DockerNode.from_cli(self, node_dict, task_list)
#
# docker subcommands
#
def service_create(self, command, image=None, **kwopts):
if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts:
kwopts['constraint'] = []
image = self._get_image(image)
if self._conf.service_create_image_constraint:
kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image))
if self._conf.service_create_cpus_constraint:
cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus))
if self._conf.cpus:
kwopts['cpu_limit'] = self._conf.cpus
kwopts['cpu_reservation'] = self._conf.cpus
if self._conf.memory:
kwopts['mem_limit'] = self._conf.memory
kwopts['mem_reservation'] = self._conf.memory
self.set_kwopts_name(kwopts)
args = '{kwopts} {image} {command}'.format(
kwopts=self._stringify_kwopts(kwopts),
image=image if image else '',
command=command if command else '',
).strip()
service_id = self._run_docker(subcommand='service create', args=args, verbose=True)
return DockerService.from_id(self, service_id)
@docker_json
def service_inspect(self, service_id):
return self._run_docker(subcommand='service inspect', args=service_id)[0]
@docker_columns
def service_ls(self, id=None, name=None):
return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))
@docker_columns
def service_ps(self, service_id):
return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id))
def service_rm(self, service_ids):
service_ids = ' '.join(service_ids)
return self._run_docker(subcommand='service rm', args=service_ids).splitlines()
@docker_json
def node_inspect(self, node_id):
return self._run_docker(subcommand='node inspect', args=node_id)[0]
@docker_columns
def node_ls(self, id=None, name=None):
return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))
@docker_columns
def node_ps(self, node_id):
return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id))
def node_update(self, node_id, **kwopts):
return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format(
kwopts=self._stringify_kwopts(kwopts),
node_id=node_id
))
@docker_json
def task_inspect(self, task_id):
return self._run_docker(subcommand="inspect", args=task_id)
class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface):
container_type = 'docker_swarm'
placement_option_map = {
'constraint': {'param': 'constraints'},
}
service_mode_option_map = {
'service_mode': {'param': 0, 'default': 'replicated'},
'replicas': {'default': 1},
}
endpoint_spec_option_map = {
'ports': {},
}
resources_option_map = {
'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)},
'memory': {'params': ('mem_limit', 'mem_reservation')},
}
container_spec_option_map = {
'image': {'param': 0},
'command': {},
'environment': {'param': 'env'},
'labels': {},
}
restart_policy_option_map = {
'restart_condition': {'param': 'condition', 'default': 'none'},
'restart_delay': {'param': 'delay'},
'restart_max_attempts': {'param': 'max_attempts'},
}
task_template_option_map = {
'_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True},
'_resources': {'spec_class': docker.types.Resources},
'_restart_policy': {'spec_class': docker.types.RestartPolicy},
'_placement': {'spec_class': docker.types.Placement},
}
node_spec_option_map = {
'availability': {'param': 'Availability'},
'name': {'param': 'Name'},
'role': {'param': 'Role'},
'labels': {'param': 'Labels'},
}
@staticmethod
def create_random_port_spec(port):
return {
'Protocol': 'tcp',
'PublishedPort': None,
'TargetPort': port,
}
#
# docker subcommands
#
def service_create(self, command, image=None, **kwopts):
# TODO: some of this should probably move to run_in_container when the CLI interface is removed
log.debug("Creating docker service with image '%s' for command: %s", image, command)
# insert run kwopts from config
for opt in self.conf_run_kwopts:
if self._conf[opt]:
kwopts[opt] = self._conf[opt]
# image is part of the container spec
kwopts['image'] = self._get_image(image)
# service constraints
kwopts['constraint'] = kwopts.get('constraint', [])
if self._conf.service_create_image_constraint:
kwopts['constraint'].append(IMAGE_CONSTRAINT + '==' + kwopts['image'])
if self._conf.service_create_cpus_constraint:
cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
kwopts['constraint'].append((CPUS_CONSTRAINT + '==' + cpus))
# ports
if 'publish_port_random' in kwopts:
kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))]
# create specs
service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts)
endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts)
task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts)
self.set_kwopts_name(kwopts)
log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template))
log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec))
log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode))
log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts))
success_test = partial(self._first, self.service_ls, name=kwopts['name'])
# this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
service = self._client.create_service(
task_template,
mode=service_mode,
endpoint_spec=endpoint_spec,
success_test=success_test,
max_tries=5,
**kwopts)
service_id = service.get('ID')
log.debug('Created service: %s (%s)', kwopts['name'], service_id)
return DockerService.from_id(self, service_id)
def service_inspect(self, service_id):
return self._client.inspect_service(service_id)
def service_ls(self, id=None, name=None):
return self._client.services(filters=self._filter_by_id_or_name(id, name))
# roughly `docker service ps`
def service_ps(self, service_id):
return self.task_ls(filters={'service': service_id})
def service_rm(self, service_ids):
r = []
for service_id in service_ids:
self._client.remove_service(service_id)
r.append(service_id)
return r
def node_inspect(self, node_id):
return self._client.inspect_node(node_id)
def node_ls(self, id=None, name=None):
return self._client.nodes(filters=self._filter_by_id_or_name(id, name))
# roughly `docker node ps`
def node_ps(self, node_id):
return self.task_ls(filters={'node': node_id})
def node_update(self, node_id, **kwopts):
node = DockerNode.from_id(self, node_id)
spec = node.inspect['Spec']
if 'label_add' in kwopts:
kwopts['labels'] = spec.get('Labels', {})
kwopts['labels'].update(kwopts.pop('label_add'))
spec.update(self._create_docker_api_spec('node_spec', dict, kwopts))
return self._client.update_node(node.id, node.version, node_spec=spec)
def task_inspect(self, task_id):
return self._client.inspect_task(task_id)
def task_ls(self, filters=None):
return self._client.tasks(filters=filters)
@@ -16,7 +16,6 @@ import os
from galaxy import util
from galaxy.util import plugin_config
from ..metrics import formatting
log = logging.getLogger(__name__)
@@ -8,7 +8,6 @@ import sys
import tempfile
from galaxy import util
from ..collectl import stats
if sys.version_info > (3,):
"""The module describes the ``cgroup`` job metrics plugin."""
import logging
from galaxy.util import asbool, nice_size
from ..instrumenters import InstrumentPlugin
from ...metrics import formatting
log = logging.getLogger(__name__)
TITLES = {
"memory.memsw.max_usage_in_bytes": "Max memory usage (MEM+SWP)",
"memory.max_usage_in_bytes": "Max memory usage (MEM)",
"memory.limit_in_bytes": "Memory limit on cgroup (MEM)",
"memory.memsw.limit_in_bytes": "Memory limit on cgroup (MEM+SWP)",
"memory.soft_limit_in_bytes": "Memory softlimit on cgroup",
"memory.failcnt": "Failed to allocate memory count",
"memory.oom_control": "OOM Control enabled",
"under_oom": "Was OOM Killer active?",
"cpuacct.usage": "CPU Time"
}
CONVERSION = {
"memory.memsw.max_usage_in_bytes": nice_size,
"memory.max_usage_in_bytes": nice_size,
"memory.limit_in_bytes": nice_size,
"memory.memsw.limit_in_bytes": nice_size,
"memory.soft_limit_in_bytes": nice_size,
"under_oom": lambda x: "Yes" if x == "1" else "No",
"cpuacct.usage": lambda x: formatting.seconds_to_str(int(x) / 10**9) # convert nanoseconds
}
class CgroupPluginFormatter(formatting.JobMetricFormatter):
def format(self, key, value):
title = TITLES.get(key, key)
if key in CONVERSION:
return title, CONVERSION[key](value)
elif key.endswith("_bytes"):
try:
return title, nice_size(value)
except ValueError:
pass
return title, value
class CgroupPlugin(InstrumentPlugin):
""" Plugin that collects memory and cpu utilization from within a cgroup.
"""
plugin_type = "cgroup"
formatter = CgroupPluginFormatter()
def __init__(self, **kwargs):
self.verbose = asbool(kwargs.get("verbose", False))
def post_execute_instrument(self, job_directory):
commands = []
commands.append(self.__record_cgroup_cpu_usage(job_directory))
commands.append(self.__record_cgroup_memory_usage(job_directory))
return commands
def job_properties(self, job_id, job_directory):
metrics = self.__read_metrics(self.__cgroup_metrics_file(job_directory))
return metrics
def __record_cgroup_cpu_usage(self, job_directory):
return """if [ `command -v cgget` ] && [ -e /proc/$$/cgroup ]; then cat /proc/$$/cgroup | awk -F':' '$2=="cpuacct,cpu"{print $2":"$3}' | xargs -I{} cgget -g {} > %(metrics)s ; else echo "" > %(metrics)s; fi""" % {"metrics": self.__cgroup_metrics_file(job_directory)}
def __record_cgroup_memory_usage(self, job_directory):
return """if [ `command -v cgget` ] && [ -e /proc/$$/cgroup ]; then cat /proc/$$/cgroup | awk -F':' '$2=="memory"{print $2":"$3}' | xargs -I{} cgget -g {} >> %(metrics)s ; else echo "" > %(metrics)s; fi""" % {"metrics": self.__cgroup_metrics_file(job_directory)}
def __cgroup_metrics_file(self, job_directory):
return self._instrument_file_path(job_directory, "_metrics")
def __read_metrics(self, path):
metrics = {}
with open(path, "r") as infile:
for line in infile:
line = line.strip()
try:
key, value = line.split(": ")
if key in TITLES or self.verbose:
metrics[key] = value
except ValueError:
if line.startswith("under_oom"):
metrics["under_oom"] = line.split(" ")[1]
return metrics
__all__ = ('CgroupPlugin', )
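# Illustrative round trip, assuming cgget wrote lines like these to the
# metrics file:
#   memory.max_usage_in_bytes: 44175360
#   cpuacct.usage: 6793063479
#   memory.oom_control: oom_kill_disable 0
#   under_oom 0
# __read_metrics() would then return (with verbose disabled):
#   {'memory.max_usage_in_bytes': '44175360', 'cpuacct.usage': '6793063479',
#    'memory.oom_control': 'oom_kill_disable 0', 'under_oom': '0'}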
@@ -4,7 +4,6 @@ import os
import shutil
from galaxy import util
from ..collectl import (
cli,
processes,
@@ -8,6 +8,7 @@ from ...metrics import formatting
log = logging.getLogger(__name__)
GALAXY_SLOTS_KEY = "galaxy_slots"
GALAXY_MEMORY_MB_KEY = "galaxy_memory_mb"
START_EPOCH_KEY = "start_epoch"
END_EPOCH_KEY = "end_epoch"
RUNTIME_SECONDS_KEY = "runtime_seconds"
@@ -19,6 +20,8 @@ class CorePluginFormatter(formatting.JobMetricFormatter):
value = int(value)
if key == GALAXY_SLOTS_KEY:
return ("Cores Allocated", "%d" % value)
elif key == GALAXY_MEMORY_MB_KEY:
return ("Memory Allocated (MB)", "%d" % value)
elif key == RUNTIME_SECONDS_KEY:
return ("Job Runtime (Wall Clock)", formatting.seconds_to_str(value))
else:
@@ -40,6 +43,7 @@ class CorePlugin(InstrumentPlugin):
def pre_execute_instrument(self, job_directory):
commands = []
commands.append(self.__record_galaxy_slots_command(job_directory))
commands.append(self.__record_galaxy_memory_mb_command(job_directory))
commands.append(self.__record_seconds_since_epoch_to_file(job_directory, "start"))
return commands
@@ -50,9 +54,11 @@ class CorePlugin(InstrumentPlugin):
def job_properties(self, job_id, job_directory):
galaxy_slots_file = self.__galaxy_slots_file(job_directory)
galaxy_memory_mb_file = self.__galaxy_memory_mb_file(job_directory)
properties = {}
properties[GALAXY_SLOTS_KEY] = self.__read_integer(galaxy_slots_file)
properties[GALAXY_MEMORY_MB_KEY] = self.__read_integer(galaxy_memory_mb_file)
start = self.__read_seconds_since_epoch(job_directory, "start")
end = self.__read_seconds_since_epoch(job_directory, "end")
if start is not None and end is not None:
@@ -65,6 +71,10 @@ class CorePlugin(InstrumentPlugin):
galaxy_slots_file = self.__galaxy_slots_file(job_directory)
return '''echo "$GALAXY_SLOTS" > '%s' ''' % galaxy_slots_file
def __record_galaxy_memory_mb_command(self, job_directory):
galaxy_memory_mb_file = self.__galaxy_memory_mb_file(job_directory)
return '''echo "$GALAXY_MEMORY_MB" > '%s' ''' % galaxy_memory_mb_file
def __record_seconds_since_epoch_to_file(self, job_directory, name):
path = self._instrument_file_path(job_directory, "epoch_%s" % name)
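# '%s' below is a date(1) format string (seconds since the epoch), so the
# command is assembled by concatenation to avoid clashing with %-formatting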
return 'date +"%s" > ' + path
@@ -76,6 +86,9 @@ class CorePlugin(InstrumentPlugin):
def __galaxy_slots_file(self, job_directory):
return self._instrument_file_path(job_directory, "galaxy_slots")
def __galaxy_memory_mb_file(self, job_directory):
return self._instrument_file_path(job_directory, "galaxy_memory_mb")
def __read_integer(self, path):
value = None
try:
@@ -3,7 +3,6 @@ import logging
import re
from galaxy import util
from ..instrumenters import InstrumentPlugin
from ...metrics import formatting
@@ -3,7 +3,6 @@ import re
import sys
from galaxy import util
from ..instrumenters import InstrumentPlugin
from ...metrics import formatting
@@ -10,7 +10,7 @@ import os
import random
import shutil
import threading
import time
from xml.etree import ElementTree
try:
@@ -22,11 +22,13 @@ from galaxy.exceptions import ObjectInvalid, ObjectNotFound
from galaxy.util import (
directory_hash_id,
force_symlink,
-safe_makedirs,
-safe_relpath,
umask_fix_perms,
)
from galaxy.util.odict import odict
+from galaxy.util.path import (
+    safe_makedirs,
+    safe_relpath,
+)
from galaxy.util.sleeper import Sleeper
NO_SESSION_ERROR_MESSAGE = "Attempted to 'create' object store entity in configuration with no database session present."
@@ -357,7 +359,7 @@ class DiskObjectStore(ObjectStore):
def empty(self, obj, **kwargs):
"""Override `ObjectStore`'s stub by checking file size on disk."""
-return os.path.getsize(self.get_filename(obj, **kwargs)) == 0
+return self.size(obj, **kwargs) == 0
def size(self, obj, **kwargs):
"""Override `ObjectStore`'s stub by return file size on disk.
@@ -366,7 +368,14 @@ class DiskObjectStore(ObjectStore):
"""
if self.exists(obj, **kwargs):
try:
-return os.path.getsize(self.get_filename(obj, **kwargs))
+filepath = self.get_filename(obj, **kwargs)
+for _ in range(0, 2):
+    size = os.path.getsize(filepath)
+    if size != 0:
+        break
+    # May be legitimately 0, or there may be an issue with the FS / kernel, so we try again
+    time.sleep(0.01)
+return size
except OSError:
return 0
else:
@@ -424,7 +433,9 @@ class DiskObjectStore(ObjectStore):
if preserve_symlinks and os.path.islink(file_name):
force_symlink(os.readlink(file_name), self.get_filename(obj, **kwargs))
else:
-shutil.copy(file_name, self.get_filename(obj, **kwargs))
+path = self.get_filename(obj, **kwargs)
+shutil.copy(file_name, path)
+umask_fix_perms(path, self.config.umask, 0o666)
except IOError as ex:
log.critical('Error copying %s to %s: %s' % (file_name, self._get_filename(obj, **kwargs), ex))
raise ex
@@ -647,14 +658,15 @@ class DistributedObjectStore(NestedObjectStore):
return default
def __get_store_id_for(self, obj, **kwargs):
-if obj.object_store_id is not None and obj.object_store_id in self.backends:
-    return obj.object_store_id
+if obj.object_store_id is not None:
+    if obj.object_store_id in self.backends:
+        return obj.object_store_id
+    else:
+        log.warning('The backend object store ID (%s) for %s object with ID %s is invalid'
+                    % (obj.object_store_id, obj.__class__.__name__, obj.id))
# if this instance has been switched from a non-distributed to a
# distributed object store, or if the object's store id is invalid,
# try to locate the object
-log.warning('The backend object store ID (%s) for %s object with ID %s is invalid'
-            % (obj.object_store_id, obj.__class__.__name__, obj.id))
for id, store in self.backends.items():
if store.exists(obj, **kwargs):
log.warning('%s object with ID %s found in backend object store with ID %s'
@@ -722,6 +734,9 @@ def build_object_store_from_config(config, fsmon=False, config_xml=None):
elif store == 's3':
from .s3 import S3ObjectStore
return S3ObjectStore(config=config, config_xml=config_xml)
elif store == 'cloud':
from .cloud import Cloud
return Cloud(config=config, config_xml=config_xml)
elif store == 'swift':
from .s3 import SwiftObjectStore
return SwiftObjectStore(config=config, config_xml=config_xml)