Commits on Source (6)
language: python
sudo: false
python: 2.7
env:
- TOX_ENV=py27-lint
......
......@@ -5,6 +5,26 @@ History
.. to_doc
---------------------
19.5.1 (2019-03-03)
---------------------
* Fix a galaxy.tools.verify API interface change that broke Planemo compatibility.
---------------------
19.5.0 (2019-03-01)
---------------------
* Synchronize with the development tip of Galaxy.
* Fix Travis CI configuration (thanks to @nsoranzo).
---------------------
18.9.2 (2019-01-02)
---------------------
* Implement option for uploading local test data.
* Bring in the latest Galaxy changes, which provide Python 3.7 support.
---------------------
18.9.1 (2018-09-15)
---------------------
......
......@@ -117,7 +117,7 @@ release-test-artifacts: dist
$(IN_VENV) twine upload -r test dist/*
open https://testpypi.python.org/pypi/$(PROJECT_NAME) || xdg-open https://testpypi.python.org/pypi/$(PROJECT_NAME)
release-aritfacts: release-test-artifacts
release-artifacts: release-test-artifacts
@while [ -z "$$CONTINUE" ]; do \
read -r -p "Have you executed release-test and reviewed results? [y/N]: " CONTINUE; \
done ; \
......@@ -131,7 +131,7 @@ commit-version:
new-version:
$(IN_VENV) python $(BUILD_SCRIPTS_DIR)/new_version.py $(SOURCE_DIR) $(VERSION) 2
release-local: commit-version release-aritfacts new-version
release-local: commit-version release-artifacts new-version
push-release:
git push $(UPSTREAM) master
......
......@@ -12,7 +12,7 @@
Overview
--------
A small subset of the Galaxy_ project for reuse outside the core. This subset has minimal dependencies and should be Python 3 compatible.
A small subset of the Galaxy_ project for reuse outside the core. This subset has minimal dependencies and is Python 3 compatible.
* Free software: Academic Free License version 3.0
* Documentation: https://galaxy-lib.readthedocs.org.
......
galaxy-lib (18.9.1-1) UNRELEASED; urgency=medium
galaxy-lib (19.5.1-1) UNRELEASED; urgency=medium
* Initial Debianization (Closes: #890061).
-- Michael R. Crusoe <michael.crusoe@gmail.com> Sat, 08 Dec 2018 21:20:38 +0100
-- Michael R. Crusoe <michael.crusoe@gmail.com> Sun, 10 Mar 2019 07:37:49 +0100
......@@ -5,7 +5,7 @@ Uploaders: Michael R. Crusoe <michael.crusoe@gmail.com>,
Section: python
Testsuite: autopkgtest-pkg-python
Priority: optional
Build-Depends: debhelper (>= 11~),
Build-Depends: debhelper (>= 12~),
dh-python,
python-all,
python-docutils,
......@@ -17,7 +17,7 @@ Build-Depends: debhelper (>= 11~),
python3-setuptools,
python3-six (>= 1.9.0),
python3-yaml
Standards-Version: 4.2.1
Standards-Version: 4.3.0
Vcs-Browser: https://salsa.debian.org/med-team/galaxy-lib
Vcs-Git: https://salsa.debian.org/med-team/galaxy-lib.git
Homepage: https://github.com/galaxyproject/galaxy-lib
......
# -*- coding: utf-8 -*-
__version__ = '18.9.1'
__version__ = '19.5.1'
PROJECT_NAME = "galaxy-lib"
PROJECT_OWNER = PROJECT_USERAME = "galaxyproject"
......
......@@ -22,7 +22,7 @@ from six import string_types, with_metaclass
from six.moves import shlex_quote
from galaxy.exceptions import ContainerCLIError
from galaxy.util.submodules import submodules
from galaxy.util.submodules import import_submodules
DEFAULT_CONTAINER_TYPE = 'docker'
......@@ -380,7 +380,7 @@ def parse_containers_config(containers_config_file):
def _get_interface_modules():
interfaces = []
modules = submodules(sys.modules[__name__])
modules = import_submodules(sys.modules[__name__])
for module in modules:
module_names = [getattr(module, _) for _ in dir(module)]
classes = [_ for _ in module_names if inspect.isclass(_) and
......
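Only the import changed in this hunk: galaxy.util.submodules now exposes the helper as import_submodules, which _get_interface_modules uses to discover container interface classes. As a rough, minimal sketch (not Galaxy's actual implementation) of what such a helper does, direct-submodule discovery can be built on pkgutil and importlib:

```python
# Minimal sketch, assuming import_submodules simply imports and returns every
# direct submodule of the given package; the real galaxy.util.submodules
# helper may differ in detail.
import importlib
import pkgutil


def import_submodules(package):
    """Import each direct submodule of ``package`` and return them as a list."""
    modules = []
    for _, name, _ in pkgutil.iter_modules(package.__path__):
        modules.append(importlib.import_module(package.__name__ + '.' + name))
    return modules
```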
......@@ -276,12 +276,12 @@ class DockerAPIClient(object):
if tries > 1:
log.info('%s() succeeded on attempt %s', qualname, tries)
return r
except ConnectionError as exc:
except ConnectionError:
reinit = True
except docker.errors.APIError as exc:
if not DockerAPIClient._should_retry_request(exc.response.status_code):
raise
except ReadTimeout as exc:
except ReadTimeout:
reinit = True
retry_time = 0
finally:
......
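The unused exc bindings are dropped because connection failures and read timeouts are handled purely by flagging a client re-initialization and retrying, while Docker API errors are only retried for selected status codes. A condensed, hypothetical sketch of that retry pattern; the retryable status codes, attempt count, and sleep interval below are assumptions, not Galaxy's exact values:

```python
# Hypothetical retry wrapper mirroring the pattern above: transport errors are
# always retried, Docker API errors only when the status code looks transient.
import logging
import time

import docker.errors
from requests.exceptions import ConnectionError, ReadTimeout

log = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})  # assumption


def call_with_retries(func, max_tries=5, retry_time=5):
    for tries in range(1, max_tries + 1):
        try:
            r = func()
            if tries > 1:
                log.info('%s() succeeded on attempt %s',
                         getattr(func, '__name__', 'call'), tries)
            return r
        except (ConnectionError, ReadTimeout):
            pass  # transport problem: pause, then retry
        except docker.errors.APIError as exc:
            if exc.response.status_code not in RETRYABLE_STATUS_CODES:
                raise
        time.sleep(retry_time)
    raise Exception('Docker API call failed after %d attempts' % max_tries)
```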
......@@ -189,7 +189,7 @@ class DockerService(Container):
def from_cli(cls, interface, s, task_list):
service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
for task_dict in task_list:
if task_dict['NAME'].strip().startswith('\_'):
if task_dict['NAME'].strip().startswith(r'\_'):
continue # historical task
service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
return service
......
......@@ -280,7 +280,7 @@ class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):
def service_tasks(self, service):
for task_dict in self.service_ps(service.id):
if task_dict['NAME'].strip().startswith('\_'):
if task_dict['NAME'].strip().startswith(r'\_'):
continue # historical task
yield DockerTask.from_cli(self, task_dict, service=service)
......
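Both hunks above turn the `\_` prefix into a raw string; the matched text is unchanged, but newer Pythons warn about the invalid `\_` escape in a plain literal. A tiny illustration with fabricated `docker service ps` rows:

```python
# Historical (replaced) tasks in ``docker service ps`` output are prefixed with
# a literal backslash-underscore; a raw string keeps that backslash as written.
rows = [
    {'NAME': 'web.1'},        # current task
    {'NAME': r' \_ web.1'},   # historical task, skipped by the loops above
]
current = [row for row in rows if not row['NAME'].strip().startswith(r'\_')]
assert [row['NAME'] for row in current] == ['web.1']
```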
......@@ -103,6 +103,10 @@ class ToolMissingException(MessageException):
status_code = 400
err_code = error_codes.USER_TOOL_MISSING_PROBLEM
def __init__(self, err_msg=None, type="info", tool_id=None, **extra_error_info):
super(ToolMissingException, self).__init__(err_msg, type, **extra_error_info)
self.tool_id = tool_id
class RequestParameterInvalidException(MessageException):
status_code = 400
......@@ -145,6 +149,11 @@ class AdminRequiredException(MessageException):
err_code = error_codes.ADMIN_REQUIRED
class UserActivationRequiredException(MessageException):
status_code = 403
err_code = error_codes.USER_ACTIVATION_REQUIRED
class ObjectNotFound(MessageException):
""" Accessed object was not found """
status_code = 404
......@@ -180,6 +189,15 @@ class InternalServerError(MessageException):
err_code = error_codes.INTERNAL_SERVER_ERROR
class ToolExecutionError(MessageException):
status_code = 500
err_code = error_codes.TOOL_EXECUTION_ERROR
def __init__(self, err_msg, type="error", job=None):
super(ToolExecutionError, self).__init__(err_msg, type)
self.job = job
class NotImplemented(MessageException):
status_code = 501
err_code = error_codes.NOT_IMPLEMENTED
......@@ -216,3 +234,9 @@ class ContainerRunError(Exception):
super(ContainerRunError, self).__init__(msg, **kwargs)
self.image = image
self.command = command
class HandlerAssignmentError(Exception):
def __init__(self, msg=None, obj=None, **kwargs):
super(HandlerAssignmentError, self).__init__(msg, **kwargs)
self.obj = obj
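The new exception classes attach structured context (a tool_id, a job, or the handler-assignment target) to the message. A hedged usage sketch follows; only the class name and its keyword argument come from the diff, while the toolbox stub and calling code are illustrative:

```python
# Illustrative only: raise the new exception with its tool_id keyword so
# calling layers can report which tool was missing.
from galaxy.exceptions import ToolMissingException


def require_tool(toolbox, tool_id):
    tool = toolbox.get(tool_id)
    if tool is None:
        raise ToolMissingException(
            "Tool '%s' is not installed on this Galaxy instance." % tool_id,
            tool_id=tool_id,
        )
    return tool


try:
    require_tool({}, 'cat1')  # an empty dict stands in for a toolbox
except ToolMissingException as exc:
    print(exc.tool_id)  # -> 'cat1'
```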
......@@ -27,7 +27,7 @@
{
"name": "USER_OBJECT_ATTRIBUTE_MISSING",
"code": 400005,
"message": "Attempted to create object without required attribute."
"message": "Attempted to create or update object without required attribute."
},
{
"name": "USER_SLUG_DUPLICATE",
......@@ -109,6 +109,11 @@
"code": 403006,
"message": "Action requires admin account."
},
{
"name": "USER_ACTIVATION_REQUIRED",
"code": 403007,
"message": "Action requires account activation."
},
{
"name": "USER_OBJECT_NOT_FOUND",
"code": 404001,
......@@ -139,6 +144,11 @@
"code": 500003,
"message": "Error in a configuration file."
},
{
"name": "TOOL_EXECUTION_ERROR",
"code": 500004,
"message": "Tool execution failed due to an internal server error."
},
{
"name": "NOT_IMPLEMENTED",
"code": 501001,
......
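These JSON entries pair with the new exception classes above (USER_ACTIVATION_REQUIRED with UserActivationRequiredException, TOOL_EXECUTION_ERROR with ToolExecutionError). A minimal, self-contained sketch, not Galaxy's actual error_codes loader, of turning such a list into a name-to-code lookup:

```python
# Minimal sketch (not the real galaxy.exceptions.error_codes loader) showing
# how a JSON list like the one above maps error names to numeric codes.
import json

error_codes_json = """[
  {"name": "USER_ACTIVATION_REQUIRED", "code": 403007,
   "message": "Action requires account activation."},
  {"name": "TOOL_EXECUTION_ERROR", "code": 500004,
   "message": "Tool execution failed due to an internal server error."}
]"""

ERROR_CODES = {entry["name"]: entry["code"] for entry in json.loads(error_codes_json)}
assert ERROR_CODES["TOOL_EXECUTION_ERROR"] == 500004
```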
......@@ -48,7 +48,7 @@ class EnvPlugin(InstrumentPlugin):
# We use '\n\}\n' as regex termination because shell
# functions can be nested.
# We use the non-greedy '.+?' because of re.DOTALL .
m = re.match('([^=]+)=(\(\) \{.+?\n\})\n', env_string, re.DOTALL)
m = re.match(r'([^=]+)=(\(\) \{.+?\n\})\n', env_string, re.DOTALL)
if m is None:
m = re.match('([^=]+)=(.*)\n', env_string)
if m is None:
......
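Only the raw-string prefix changes here; the pattern itself is identical. A small demonstration on a fabricated env dump, showing why re.DOTALL and the non-greedy `.+?` are needed to capture a multi-line exported shell function:

```python
# Fabricated ``env`` output containing an exported shell function followed by a
# plain variable; the function body spans lines and ends with a bare ``}``.
import re

env_string = "greet=() {  echo hello\n}\nPATH=/usr/bin\n"

m = re.match(r'([^=]+)=(\(\) \{.+?\n\})\n', env_string, re.DOTALL)
assert m.group(1) == 'greet'
assert m.group(2) == '() {  echo hello\n}'
```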
......@@ -38,21 +38,81 @@ NO_BLOBSERVICE_ERROR_MESSAGE = ("ObjectStore configured, but no azure.storage.bl
log = logging.getLogger(__name__)
def parse_config_xml(config_xml):
try:
auth_xml = config_xml.findall('auth')[0]
account_name = auth_xml.get('account_name')
account_key = auth_xml.get('account_key')
container_xml = config_xml.find('container')
container_name = container_xml.get('name')
max_chunk_size = int(container_xml.get('max_chunk_size', 250)) # currently unused
c_xml = config_xml.findall('cache')[0]
cache_size = float(c_xml.get('size', -1))
staging_path = c_xml.get('path', None)
tag, attrs = 'extra_dir', ('type', 'path')
extra_dirs = config_xml.findall(tag)
if not extra_dirs:
msg = 'No {tag} element in XML tree'.format(tag=tag)
log.error(msg)
raise Exception(msg)
extra_dirs = [dict(((k, e.get(k)) for k in attrs)) for e in extra_dirs]
return {
'auth': {
'account_name': account_name,
'account_key': account_key,
},
'container': {
'name': container_name,
'max_chunk_size': max_chunk_size,
},
'cache': {
'size': cache_size,
'path': staging_path,
},
'extra_dirs': extra_dirs,
}
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise
class AzureBlobObjectStore(ObjectStore):
"""
Object store that stores objects as blobs in an Azure Blob Container. A local
cache exists that is used as an intermediate location for files between
Galaxy and Azure.
"""
store_type = 'azure_blob'
def __init__(self, config, config_dict):
super(AzureBlobObjectStore, self).__init__(config, config_dict)
self.transfer_progress = 0
auth_dict = config_dict["auth"]
container_dict = config_dict["container"]
cache_dict = config_dict["cache"]
self.account_name = auth_dict.get('account_name')
self.account_key = auth_dict.get('account_key')
def __init__(self, config, config_xml):
self.container_name = container_dict.get('name')
self.max_chunk_size = container_dict.get('max_chunk_size', 250) # currently unused
self.cache_size = cache_dict.get('size', -1)
self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path
self._initialize()
def _initialize(self):
if BlockBlobService is None:
raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)
super(AzureBlobObjectStore, self).__init__(config)
self.staging_path = self.config.file_path
self.transfer_progress = 0
self._parse_config_xml(config_xml)
self._configure_connection()
# Clean cache only if value is set in galaxy.ini
......@@ -65,33 +125,32 @@ class AzureBlobObjectStore(ObjectStore):
self.cache_monitor_thread.start()
log.info("Cache cleaner manager started")
def to_dict(self):
as_dict = super(AzureBlobObjectStore, self).to_dict()
as_dict.update({
'auth': {
'account_name': self.account_name,
'account_key': self.account_key,
},
'container': {
'name': self.container_name,
'max_chunk_size': self.max_chunk_size,
},
'cache': {
'size': self.cache_size,
'path': self.staging_path,
}
})
return as_dict
###################
# Private Methods #
###################
# config_xml is an ElementTree object.
def _parse_config_xml(self, config_xml):
try:
auth_xml = config_xml.find('auth')
self.account_name = auth_xml.get('account_name')
self.account_key = auth_xml.get('account_key')
container_xml = config_xml.find('container')
self.container_name = container_xml.get('name')
self.max_chunk_size = int(container_xml.get('max_chunk_size', 250)) # currently unused
cache_xml = config_xml.find('cache')
self.cache_size = float(cache_xml.get('size', -1))
self.staging_path = cache_xml.get('path', self.config.object_store_cache_path)
for d_xml in config_xml.findall('extra_dir'):
self.extra_dirs[d_xml.get('type')] = d_xml.get('path')
log.debug("Object cache dir: %s", self.staging_path)
log.debug(" job work dir: %s", self.extra_dirs['job_work'])
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise
@classmethod
def parse_xml(clazz, config_xml):
return parse_config_xml(config_xml)
def _configure_connection(self):
log.debug("Configuring Connection")
......
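The net effect of this refactor is that XML parsing lives in a module-level parse_config_xml() returning a plain dict, the parse_xml() classmethod delegates to it, and __init__ consumes the dict. A hedged example of that round trip; the XML layout follows the attributes read above, the values are placeholders, and the import path is assumed to be galaxy.objectstore.azure_blob:

```python
# Hedged example: parse an azure_blob XML fragment into the config dict the
# constructor now consumes. Values are placeholders; the import path is an
# assumption based on the file being modified.
from xml.etree import ElementTree

from galaxy.objectstore.azure_blob import parse_config_xml

config_xml = ElementTree.fromstring("""
<object_store type="azure_blob">
    <auth account_name="example" account_key="secret"/>
    <container name="galaxy-data" max_chunk_size="250"/>
    <cache path="database/object_store_cache" size="100"/>
    <extra_dir type="job_work" path="database/job_working_directory_azure"/>
    <extra_dir type="temp" path="database/tmp_azure"/>
</object_store>
""")

config_dict = parse_config_xml(config_xml)
assert config_dict['container']['name'] == 'galaxy-data'
assert config_dict['cache']['size'] == 100.0
assert config_dict['extra_dirs'][0] == {'type': 'job_work',
                                        'path': 'database/job_working_directory_azure'}
```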
......@@ -15,14 +15,14 @@ from galaxy.exceptions import ObjectInvalid, ObjectNotFound
from galaxy.util import (
directory_hash_id,
safe_relpath,
string_as_bool,
umask_fix_perms,
)
from galaxy.util.sleeper import Sleeper
from .s3 import CloudConfigMixin, parse_config_xml
from ..objectstore import convert_bytes, ObjectStore
try:
from cloudbridge.cloud.factory import CloudProviderFactory, ProviderList
from cloudbridge.cloud.interfaces.exceptions import InvalidNameException
except ImportError:
CloudProviderFactory = None
ProviderList = None
......@@ -35,19 +35,45 @@ NO_CLOUDBRIDGE_ERROR_MESSAGE = (
)
class Cloud(ObjectStore):
class Cloud(ObjectStore, CloudConfigMixin):
"""
Object store that stores objects as items in cloud storage. A local
cache exists that is used as an intermediate location for files between
Galaxy and the cloud storage.
"""
def __init__(self, config, config_xml):
super(Cloud, self).__init__(config)
store_type = 'cloud'
def __init__(self, config, config_dict):
super(Cloud, self).__init__(config, config_dict)
self.transfer_progress = 0
auth_dict = config_dict['auth']
bucket_dict = config_dict['bucket']
connection_dict = config_dict.get('connection', {})
cache_dict = config_dict['cache']
self.access_key = auth_dict.get('access_key')
self.secret_key = auth_dict.get('secret_key')
self.bucket = bucket_dict.get('name')
self.use_rr = bucket_dict.get('use_reduced_redundancy', False)
self.max_chunk_size = bucket_dict.get('max_chunk_size', 250)
self.host = connection_dict.get('host', None)
self.port = connection_dict.get('port', 6000)
self.multipart = connection_dict.get('multipart', True)
self.is_secure = connection_dict.get('is_secure', True)
self.conn_path = connection_dict.get('conn_path', '/')
self.cache_size = cache_dict.get('size', -1)
self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path
self._initialize()
def _initialize(self):
if CloudProviderFactory is None:
raise Exception(NO_CLOUDBRIDGE_ERROR_MESSAGE)
self.staging_path = self.config.file_path
self.transfer_progress = 0
self._parse_config_xml(config_xml)
self._configure_connection()
self.bucket = self._get_bucket(self.bucket)
# Clean cache only if value is set in galaxy.ini
......@@ -72,38 +98,14 @@ class Cloud(ObjectStore):
'aws_secret_key': self.secret_key}
self.conn = CloudProviderFactory().create_provider(ProviderList.AWS, aws_config)
def _parse_config_xml(self, config_xml):
try:
a_xml = config_xml.findall('auth')[0]
self.access_key = a_xml.get('access_key')
self.secret_key = a_xml.get('secret_key')
b_xml = config_xml.findall('bucket')[0]
self.bucket = b_xml.get('name')
self.max_chunk_size = int(b_xml.get('max_chunk_size', 250))
cn_xml = config_xml.findall('connection')
if not cn_xml:
cn_xml = {}
else:
cn_xml = cn_xml[0]
self.host = cn_xml.get('host', None)
self.port = int(cn_xml.get('port', 6000))
self.multipart = string_as_bool(cn_xml.get('multipart', 'True'))
self.is_secure = string_as_bool(cn_xml.get('is_secure', 'True'))
self.conn_path = cn_xml.get('conn_path', '/')
c_xml = config_xml.findall('cache')[0]
self.cache_size = float(c_xml.get('size', -1))
self.staging_path = c_xml.get('path', self.config.object_store_cache_path)
for d_xml in config_xml.findall('extra_dir'):
self.extra_dirs[d_xml.get('type')] = d_xml.get('path')
log.debug("Object cache dir: %s", self.staging_path)
log.debug(" job work dir: %s", self.extra_dirs['job_work'])
@classmethod
def parse_xml(clazz, config_xml):
return parse_config_xml(config_xml)
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise
def to_dict(self):
as_dict = super(Cloud, self).to_dict()
as_dict.update(self._config_to_dict())
return as_dict
def __cache_monitor(self):
time.sleep(2) # Wait for things to load before starting the monitor
......@@ -171,16 +173,19 @@ class Cloud(ObjectStore):
def _get_bucket(self, bucket_name):
try:
bucket = self.conn.object_store.get(bucket_name)
bucket = self.conn.storage.buckets.get(bucket_name)
if bucket is None:
log.debug("Bucket not found, creating a bucket with handle '%s'", bucket_name)
bucket = self.conn.object_store.create(bucket_name)
bucket = self.conn.storage.buckets.create(bucket_name)
log.debug("Using cloud ObjectStore with bucket '%s'", bucket.name)
return bucket
except InvalidNameException:
log.exception("Invalid bucket name -- unable to continue")
raise
except Exception:
# These two generic exceptions will be replaced by specific exceptions
# once proper exceptions are exposed by CloudBridge.
log.exception("Could not get bucket '%s'.", bucket_name)
log.exception("Could not get bucket '{}'".format(bucket_name))
raise Exception
def _fix_permissions(self, rel_path):
......@@ -239,7 +244,7 @@ class Cloud(ObjectStore):
def _get_size_in_cloud(self, rel_path):
try:
obj = self.bucket.get(rel_path)
obj = self.bucket.objects.get(rel_path)
if obj:
return obj.size
except Exception:
......@@ -252,13 +257,13 @@ class Cloud(ObjectStore):
# A hackish way of testing if the rel_path is a folder vs a file
is_dir = rel_path[-1] == '/'
if is_dir:
keyresult = self.bucket.list(prefix=rel_path)
keyresult = self.bucket.objects.list(prefix=rel_path)
if len(keyresult) > 0:
exists = True
else:
exists = False
else:
exists = True if self.bucket.get(rel_path) is not None else False
exists = True if self.bucket.objects.get(rel_path) is not None else False
except Exception:
log.exception("Trouble checking existence of S3 key '%s'", rel_path)
return False
......@@ -288,7 +293,7 @@ class Cloud(ObjectStore):
def _download(self, rel_path):
try:
log.debug("Pulling key '%s' into cache to %s", rel_path, self._get_cache_path(rel_path))
key = self.bucket.get(rel_path)
key = self.bucket.objects.get(rel_path)
# Test if cache is large enough to hold the new file
if self.cache_size > 0 and key.size > self.cache_size:
log.critical("File %s is larger (%s) than the cache size (%s). Cannot download.",
......@@ -322,27 +327,27 @@ class Cloud(ObjectStore):
try:
source_file = source_file if source_file else self._get_cache_path(rel_path)
if os.path.exists(source_file):
if os.path.getsize(source_file) == 0 and (self.bucket.get(rel_path) is not None):
if os.path.getsize(source_file) == 0 and (self.bucket.objects.get(rel_path) is not None):
log.debug("Wanted to push file '%s' to S3 key '%s' but its size is 0; skipping.", source_file,
rel_path)
return True
if from_string:
if not self.bucket.get(rel_path):
created_obj = self.bucket.create_object(rel_path)
if not self.bucket.objects.get(rel_path):
created_obj = self.bucket.objects.create(rel_path)
created_obj.upload(source_file)
else:
self.bucket.get(rel_path).upload(source_file)
self.bucket.objects.get(rel_path).upload(source_file)
log.debug("Pushed data from string '%s' to key '%s'", from_string, rel_path)
else:
start_time = datetime.now()
log.debug("Pushing cache file '%s' of size %s bytes to key '%s'", source_file,
os.path.getsize(source_file), rel_path)
self.transfer_progress = 0 # Reset transfer progress counter
if not self.bucket.get(rel_path):
created_obj = self.bucket.create_object(rel_path)
if not self.bucket.objects.get(rel_path):
created_obj = self.bucket.objects.create(rel_path)
created_obj.upload_from_file(source_file)
else:
self.bucket.get(rel_path).upload_from_file(source_file)
self.bucket.objects.get(rel_path).upload_from_file(source_file)
end_time = datetime.now()
log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)",
......@@ -468,7 +473,7 @@ class Cloud(ObjectStore):
# but requires iterating through each individual key in S3 and deleting it.
if entire_dir and extra_dir:
shutil.rmtree(self._get_cache_path(rel_path))
results = self.bucket.list(prefix=rel_path)
results = self.bucket.objects.list(prefix=rel_path)
for key in results:
log.debug("Deleting key %s", key.name)
key.delete()
......@@ -478,7 +483,7 @@ class Cloud(ObjectStore):
os.unlink(self._get_cache_path(rel_path))
# Delete from S3 as well
if self._key_exists(rel_path):
key = self.bucket.get(rel_path)
key = self.bucket.objects.get(rel_path)
log.debug("Deleting key %s", key.name)
key.delete()
return True
......@@ -566,7 +571,7 @@ class Cloud(ObjectStore):
if self.exists(obj, **kwargs):
rel_path = self._construct_path(obj, **kwargs)
try:
key = self.bucket.get(rel_path)
key = self.bucket.objects.get(rel_path)
return key.generate_url(expires_in=86400) # 24hrs
except Exception:
log.exception("Trouble generating URL for dataset '%s'", rel_path)
......
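Besides the config_dict refactor, this file tracks a CloudBridge API change: buckets are now reached through provider.storage.buckets and objects through bucket.objects. A condensed sketch of those calls with placeholder credentials and names; only the method names come from the diff:

```python
# Placeholder credentials and bucket/key names; method names follow the diff.
from cloudbridge.cloud.factory import CloudProviderFactory, ProviderList

aws_config = {'aws_access_key': 'PLACEHOLDER',
              'aws_secret_key': 'PLACEHOLDER'}
conn = CloudProviderFactory().create_provider(ProviderList.AWS, aws_config)

bucket = conn.storage.buckets.get('galaxy-data')
if bucket is None:
    bucket = conn.storage.buckets.create('galaxy-data')

key = bucket.objects.get('000/dataset_1.dat')
if key is None:
    key = bucket.objects.create('000/dataset_1.dat')
key.upload('hello world')

for obj in bucket.objects.list(prefix='000/'):
    print(obj.name, obj.size)
```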
......@@ -84,25 +84,34 @@ class PithosObjectStore(ObjectStore):
Object store that stores objects as items in a Pithos+ container.
Cache is ignored for the time being.
"""
store_type = 'pithos'
def __init__(self, config, config_xml):
if KamakiClient is None:
raise Exception(NO_KAMAKI_ERROR_MESSAGE)
super(PithosObjectStore, self).__init__(config)
def __init__(self, config, config_dict):
super(PithosObjectStore, self).__init__(config, config_dict)
self.staging_path = self.config.file_path
self.transfer_progress = 0
log.info('Parse config_xml for pithos object store')
self.config_dict = parse_config_xml(config_xml)
self.config_dict = config_dict
log.debug(self.config_dict)
self._initialize()
def _initialize(self):
if KamakiClient is None:
raise Exception(NO_KAMAKI_ERROR_MESSAGE)
log.info('Authenticate Synnefo account')
self._authenticate()
log.info('Initialize Pithos+ client')
self._init_pithos()
log.info('Define extra_dirs')
self.extra_dirs = dict(
(e['type'], e['path']) for e in self.config_dict['extra_dirs'])
@classmethod
def parse_xml(clazz, config_xml):
return parse_config_xml(config_xml)
def to_dict(self):
as_dict = super(PithosObjectStore, self).to_dict()
as_dict.update(self.config_dict)
return as_dict
def _authenticate(self):
auth = self.config_dict['auth']
......
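PithosObjectStore follows the same shape: parse_xml() produces the dict, __init__ stores it, and to_dict() merges it back onto the base-class dict so a store can be described (and later rebuilt) without re-reading the XML. A self-contained sketch of that serialization pattern using stand-in classes, not the actual Galaxy base class:

```python
# Stand-in classes only; they mimic the to_dict()/config_dict pattern above.
class BaseObjectStore(object):
    store_type = None

    def __init__(self, config, config_dict):
        self.config = config
        self.config_dict = config_dict

    def to_dict(self):
        return {'type': self.store_type}


class PithosLikeStore(BaseObjectStore):
    store_type = 'pithos'

    def to_dict(self):
        as_dict = super(PithosLikeStore, self).to_dict()
        as_dict.update(self.config_dict)
        return as_dict


store = PithosLikeStore(config=None, config_dict={'auth': {'url': '...'}})
assert store.to_dict() == {'type': 'pithos', 'auth': {'url': '...'}}
```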
......@@ -38,20 +38,154 @@ log = logging.getLogger(__name__)
logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy
class S3ObjectStore(ObjectStore):
def parse_config_xml(config_xml):
try:
a_xml = config_xml.findall('auth')[0]
access_key = a_xml.get('access_key')
secret_key = a_xml.get('secret_key')
b_xml = config_xml.findall('bucket')[0]
bucket_name = b_xml.get('name')
use_rr = string_as_bool(b_xml.get('use_reduced_redundancy', "False"))
max_chunk_size = int(b_xml.get('max_chunk_size', 250))
cn_xml = config_xml.findall('connection')
if not cn_xml:
cn_xml = {}
else:
cn_xml = cn_xml[0]
host = cn_xml.get('host', None)
port = int(cn_xml.get('port', 6000))
multipart = string_as_bool(cn_xml.get('multipart', 'True'))
is_secure = string_as_bool(cn_xml.get('is_secure', 'True'))
conn_path = cn_xml.get('conn_path', '/')
c_xml = config_xml.findall('cache')[0]
cache_size = float(c_xml.get('size', -1))
staging_path = c_xml.get('path', None)
tag, attrs = 'extra_dir', ('type', 'path')
extra_dirs = config_xml.findall(tag)
if not extra_dirs:
msg = 'No {tag} element in XML tree'.format(tag=tag)
log.error(msg)
raise Exception(msg)
extra_dirs = [dict(((k, e.get(k)) for k in attrs)) for e in extra_dirs]
return {
'auth': {
'access_key': access_key,
'secret_key': secret_key,
},
'bucket': {
'name': bucket_name,
'use_reduced_redundancy': use_rr,
'max_chunk_size': max_chunk_size,
},
'connection': {
'host': host,
'port': port,
'multipart': multipart,
'is_secure': is_secure,
'conn_path': conn_path,
},
'cache': {
'size': cache_size,
'path': staging_path,
},
'extra_dirs': extra_dirs,
}
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise
class CloudConfigMixin(object):
def _config_to_dict(self):
return {
'auth': {
'access_key': self.access_key,
'secret_key': self.secret_key,
},
'bucket': {
'name': self.bucket,
'use_reduced_redundancy': self.use_rr,
},
'connection': {
'host': self.host,
'port': self.port,
'multipart': self.multipart,
'is_secure': self.is_secure,
'conn_path': self.conn_path,
},
'cache': {
'size': self.cache_size,
'path': self.staging_path,
}
}
class S3ObjectStore(ObjectStore, CloudConfigMixin):
"""
Object store that stores objects as items in an AWS S3 bucket. A local
cache exists that is used as an intermediate location for files between
Galaxy and S3.
"""
store_type = 's3'
def __init__(self, config, config_xml):
if boto is None:
raise Exception(NO_BOTO_ERROR_MESSAGE)
def __init__(self, config, config_dict):
super(S3ObjectStore, self).__init__(config)
self.staging_path = self.config.file_path
self.transfer_progress = 0
self._parse_config_xml(config_xml)
auth_dict = config_dict['auth']
bucket_dict = config_dict['bucket']
connection_dict = config_dict.get('connection', {})
cache_dict = config_dict['cache']
self.access_key = auth_dict.get('access_key')
self.secret_key = auth_dict.get('secret_key')
self.bucket = bucket_dict.get('name')
self.use_rr = bucket_dict.get('use_reduced_redundancy', False)
self.max_chunk_size = bucket_dict.get('max_chunk_size', 250)
self.host = connection_dict.get('host', None)
self.port = connection_dict.get('port', 6000)
self.multipart = connection_dict.get('multipart', True)
self.is_secure = connection_dict.get('is_secure', True)
self.conn_path = connection_dict.get('conn_path', '/')
self.cache_size = cache_dict.get('size', -1)
self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path
extra_dirs = dict(
(e['type'], e['path']) for e in config_dict.get('extra_dirs', []))
self.extra_dirs.update(extra_dirs)
log.debug("Object cache dir: %s", self.staging_path)
log.debug(" job work dir: %s", self.extra_dirs['job_work'])
self._initialize()
def _initialize(self):
if boto is None:
raise Exception(NO_BOTO_ERROR_MESSAGE)
# for multipart upload
self.s3server = {'access_key': self.access_key,
'secret_key': self.secret_key,
'is_secure': self.is_secure,
'max_chunk_size': self.max_chunk_size,
'host': self.host,
'port': self.port,
'use_rr': self.use_rr,
'conn_path': self.conn_path}
self._configure_connection()
self.bucket = self._get_bucket(self.bucket)
# Clean cache only if value is set in galaxy.ini
......@@ -73,48 +207,14 @@ class S3ObjectStore(ObjectStore):
log.debug("Configuring S3 Connection")
self.conn = S3Connection(self.access_key, self.secret_key)
def _parse_config_xml(self, config_xml):
try:
a_xml = config_xml.findall('auth')[0]
self.access_key = a_xml.get('access_key')
self.secret_key = a_xml.get('secret_key')
b_xml = config_xml.findall('bucket')[0]
self.bucket = b_xml.get('name')
self.use_rr = string_as_bool(b_xml.get('use_reduced_redundancy', "False"))
self.max_chunk_size = int(b_xml.get('max_chunk_size', 250))
cn_xml = config_xml.findall('connection')
if not cn_xml:
cn_xml = {}
else:
cn_xml = cn_xml[0]
self.host = cn_xml.get('host', None)
self.port = int(cn_xml.get('port', 6000))
self.multipart = string_as_bool(cn_xml.get('multipart', 'True'))
self.is_secure = string_as_bool(cn_xml.get('is_secure', 'True'))
self.conn_path = cn_xml.get('conn_path', '/')
c_xml = config_xml.findall('cache')[0]
self.cache_size = float(c_xml.get('size', -1))
self.staging_path = c_xml.get('path', self.config.object_store_cache_path)
for d_xml in config_xml.findall('extra_dir'):
self.extra_dirs[d_xml.get('type')] = d_xml.get('path')
log.debug("Object cache dir: %s", self.staging_path)
log.debug(" job work dir: %s", self.extra_dirs['job_work'])
@classmethod
def parse_xml(clazz, config_xml):
return parse_config_xml(config_xml)
# for multipart upload
self.s3server = {'access_key': self.access_key,
'secret_key': self.secret_key,
'is_secure': self.is_secure,
'max_chunk_size': self.max_chunk_size,
'host': self.host,
'port': self.port,
'use_rr': self.use_rr,
'conn_path': self.conn_path}
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise
def to_dict(self):
as_dict = super(S3ObjectStore, self).to_dict()
as_dict.update(self._config_to_dict())
return as_dict
def __cache_monitor(self):
time.sleep(2) # Wait for things to load before starting the monitor
......@@ -623,6 +723,7 @@ class SwiftObjectStore(S3ObjectStore):
cache exists that is used as an intermediate location for files between
Galaxy and Swift.
"""
store_type = 'swift'
def _configure_connection(self):
log.debug("Configuring Swift Connection")
......
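For S3 the same split applies, with the added wrinkle that the <connection> element is optional and the boolean/integer coercion happens inside parse_config_xml(); CloudConfigMixin then supplies the reverse mapping for to_dict(). A hedged example of the XML-to-dict direction, with placeholder values and the import path taken from the `from .s3 import ... parse_config_xml` line above:

```python
# Hedged example of the S3 config layout the parser above expects; when the
# optional <connection> element is absent the defaults shown in the code apply.
from xml.etree import ElementTree

from galaxy.objectstore.s3 import parse_config_xml

config_xml = ElementTree.fromstring("""
<object_store type="s3">
    <auth access_key="PLACEHOLDER" secret_key="PLACEHOLDER"/>
    <bucket name="galaxy-bucket" use_reduced_redundancy="False"/>
    <cache path="database/object_store_cache" size="1000"/>
    <extra_dir type="job_work" path="database/job_working_directory_s3"/>
    <extra_dir type="temp" path="database/tmp_s3"/>
</object_store>
""")

config_dict = parse_config_xml(config_xml)
assert config_dict['bucket']['use_reduced_redundancy'] is False
assert config_dict['connection']['port'] == 6000  # default, no <connection> given
```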