Skip to content
Commits on Source (2)
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# Coveralls
.coveralls.yml
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# PyCharm
.idea
# Vim
.viminfo
# Less history
.lesshst
.dbshell
.emacs*
.ipython
.mongo*
#*.properties
......@@ -4,6 +4,7 @@ python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
services:
- mongodb
- elasticsearch
......@@ -16,6 +17,7 @@ branches:
# - "sudo apt-get update -qq"
# - "sudo apt-get install -qq libldap2-dev libsasl2-dev"
install:
- "pip install flake8"
- "pip install -r requirements.txt"
- "pip install coverage"
- "pip install python-coveralls"
......@@ -26,7 +28,9 @@ before_script:
- sleep 10
#script: nosetests --with-coverage --cover-package=biomaj -a '!network'
#script: nosetests --with-coverage --cover-package=biomaj
script: nosetests
script:
- python setup.py test
- flake8 --ignore E501,E123 biomaj
#after_success:
# - coveralls
3.1.6:
Fix #100 Catch error and log error if biomaj fails to connect to InfluxDB
Add history to update/remove operations
Add log in case of file deletion error during bank removal
check lock file exists when removing it
Update protobuf to work with biomaj.download 3.0.18
3.1.5:
Fix #97 Wrong offline dir checks
3.1.4:
Fix #88 Unset 'last_update_session' when found in pending sessions using --remove-pending
Add formats in bank info request
Add checks for some production fields before display
Add irods download support
3.1.3:
Remove post-install step for automatic upgrades, not supported by wheel package
3.1.2:
Fix #86 remove special character from README.md
Feature #85 SchemaVersion automatically add new property
3.1.1:
Fix #80 Check process exists with `--from-task` and `--process`
Manage old banks with no status
3.1.0:
## Needs database upgrade
If using biomaj-watcher, must use version >= 3.1.0
Feature #67,#66,#61 switch to micro service architecture. Still works in local monolithic install
Fix some configuration parameter loading when not defined in config
Fix HTTP parsing parameters loading
Fix download_or_copy to copy files in last production release if available instead of downloading files again
Manage user migration for micro services
Feature #74 add influxdb statistics
Feature #65 add a release info file at the root of the bank which can be used by other services to know the latest release available
Feature #25 experimental support of rsync protocol
Add rate limiting for download with micro services
Limit email size to 2Mb, log file may be truncated
3.0.20:
Fix #55: Added support for https and directhttps
Add possibility to define files to download from a local file with remote.list parameter
Fix visibility modification (bug deleted the bank properties field)
Fix #65 Add release file in bank dir after update
Add md5 or sha256 checksum checks if files are downloaded and available
3.0.19:
Fix missing README.md in package
Fix #53 avoid duplicates in pending databases
3.0.18:
Add migration method to update schema when needed
Manage HTTP month format to support text format (Jan, Feb, ...) and int format (01, 02, ...)
New optional bank property http.parse.file.date.format to extract date in HTTP protocol following python date regexp format (http://www.tutorialspoint.com/python/time_strptime.htm)
Example: %d-%b-%Y %H:%M
3.0.17:
Fix #47: save_as error with directhttp protocol
Fix #45: error with pending releases when release has dots in value
typo/pylint fixes
3.0.16:
Do not use config values, trust database values #39
Fix #42: Add optional release.separator to name the bank directory bankname_release (underscore as default)
3.0.15:
Fix #37: remote local files history from db and put it in cache.dir
Feature #38: add optional keep.old.sessions parameter to keep all sessions in database, even for removed releases
Feature #28: add optional release.format parameter to specify the date format of a release
3.0.14:
Fix in method set_owner
Force release to be a str
......@@ -29,15 +78,18 @@
Remove logs on some operations
Add --status-ko option to list bank in error state
Fix #36 manage workflows over by error or unfinished
3.0.13:
Fix #27: Thread lock issue during download
New optional attribute in bank properties: timeout.download
HTTP protocol fix (deepcopy error)
3.0.12:
Fix index deletion on bank removal
Fix lock errors on dir creation for multi-threads,
pre-create directroy structure in offline directory
Fix #26: save error when too many files in bank
3.0.11:
Fix in session management with pre and rm processes
Fix #23: Check workflow step name passed to
......@@ -45,6 +97,7 @@
Fix #24: deprecated delete_by_query method in elasticsearch
Add some controls on base directories
3.0.10:
Change dir to process.dir to find processes in subdirs
If all files found in offline dir, continue workflow with no download
......@@ -70,27 +123,31 @@
the fix prevents using Ctrl-C during download
Workflow fix:
if subtask of workflow fails, fail main task
osallou authored 14 hours ago
3.0.8:
do not test index if elasticsearch is not up
minor fixes
add http proxy support
pylint fixes
retry uncompress once in case of failure (#13)
3.0.7:
Reindent code, pep8 fixes
Various fixes on var names and OrderedDict suport for Python < 2.7
Merge config files to be able to reference global.properties variables in bank
property file in format %(xx)s
Use ConfigParser instead of SafeConfigParser that will be deprecated
3.0.6:
Add option --remove-pending to remove all pending sessions and directories
Add process env variables logdir and logfile
Fix Unicode issue with old versions of PyCurl.
3.0.5:
Fix removal workflow during an update workflow, removedrelease was current
release.
Fix shebang of biomaj-cli, and python 2/3 compat issue
3.0.4:
Update code to make it Python 3 compatible
Use ldap3 library (pure Python and p2,3 compatible) instead of python-ldap
......
BioMAJ3
=====
This project is a complete rewrite of BioMAJ (http://biomaj.genouest.org).
This project is a complete rewrite of BioMAJ and the documentation is available here : http://biomaj.genouest.org.
BioMAJ (BIOlogie Mise A Jour) is a workflow engine dedicated to data
synchronization and processing. The Software automates the update cycle and the
supervision of the locally mirrored databank repository.
Common usages are to download remote databanks (Genbank for example) and apply
some transformations (blast indexing, emboss indexing,...). Any script can be
some transformations (blast indexing, emboss indexing, etc.). Any script can be
applied on downloaded data. When all treatments are successfully applied, bank
is put in "production" on a dedicated release directory.
With cron tasks, update tasks can be executed at regular interval, data are
......@@ -16,6 +16,8 @@ downloaded again only if a change is detected.
More documentation is available in wiki page.
BioMAJ is python 2 and 3 compatible.
Getting started
===============
......@@ -30,15 +32,24 @@ Edit global.properties file to match your settings. Minimal conf are database co
Migration
=========
To migrate from previous BioMAJ, a script is available at:
To migrate from previous BioMAJ 1.x, a script is available at:
https://github.com/genouest/biomaj-migrate. Script will import old database to
the new database, and update configuration files to the modified format. Data directory is the same.
Migration for 3.0 to 3.1:
Biomaj 3.1 provides an optional micro service architecture, allowing to separate and distributute/scale biomaj components on one or many hosts. This implementation is optional but recommended for server installations. Monolithic installation can be kept for local computer installation.
To upgrade an existing 3.0 installation, as biomaj code has been split into multiple components, it is necessary to install/update biomaj python package but also biomaj-cli and biomaj-daemon packages. Then database must be upgraded manually (see Upgrading in documentation).
To execute database migration:
python biomaj_migrate_database.py
Application Features
====================
* Synchronisation:
* Multiple remote protocols (ftp, sftp, http, local copy, ....)
* Multiple remote protocols (ftp, sftp, http, local copy, etc.)
* Data transfers integrity check
* Release versioning using a incremental approach
* Multi threading
......@@ -48,8 +59,7 @@ Application Features
* Pre &Post processing :
* Advanced workflow description (D.A.G)
* Post-process indexation for various bioinformatics software (blast, srs,
fastacmd, readseq, etc…)
* Post-process indexation for various bioinformatics software (blast, srs, fastacmd, readseq, etc.)
* Easy integration of personal scripts for bank post-processing automation
......@@ -57,9 +67,18 @@ Application Features
* Optional Administration web interface (biomaj-watcher)
* CLI management
* Mail alerts for the update cycle supervision
* Prometheus and Influxdb optional integration
* Optional consul supervision of processes
* Scalability:
* Monolithic (local install) or microservice architecture (remote access to a BioMAJ server)
* Microservice installation allows per process scalability and supervision (number of process in charge of download, execution, etc.)
* Remote access:
* Optional FTP server providing authenticated or anonymous data access
Dependencies
============
......@@ -75,12 +94,9 @@ Database:
Indexing (optional):
* elasticsearch (global property, use_elastic=1)
ElasticSearch indexing add advanced search features to biomaj to find bank
having files with specific format etc...
ElasticSearch indexing adds advanced search features to biomaj to find bank having files with specific format or type.
Configuration of ElasticSearch is not in the scope of BioMAJ documentation.
For a basic installation, one instance of ElasticSearch is enough (low volume of
data), in such a case, the ElasticSearch configuration file should be modified
accordingly:
For a basic installation, one instance of ElasticSearch is enough (low volume of data), in such a case, the ElasticSearch configuration file should be modified accordingly:
node.name: "biomaj" (or any other name)
index.number_of_shards: 1
......@@ -89,10 +105,16 @@ accordingly:
Installation
============
From source:
After dependencies installation, go in BioMAJ source directory:
python setup.py install
From packages:
pip install biomaj biomaj-cli biomaj-daemon
You should consider using a Python virtual environment (virtualenv) to install BioMAJ.
......@@ -101,6 +123,7 @@ installation.
The tools/process contains example process files (python and shell).
Docker
======
......@@ -149,6 +172,21 @@ Execute unit tests but disable ones needing network access
nosetests -a '!network'
Monitoring
==========
InfluxDB (optional) can be used to monitor biomaj. Following series are available:
* biomaj.banks.quantity (number of banks)
* biomaj.production.size.total (size of all production directories)
* biomaj.workflow.duration (workflow duration)
* biomaj.production.size.latest (size of latest update)
* biomaj.bank.update.downloaded_files (number of downloaded files)
* biomaj.bank.update.new (track updates)
*WARNING* Influxdb database must be created, biomaj does not create the database (see https://docs.influxdata.com/influxdb/v1.6/query_language/database_management/#create-database)
License
=======
......@@ -166,8 +204,7 @@ To delete elasticsearch index:
Credits
======
Special thanks for tuco at Pasteur Institute for the intensive testing and new
ideas....
Special thanks for tuco at Pasteur Institute for the intensive testing and new ideas.
Thanks to the old BioMAJ team for the work they have done.
BioMAJ is developped at IRISA research institute.
This diff is collapsed.
This diff is collapsed.
from builtins import str
from builtins import object
import logging
import copy
from elasticsearch import Elasticsearch
class BmajIndex(object):
"""
ElasticSearch indexation and search
"""
"""
ElasticSearch server
"""
es = None
"""
Index name
"""
index = 'biomaj'
"""
Do indexing
"""
do_index = False
"""
Skip if failure (tests)
"""
skip_if_failure = False
@staticmethod
def load(hosts=None, index='biomaj', do_index=True):
"""
Initialize index
:param hosts: List of elastic search nodes to connect to
:type hosts: list
:param do_index: index data or not
:type do_index: bool
"""
if hosts is None:
hosts = ['localhost']
if not do_index:
return
BmajIndex.index = index
BmajIndex.do_index = do_index
if BmajIndex.es is None:
BmajIndex.es = Elasticsearch(hosts)
mapping = {
"mappings": {
"production": {
"date_detection": False
},
"releasestats": {
"date_detection": False,
"_timestamp" : {
"enabled" : True,
"store" : True
}
}
}
}
try:
if not BmajIndex.es.indices.exists(index=BmajIndex.index):
BmajIndex.es.indices.create(index=BmajIndex.index, body=mapping)
except Exception as e:
logging.error('ElasticSearch connection error, check server is running and configuration')
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
else:
raise e
@staticmethod
def _bulk_delete(query, flush=True):
try:
page = BmajIndex.es.search(index=BmajIndex.index,
doc_type='production',
search_type = "query_then_fetch",
size=1000,
body= {'query': {'match': {'bank': query['bank']}}})
if page is None:
return
bulk_delete = ''
for del_hit in page['hits']['hits']:
if ('release' in query and query['release'] == del_hit['_source']['release']) or 'release' not in query:
bulk_delete += "{ \"delete\" : {\"_index\":\""+BmajIndex.index+"\",\"_type\":\"production\", \"_id\" : \""+del_hit['_id']+"\" } }\n"
if bulk_delete:
BmajIndex.es.bulk(body=bulk_delete)
if flush:
BmajIndex.es.indices.flush(index=BmajIndex.index, force=True)
except Exception as e:
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
else:
raise e
@staticmethod
def delete_all_bank(bank_name):
"""
Delete complete index for a bank
"""
if not BmajIndex.do_index:
return
BmajIndex._bulk_delete({"bank" : bank_name}, True)
"""
query = {
"query" : {
"term" : {"bank" : bank_name}
}
}
try:
BmajIndex.es.delete_by_query(index=BmajIndex.index, body=query)
except Exception as e:
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
else:
raise e
"""
@staticmethod
def remove(bank_name, release):
"""
Remove a production release
:param bank_name: Name of the bank
:type bank_name: str
:param release: production release
:type release: str
"""
if not BmajIndex.do_index:
return
BmajIndex._bulk_delete({"release" : release, "bank": bank_name})
"""
try:
query = {
"query" : {
"term" : {"release" : release, "bank": bank_name}
}
}
BmajIndex.es.delete_by_query(index=BmajIndex.index, body=query)
except Exception as e:
logging.error('Index:Remove:'+bank_name+'_'+str(release)+':Exception:'+str(e))
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
"""
@staticmethod
def search(query):
if not BmajIndex.do_index:
return None
res = BmajIndex.es.search(index=BmajIndex.index,
doc_type='production',
search_type = "query_then_fetch",
body=query)
return res['hits']['hits']
@staticmethod
def searchq(query, size=1000):
"""
Lucene syntax search
:param query: Lucene search string
:type query: str
:param size: number of results
:type size: int
:return: list of matches
"""
if not BmajIndex.do_index:
return None
res = BmajIndex.es.search(index=BmajIndex.index, doc_type='production', q=query, size=size)
return res['hits']['hits']
@staticmethod
def add_stat(stat_id, stat):
"""
Add some statistics, must contain release and bank properties.
"""
if not BmajIndex.do_index:
return
if stat['release'] is None or stat['bank'] is None:
return False
#stat['bank'] = bank_name
try:
BmajIndex.es.index(index=BmajIndex.index, doc_type='releasestats', id=stat_id, body=stat)
except Exception:
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
else:
return False
return True
@staticmethod
def add(bank_name, prod, flush=False):
"""
Index a production release
:param bank_name: Name of the bank
:type bank_name: str
:param prod: session release object
:type prod: dict
:param flush: Force flushing
:type flush: bool
"""
if not BmajIndex.do_index:
return
obj = copy.deepcopy(prod)
if obj['release'] is None:
return
obj['bank'] = bank_name
formats = obj['formats']
try:
for fkey, fvalue in formats.items():
for elt in fvalue:
elt['format'] = fkey
elt['bank'] = bank_name
elt['release'] = obj['release']
if 'status' in obj:
elt['status'] = obj['status']
res = BmajIndex.es.index(index=BmajIndex.index, doc_type='production', body=elt)
if flush:
BmajIndex.es.indices.flush(index=BmajIndex.index, force=True)
except Exception as e:
logging.error('Index:Add:'+bank_name+'_'+str(obj['release'])+':Exception:'+str(e))
if BmajIndex.skip_if_failure:
BmajIndex.do_index = False
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import object
import logging
import logging.config
import os
import time
import sys
from biomaj.bmajindex import BmajIndex
if sys.version < '3':
import ConfigParser as configparser
else:
import configparser
class BiomajConfig(object):
"""
Manage Biomaj configuration
"""
DEFAULTS = {
'http.parse.dir.line': r'<img[\s]+src="[\S]+"[\s]+alt="\[DIR\]"[\s]*/?>[\s]*<a[\s]+href="([\S]+)/"[\s]*>.*([\d]{2}-[\w\d]{2,5}-[\d]{4}\s[\d]{2}:[\d]{2})',
'http.parse.file.line': r'<img[\s]+src="[\S]+"[\s]+alt="\[[\s]+\]"[\s]*/?>[\s]<a[\s]+href="([\S]+)".*([\d]{2}-[\w\d]{2,5}-[\d]{4}\s[\d]{2}:[\d]{2})[\s]+([\d\.]+[MKG]{0,1})',
'http.group.dir.name': 1,
'http.group.dir.date': 2,
'http.group.file.name': 1,
'http.group.file.date': 2,
'http.group.file.size': 3,
'visibility.default': 'public',
'historic.logfile.level': 'INFO',
'bank.num.threads': 2,
'files.num.threads': 4,
'use_elastic': 0,
'use_drmaa': 0,
'db.type': '',
'db.formats': '',
'keep.old.version': 1,
'docker.sudo': '1',
'auto_publish': 0
}
# Old biomaj level compatibility
LOGLEVEL = {
'DEBUG': logging.DEBUG,
'VERBOSE': logging.INFO,
'INFO': logging.INFO,
'WARN': logging.WARNING,
'ERR': logging.ERROR
}
"""
Global configuration file
"""
global_config = None
"""
Per use global configuration file, overriding global_config
"""
user_config = None
@staticmethod
def load_config(config_file=None, allow_user_config=True):
"""
Loads general config
:param config_file: global.properties file path
:type config_file: str
:param allow_user_config: use ~/.biomaj.cfg if present
:type allow_user_config: bool
"""
if config_file is None:
env_file = os.environ.get('BIOMAJ_CONF')
if env_file is not None and os.path.exists(env_file):
config_file = env_file
else:
env_file = 'global.properties'
if os.path.exists(env_file):
config_file = env_file
if config_file is None or not os.path.exists(config_file):
raise Exception('Missing global configuration file')
BiomajConfig.config_file = os.path.abspath(config_file)
BiomajConfig.global_config = configparser.ConfigParser()
if allow_user_config and os.path.exists(os.path.expanduser('~/.biomaj.cfg')):
BiomajConfig.user_config_file = os.path.expanduser('~/.biomaj.cfg')
BiomajConfig.user_config = configparser.ConfigParser()
BiomajConfig.user_config.read([os.path.expanduser('~/.biomaj.cfg')])
else:
BiomajConfig.user_config_file = None
BiomajConfig.global_config.read([config_file])
# ElasticSearch indexation support
do_index = False
if BiomajConfig.global_config.get('GENERAL', 'use_elastic') and \
BiomajConfig.global_config.get('GENERAL', 'use_elastic') == "1":
do_index = True
if do_index:
if BiomajConfig.global_config.get('GENERAL', 'elastic_nodes'):
elastic_hosts = BiomajConfig.global_config.get('GENERAL', 'elastic_nodes').split(',')
else:
elastic_hosts = ['localhost']
elastic_index = BiomajConfig.global_config.get('GENERAL', 'elastic_index')
if elastic_index is None:
elastic_index = 'biomaj'
if BiomajConfig.global_config.has_option('GENERAL', 'test') and \
BiomajConfig.global_config.get('GENERAL', 'test') == "1":
# Test connection to elasticsearch, if not available skip indexing for tests
BmajIndex.skip_if_failure = True
BmajIndex.load(index=elastic_index, hosts=elastic_hosts,
do_index=do_index)
def __init__(self, bank, options=None):
"""
Loads bank configuration
:param bank: bank name
:type bank: str
:param options: bank options
:type options: argparse
"""
self.name = bank
if BiomajConfig.global_config is None:
BiomajConfig.load_config()
self.config_bank = configparser.ConfigParser()
conf_dir = BiomajConfig.global_config.get('GENERAL', 'conf.dir')
if not os.path.exists(os.path.join(conf_dir, bank+'.properties')):
logging.error('Bank configuration file does not exists')
raise Exception('Configuration file '+bank+'.properties does not exists')
try:
config_files = [BiomajConfig.config_file]
if BiomajConfig.user_config_file is not None:
config_files.append(BiomajConfig.user_config_file)
config_files.append(os.path.join(conf_dir, bank+'.properties'))
self.config_bank.read(config_files)
except Exception as e:
print("Configuration file error: "+str(e))
logging.error("Configuration file error "+str(e))
sys.exit(1)
self.last_modified = int(os.stat(os.path.join(conf_dir, bank+'.properties')).st_mtime)
if os.path.exists(os.path.expanduser('~/.biomaj.cfg')):
logging.config.fileConfig(os.path.expanduser('~/.biomaj.cfg'))
else:
logging.config.fileConfig(BiomajConfig.config_file)
do_log = False
if options is None:
do_log = True
elif hasattr(options, 'no_log') and not options.no_log:
do_log = True
elif type(options) is dict and 'no_log' in options and not options['no_log']:
do_log = True
#if options is None or (( hasattr(options,'no_log') and not options.no_log) or ('no_log' in options and not options['no_log'])):
if do_log:
logger = logging.getLogger()
bank_log_dir = os.path.join(self.get('log.dir'), bank, str(time.time()))
if not os.path.exists(bank_log_dir):
os.makedirs(bank_log_dir)
hdlr = logging.FileHandler(os.path.join(bank_log_dir, bank+'.log'))
self.log_file = os.path.join(bank_log_dir, bank+'.log')
if options is not None and options.get_option('log') is not None:
hdlr.setLevel(BiomajConfig.LOGLEVEL[options.get_option('log')])
else:
hdlr.setLevel(BiomajConfig.LOGLEVEL[self.get('historic.logfile.level')])
formatter = logging.Formatter('%(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
else:
self.log_file = 'none'
cache_dir = self.get('cache.dir')
if cache_dir is None:
print("Configuration file error: cache.dir empty")
logging.error("cache.dir is not defined")
sys.exit(1)
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
process_dir = self.get('process.dir')
if process_dir is None:
print("Configuration file error: process.dir empty")
logging.error("process.dir is not defined")
sys.exit(1)
if not os.path.exists(process_dir):
os.makedirs(process_dir)
data_dir = self.get('data.dir')
if data_dir is None:
print("Configuration file error: data.dir empty")
logging.error("data.dir is not defined")
sys.exit(1)
if not os.path.exists(data_dir):
os.makedirs(data_dir)
lock_dir = self.get('lock.dir')
if lock_dir is None:
print("Configuration file error: lock.dir empty")
logging.error("lock.dir is not defined")
sys.exit(1)
if not os.path.exists(lock_dir):
os.makedirs(lock_dir)
def set(self, prop, value, section='GENERAL'):
self.config_bank.set(section, prop, value)
def get_bool(self, prop, section='GENERAL', escape=True, default=None):
"""
Get a boolean property from bank or general configration. Optionally in section.
"""
value = self.get(prop, section, escape, default)
if value is None:
return False
if value is True or value == 'true' or value == '1':
return True
else:
return False
def get(self, prop, section='GENERAL', escape=True, default=None):
"""
Get a property from bank or general configration. Optionally in section.
"""
# Compatibility fields
if prop == 'depends':
depend = self.get('db.source', section, escape, None)
if depend:
return depend
if self.config_bank.has_option(section, prop):
val = self.config_bank.get(section, prop)
if prop == 'remote.dir' and not val.endswith('/'):
val = val + '/'
# If regexp, escape backslashes
if escape and (prop == 'local.files' or prop == 'remote.files' or prop == 'http.parse.dir.line' or prop == 'http.parse.file.line'):
val = val.replace('\\\\', '\\')
return val
if BiomajConfig.user_config is not None:
if BiomajConfig.user_config.has_option(section, prop):
return BiomajConfig.user_config.get(section, prop)
if BiomajConfig.global_config.has_option(section, prop):
return BiomajConfig.global_config.get(section, prop)
if prop in BiomajConfig.DEFAULTS:
return BiomajConfig.DEFAULTS[prop]
return default
def get_time(self):
"""
Return last modification time of config files
"""
return self.last_modified
def check(self):
"""
Check configuration
"""
self.set('localrelease', '')
self.set('remoterelease', '')
status = True
if not self.get('data.dir'):
logging.error('data.dir is not set')
status = False
if not self.get('conf.dir'):
logging.error('conf.dir is not set')
status = False
if not self.get('log.dir'):
logging.error('log.dir is not set')
status = False
if not self.get('process.dir'):
logging.error('process.dir is not set')
status = False
if not self.get('lock.dir'):
logging.error('lock.dir is not set')
status = False
if not self.get('cache.dir'):
logging.error('cache.dir is not set')
status = False
if not self.get('db.fullname'):
logging.warn('db.fullname is not set')
if not self.get('db.formats'):
logging.warn('db.formats is not set')
if self.get('use_ldap'):
if not self.get('ldap.host') or not self.get('ldap.port') or not self.get('ldap.dn'):
logging.error('use_ldap set to 1 but missing configuration')
status = False
if self.get('use_elastic'):
if not self.get('elastic_nodes') or not self.get('elastic_index'):
logging.error('use_elastic set to 1 but missing configuration')
status = False
if not self.get('celery.queue') or not self.get('celery.broker'):
logging.warn('celery config is not set, that\'s fine if you do not use Celery for background tasks')
if not self.get('mail.smtp.host'):
logging.error('SMTP mail config not set, you will not be able to send emails')
status = False
if self.get('mail.smtp.host') and not self.get('mail.from'):
logging.error('Mail origin mail.from not set')
status = False
if not self.get('offline.dir.name'):
logging.error('offline.dir.name is not set')
status = False
elif self.get('offline.dir.name').startswith('/'):
logging.error('offline dir must be relative to data.dir and should not start with a /')
status = False
if not self.get('dir.version'):
logging.error('dir.version is not set')
status = False
elif self.get('dir.version').startswith('/'):
logging.error('dir.version must be relative to data.dir and should not start with a /')
status = False
if not self.get('protocol'):
logging.error('protocol is not set')
status = False
else:
protocol = self.get('protocol')
allowed_protocols = ['none', 'multi', 'local', 'ftp', 'sftp', 'http', 'https', 'directftp', 'directhttp', 'directhttps']
if protocol not in allowed_protocols:
logging.error('Protocol not supported: '+protocol)
status = False
if protocol not in ['multi','none']:
if protocol != 'local' and not self.get('server'):
logging.error('server not set')
status = False
if not self.get('remote.dir'):
logging.error('remote.dir not set')
status = False
elif not self.get('remote.dir').endswith('/'):
logging.error('remote.dir must end with a /')
return False
if protocol not in ['direcftp', 'directhttp', 'directhttps'] and not self.get('remote.files') and not self.get('remote.list'):
logging.error('remote.files not set')
status = False
if not self.get('local.files'):
logging.error('local.files is not set')
status = False
# Remove processes
processes = ['db.remove.process', 'db.pre.process']
for process in processes:
if self.get(process):
metas = self.get(process).split(',')
for meta in metas:
if not self.get(meta):
logging.error('Metaprocess ' + meta + ' not defined')
status = False
else:
procs = self.get(meta).split(',')
for proc in procs:
if not self.get(proc+'.name'):
logging.error('Process '+proc+' not defined')
status = False
else:
if not self.get(proc+'.exe'):
logging.error('Process exe for '+proc+' not defined')
status = False
# Check blocks
if self.get('BLOCKS'):
blocks = self.get('BLOCKS').split(',')
for block in blocks:
if not self.get(block+'.db.post.process'):
logging.error('Block '+block+' not defined')
status = False
else:
metas = self.get(block+'.db.post.process').split(',')
for meta in metas:
if not self.get(meta):
logging.error('Metaprocess ' + meta + ' not defined')
status = False
else:
procs = self.get(meta).split(',')
for proc in procs:
if not self.get(proc+'.name'):
logging.error('Process '+proc+' not defined')
status = False
else:
if not self.get(proc+'.exe'):
logging.error('Process exe for '+proc+' not defined')
status = False
return status
from future import standard_library
standard_library.install_aliases()
from builtins import str
import datetime
import logging
import pycurl
import io
import os
import re
import urllib.request, urllib.parse, urllib.error
import hashlib
from biomaj.download.interface import DownloadInterface
from biomaj.download.ftp import FTPDownload
from biomaj.utils import Utils
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
class MultiDownload(DownloadInterface):
'''
Base interface for a downloader using multiple downloaders
'''
def __init__(self):
DownloadInterface.__init__(self)
self.downloaders = []
self.files_to_download = []
def add_downloaders(self, downloaders):
'''
Adds a list of downloaders
'''
self.downloaders += downloaders
for d in downloaders:
self.files_to_download += d.files_to_download
def match(self, patterns, file_list, dir_list=None, prefix='', submatch=False):
if dir_list is None:
dir_list = []
self.files_to_download = []
for d in self.downloaders:
d.match(patterns, d.files_to_download, [], prefix, submatch)
self.files_to_download = []
for d in self.downloaders:
self.files_to_download += d.files_to_download
def download(self, local_dir):
self.files_to_download = []
for d in self.downloaders:
if self.kill_received:
raise Exception('Kill request received, exiting')
d.download(local_dir)
self.files_to_download = []
for d in self.downloaders:
self.files_to_download += d.files_to_download
return (self.files_to_download, [])
def list(self):
self.files_to_download = []
for d in self.downloaders:
d.list()
self.files_to_download = []
for d in self.downloaders:
self.files_to_download += d.files_to_download
return (self.files_to_download, [])
def close(self):
for d in self.downloaders:
d.close()
class DirectFTPDownload(FTPDownload):
'''
download a list of files from FTP, no regexp
'''
def __init__(self, protocol, host, rootdir='', file_list=None):
'''
Initialize the files in list with today as last-modification date.
Size is also preset to zero, size will be set after download
:param file_list: list of files to download on server
:type file_list: list
'''
FTPDownload.__init__(self, protocol, host, rootdir)
self.save_as = None
if file_list is None:
file_list = []
today = datetime.date.today()
self.files_to_download = []
self.headers = {}
for file in file_list:
rfile = {}
rfile['root'] = self.rootdir
rfile['permissions'] = ''
rfile['group'] = ''
rfile['user'] = ''
rfile['size'] = 0
rfile['month'] = today.month
rfile['day'] = today.day
rfile['year'] = today.year
rfile['name'] = file
rfile['hash'] = None
self.files_to_download.append(rfile)
def list(self, directory=''):
'''
FTP protocol does not give us the possibility to get file date from remote url
'''
for rfile in self.files_to_download:
if self.save_as is None:
self.save_as = rfile['name']
rfile['save_as'] = self.save_as
return (self.files_to_download, [])
def match(self, patterns, file_list, dir_list=None, prefix='', submatch=False):
'''
All files to download match, no pattern
'''
if dir_list is None:
dir_list = []
self.files_to_download = file_list
class DirectHttpDownload(DirectFTPDownload):
def __init__(self, protocol, host, rootdir='', file_list=None):
'''
:param file_list: list of files to download on server
:type file_list: list
'''
if file_list is None:
file_list = []
DirectFTPDownload.__init__(self, protocol, host, rootdir, file_list)
self.save_as = None
self.method = 'GET'
self.param = {}
def download(self, local_dir, keep_dirs=True):
'''
Download remote files to local_dir
:param local_dir: Directory where files should be downloaded
:type local_dir: str
:param keep_dirs: keep file name directory structure or copy file in local_dir directly
:param keep_dirs: bool
:return: list of downloaded files
'''
logging.debug('DirectHTTP:Download')
nb_files = len(self.files_to_download)
if nb_files > 1:
self.files_to_download = []
logging.error('DirectHTTP accepts only 1 file')
cur_files = 1
for rfile in self.files_to_download:
if self.kill_received:
raise Exception('Kill request received, exiting')
if self.save_as is None:
self.save_as = rfile['name']
file_dir = local_dir
if keep_dirs:
file_dir = local_dir + os.path.dirname(self.save_as)
file_path = file_dir + '/' + os.path.basename(self.save_as)
# For unit tests only, workflow will take in charge directory creation before to avoid thread multi access
if not os.path.exists(file_dir):
os.makedirs(file_dir)
'''
self.mkdir_lock.acquire()
try:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
except Exception as e:
logging.error(e)
finally:
self.mkdir_lock.release() # release lock, no matter what
'''
logging.debug('DirectHTTP:Download:Progress'+str(cur_files)+'/'+str(nb_files)+' downloading file '+rfile['name']+', save as '+self.save_as)
cur_files += 1
if not 'url' in rfile:
rfile['url'] = self.url
fp = open(file_path, "wb")
curl = pycurl.Curl()
if self.proxy is not None:
curl.setopt(pycurl.PROXY, self.proxy)
if self.proxy_auth is not None:
curl.setopt(pycurl.PROXYUSERPWD, self.proxy_auth)
if self.method == 'POST':
# Form data must be provided already urlencoded.
postfields = urllib.parse.urlencode(self.param)
# Sets request method to POST,
# Content-Type header to application/x-www-form-urlencoded
# and data to send in request body.
if self.credentials is not None:
curl.setopt(pycurl.USERPWD, self.credentials)
curl.setopt(pycurl.POSTFIELDS, postfields)
try:
curl.setopt(pycurl.URL, rfile['url']+rfile['root']+'/'+rfile['name'])
except Exception as a:
curl.setopt(pycurl.URL, (rfile['url']+rfile['root']+'/'+rfile['name']).encode('ascii', 'ignore'))
#curl.setopt(pycurl.URL, rfile['url']+rfile['root']+'/'+rfile['name'])
else:
url = rfile['url']+rfile['root']+'/'+rfile['name']+'?'+urllib.parse.urlencode(self.param)
#curl.setopt(pycurl.URL, url)
try:
curl.setopt(pycurl.URL, url)
except Exception as a:
curl.setopt(pycurl.URL, url.encode('ascii', 'ignore'))
curl.setopt(pycurl.WRITEDATA, fp)
curl.perform()
curl.close()
fp.close()
logging.debug('downloaded!')
rfile['name'] = self.save_as
self.set_permissions(file_path, rfile)
self.set_progress(1, nb_files)
return self.files_to_download
def header_function(self, header_line):
# HTTP standard specifies that headers are encoded in iso-8859-1.
# On Python 2, decoding step can be skipped.
# On Python 3, decoding step is required.
header_line = header_line.decode('iso-8859-1')
# Header lines include the first status line (HTTP/1.x ...).
# We are going to ignore all lines that don't have a colon in them.
# This will botch headers that are split on multiple lines...
if ':' not in header_line:
return
# Break the header line into header name and value.
name, value = header_line.split(':', 1)
# Remove whitespace that may be present.
# Header lines include the trailing newline, and there may be whitespace
# around the colon.
name = name.strip()
value = value.strip()
# Header names are case insensitive.
# Lowercase name here.
name = name.lower()
# Now we can actually record the header name and value.
self.headers[name] = value
def list(self, directory=''):
'''
Try to get file headers to get last_modification and size
'''
for rfile in self.files_to_download:
if self.save_as is None:
self.save_as = rfile['name']
rfile['save_as'] = self.save_as
self.crl.setopt(pycurl.HEADER, True)
if self.credentials is not None:
self.crl.setopt(pycurl.USERPWD, self.credentials)
if self.proxy is not None:
self.crl.setopt(pycurl.PROXY, self.proxy)
if self.proxy_auth is not None:
self.crl.setopt(pycurl.PROXYUSERPWD, self.proxy_auth)
self.crl.setopt(pycurl.NOBODY, True)
try:
self.crl.setopt(pycurl.URL, self.url+self.rootdir+rfile['name'])
except Exception as a:
self.crl.setopt(pycurl.URL, (self.url+self.rootdir+rfile['name']).encode('ascii', 'ignore'))
#self.crl.setopt(pycurl.URL, self.url+self.rootdir+file['name'])
output = BytesIO()
# lets assign this buffer to pycurl object
self.crl.setopt(pycurl.WRITEFUNCTION, output.write)
self.crl.setopt(pycurl.HEADERFUNCTION, self.header_function)
self.crl.perform()
# Figure out what encoding was sent with the response, if any.
# Check against lowercased header name.
encoding = None
if 'content-type' in self.headers:
content_type = self.headers['content-type'].lower()
match = re.search('charset=(\S+)', content_type)
if match:
encoding = match.group(1)
if encoding is None:
# Default encoding for HTML is iso-8859-1.
# Other content types may have different default encoding,
# or in case of binary data, may have no encoding at all.
encoding = 'iso-8859-1'
# lets get the output in a string
result = output.getvalue().decode(encoding)
lines = re.split(r'[\n\r]+', result)
for line in lines:
parts = line.split(':')
if parts[0].strip() == 'Content-Length':
rfile['size'] = parts[1].strip()
if parts[0].strip() == 'Last-Modified':
# Sun, 06 Nov 1994
res = re.match('(\w+),\s+(\d+)\s+(\w+)\s+(\d+)', parts[1].strip())
if res:
rfile['hash'] = hashlib.md5(str(res.group(0)).encode('utf-8')).hexdigest()
rfile['day'] = res.group(2)
rfile['month'] = Utils.month_to_num(res.group(3))
rfile['year'] = res.group(4)
continue
#Sunday, 06-Nov-94
res = re.match('(\w+),\s+(\d+)-(\w+)-(\d+)', parts[1].strip())
if res:
rfile['hash'] = hashlib.md5(str(res.group(0)).encode('utf-8')).hexdigest()
rfile['day'] = res.group(2)
rfile['month'] = Utils.month_to_num(res.group(3))
rfile['year'] = str(2000 + int(res.group(4)))
continue
#Sun Nov 6 08:49:37 1994
res = re.match('(\w+)\s+(\w+)\s+(\d+)\s+\d{2}:\d{2}:\d{2}\s+(\d+)', parts[1].strip())
if res:
rfile['hash'] = hashlib.md5(str(res.group(0)).encode('utf-8')).hexdigest()
rfile['day'] = res.group(3)
rfile['month'] = Utils.month_to_num(res.group(2))
rfile['year'] = res.group(4)
continue
return (self.files_to_download, [])
from builtins import str
from builtins import range
#import os
import logging
#import datetime
#import time
#import re
import threading
import copy
#import tarfile
#import zipfile
import traceback
class DownloadThread(threading.Thread):
NB_THREAD = 2
@staticmethod
def get_threads(downloader, local_dir):
'''
Creates a list of thread for download
:param downloader: downloader to use
:type downloader: :class:`biomaj.download.interface.DownloadInterface`
:param local_dir: directory where files should be downloaded
:type local_dir: str
:return: list of threads
'''
threads = []
# Creates threads with copies of the downloader
download_config = downloader.config
for i in range(0, DownloadThread.NB_THREAD):
downloader.config = None
new_download = copy.deepcopy(downloader)
new_download.config = download_config
new_download.files_to_download = []
th = DownloadThread(new_download, local_dir)
threads.append(th)
# Now dispatch the files to download to the threads
thread_id = 0
for dfile in downloader.files_to_download:
if thread_id == DownloadThread.NB_THREAD:
thread_id = 0
threads[thread_id].downloader.files_to_download.append(dfile)
thread_id += 1
return threads
@staticmethod
def get_threads_multi(downloaders, local_dir):
'''
Dispatch multiple downloaders on threads
:param downloaders: downlaoders to dispatch in threads
:type downloaders: list of :class:`biomaj.download.interface.DownloadInterface`
:param local_dir: directory where files should be downloaded
:type local_dir: str
:return: list of threads
'''
threads = []
# Creates threads with copies of the downloader
thread_id = 0
for downloader in downloaders:
if thread_id == DownloadThread.NB_THREAD:
thread_id = 0
th = DownloadThread(downloader, local_dir)
threads.append(th)
thread_id += 1
return threads
def __init__(self, downloader, local_dir):
'''
Download thread to download a list of files
:param downloader: downloader to use
:type downloader: :class:`biomaj.download.interface.DownloadInterface`
:param local_dir: directory to download files
:type local_dir: str
'''
threading.Thread.__init__(self)
self.downloader = downloader
self.downloader.mkdir_lock = DownloadThread.MKDIR_LOCK
self.downloader.kill_received = False
self.local_dir = local_dir
self.error = False
self._stopevent = threading.Event()
def run(self):
logging.info('Start download thread')
if self.downloader is None:
return True
self.error = False
try:
self.downloader.download(self.local_dir)
self.downloader.close()
except Exception as e:
logging.error('Error in download execution of thread: '+str(e))
logging.debug(traceback.format_exc())
self.error = True
def stop(self):
self._stopevent.set()
DownloadThread.MKDIR_LOCK = threading.Lock()
from future import standard_library
standard_library.install_aliases()
from builtins import str
import logging
import pycurl
import io
import re
import os
from datetime import datetime
import hashlib
from biomaj.utils import Utils
from biomaj.download.interface import DownloadInterface
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
class FTPDownload(DownloadInterface):
'''
Base class to download files from FTP
protocol=ftp
server=ftp.ncbi.nih.gov
remote.dir=/blast/db/FASTA/
remote.files=^alu.*\\.gz$
'''
def __init__(self, protocol, host, rootdir):
DownloadInterface.__init__(self)
logging.debug('Download')
self.crl = pycurl.Curl()
url = protocol+'://'+host
self.rootdir = rootdir
self.url = url
self.headers = {}
def match(self, patterns, file_list, dir_list=None, prefix='', submatch=False):
'''
Find files matching patterns. Sets instance variable files_to_download.
:param patterns: regexps to match
:type patterns: list
:param file_list: list of files to match
:type file_list: list
:param dir_list: sub directories in current dir
:type dir_list: list
:param prefix: directory prefix
:type prefix: str
:param submatch: first call to match, or called from match
:type submatch: bool
'''
logging.debug('Download:File:RegExp:'+str(patterns))
if dir_list is None:
dir_list = []
if not submatch:
self.files_to_download = []
for pattern in patterns:
subdirs_pattern = pattern.split('/')
if len(subdirs_pattern) > 1:
# Pattern contains sub directories
subdir = subdirs_pattern[0]
if subdir == '^':
subdirs_pattern = subdirs_pattern[1:]
subdir = subdirs_pattern[0]
for direlt in dir_list:
subdir = direlt['name']
logging.debug('Download:File:Subdir:Check:'+subdir)
if pattern == '**/*':
(subfile_list, subdirs_list) = self.list(prefix+'/'+subdir+'/')
self.match([pattern], subfile_list, subdirs_list, prefix+'/'+subdir, True)
for rfile in file_list:
if pattern == '**/*' or re.match(pattern, rfile['name']):
rfile['root'] = self.rootdir
if prefix != '':
rfile['name'] = prefix + '/' +rfile['name']
self.files_to_download.append(rfile)
logging.debug('Download:File:MatchRegExp:'+rfile['name'])
else:
if re.match(subdirs_pattern[0], subdir):
logging.debug('Download:File:Subdir:Match:'+subdir)
# subdir match the beginning of the pattern
# check match in subdir
(subfile_list, subdirs_list) = self.list(prefix+'/'+subdir+'/')
self.match(['/'.join(subdirs_pattern[1:])], subfile_list, subdirs_list, prefix+'/'+subdir, True)
else:
for rfile in file_list:
if re.match(pattern, rfile['name']):
rfile['root'] = self.rootdir
if prefix != '':
rfile['name'] = prefix + '/' +rfile['name']
self.files_to_download.append(rfile)
logging.debug('Download:File:MatchRegExp:'+rfile['name'])
if not submatch and len(self.files_to_download) == 0:
raise Exception('no file found matching expressions')
def curl_download(self, file_path, file_to_download):
error = True
nbtry = 1
while(error==True and nbtry<3):
fp = open(file_path, "wb")
curl = pycurl.Curl()
try:
curl.setopt(pycurl.URL, file_to_download)
except Exception as a:
curl.setopt(pycurl.URL, file_to_download.encode('ascii', 'ignore'))
if self.proxy is not None:
curl.setopt(pycurl.PROXY, self.proxy)
if self.proxy_auth is not None:
curl.setopt(pycurl.PROXYUSERPWD, self.proxy_auth)
if self.credentials is not None:
curl.setopt(pycurl.USERPWD, self.credentials)
curl.setopt(pycurl.CONNECTTIMEOUT, 300)
# Download should not take more than 5minutes
curl.setopt(pycurl.TIMEOUT, self.timeout)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.WRITEDATA, fp)
try:
curl.perform()
errcode = curl.getinfo(pycurl.HTTP_CODE)
if int(errcode) != 226 and int(errcode) != 200:
error = True
logging.error('Error while downloading '+file_to_download+' - '+str(errcode))
else:
error = False
except Exception as e:
logging.error('Could not get errcode:' + str(e))
nbtry += 1
curl.close()
fp.close()
return error
def download(self, local_dir, keep_dirs=True):
'''
Download remote files to local_dir
:param local_dir: Directory where files should be downloaded
:type local_dir: str
:param keep_dirs: keep file name directory structure or copy file in local_dir directly
:param keep_dirs: bool
:return: list of downloaded files
'''
logging.debug('FTP:Download')
nb_files = len(self.files_to_download)
cur_files = 1
for rfile in self.files_to_download:
if self.kill_received:
raise Exception('Kill request received, exiting')
file_dir = local_dir
if 'save_as' not in rfile or rfile['save_as'] is None:
rfile['save_as'] = rfile['name']
if keep_dirs:
file_dir = local_dir + '/' + os.path.dirname(rfile['save_as'])
file_path = file_dir + '/' + os.path.basename(rfile['save_as'])
# For unit tests only, workflow will take in charge directory creation before to avoid thread multi access
if not os.path.exists(file_dir):
os.makedirs(file_dir)
'''
self.mkdir_lock.acquire()
try:
if not os.path.exists(file_dir):
os.makedirs(file_dir)
except Exception as e:
logging.error(e)
finally:
self.mkdir_lock.release() # release lock, no matter what
'''
logging.debug('FTP:Download:Progress:'+str(cur_files)+'/'+str(nb_files)+' downloading file '+rfile['name'])
logging.debug('FTP:Download:Progress:'+str(cur_files)+'/'+str(nb_files)+' save as '+rfile['save_as'])
cur_files += 1
if not 'url' in rfile:
rfile['url'] = self.url
error = self.curl_download(file_path, rfile['url']+rfile['root']+'/'+rfile['name'])
if error:
raise Exception("FTP:Download:Error:"+rfile['url']+rfile['root']+'/'+rfile['name'])
#logging.debug('downloaded!')
self.set_permissions(file_path, rfile)
# Add progress only per 10 files to limit db requests
if nb_files < 10:
nb = 1
do_progress = True
else:
if cur_files == nb_files:
do_progress = True
nb = cur_files % 10
elif cur_files > 0 and cur_files % 10 == 0:
nb = 10
do_progress = True
else:
do_progress = False
if do_progress:
self.set_progress(nb, nb_files)
return self.files_to_download
def header_function(self, header_line):
# HTTP standard specifies that headers are encoded in iso-8859-1.
# On Python 2, decoding step can be skipped.
# On Python 3, decoding step is required.
header_line = header_line.decode('iso-8859-1')
# Header lines include the first status line (HTTP/1.x ...).
# We are going to ignore all lines that don't have a colon in them.
# This will botch headers that are split on multiple lines...
if ':' not in header_line:
return
# Break the header line into header name and value.
name, value = header_line.split(':', 1)
# Remove whitespace that may be present.
# Header lines include the trailing newline, and there may be whitespace
# around the colon.
name = name.strip()
value = value.strip()
# Header names are case insensitive.
# Lowercase name here.
name = name.lower()
# Now we can actually record the header name and value.
self.headers[name] = value
def list(self, directory=''):
'''
List FTP directory
:return: tuple of file and dirs in current directory with details
'''
logging.debug('Download:List:'+self.url+self.rootdir+directory)
#self.crl.setopt(pycurl.URL, self.url+self.rootdir+directory)
try:
self.crl.setopt(pycurl.URL, self.url+self.rootdir+directory)
except Exception as a:
self.crl.setopt(pycurl.URL, (self.url+self.rootdir+directory).encode('ascii', 'ignore'))
if self.proxy is not None:
self.crl.setopt(pycurl.PROXY, self.proxy)
if self.proxy_auth is not None:
self.crl.setopt(pycurl.PROXYUSERPWD, self.proxy_auth)
if self.credentials is not None:
self.crl.setopt(pycurl.USERPWD, self.credentials)
output = BytesIO()
# lets assign this buffer to pycurl object
self.crl.setopt(pycurl.WRITEFUNCTION, output.write)
self.crl.setopt(pycurl.HEADERFUNCTION, self.header_function)
self.crl.setopt(pycurl.CONNECTTIMEOUT, 300)
# Download should not take more than 5minutes
self.crl.setopt(pycurl.TIMEOUT, self.timeout)
self.crl.setopt(pycurl.NOSIGNAL, 1)
try:
self.crl.perform()
except Exception as e:
logging.error('Could not get errcode:' + str(e))
# Figure out what encoding was sent with the response, if any.
# Check against lowercased header name.
encoding = None
if 'content-type' in self.headers:
content_type = self.headers['content-type'].lower()
match = re.search('charset=(\S+)', content_type)
if match:
encoding = match.group(1)
if encoding is None:
# Default encoding for HTML is iso-8859-1.
# Other content types may have different default encoding,
# or in case of binary data, may have no encoding at all.
encoding = 'iso-8859-1'
# lets get the output in a string
result = output.getvalue().decode(encoding)
# FTP LIST output is separated by \r\n
# lets split the output in lines
#lines = result.split(r'[\r\n]+')
lines = re.split(r'[\n\r]+', result)
# lets walk through each line
rfiles = []
rdirs = []
for line in lines:
rfile = {}
# lets print each part separately
parts = line.split()
# the individual fields in this list of parts
if not parts: continue
rfile['permissions'] = parts[0]
rfile['group'] = parts[2]
rfile['user'] = parts[3]
rfile['size'] = parts[4]
rfile['month'] = Utils.month_to_num(parts[5])
rfile['day'] = parts[6]
rfile['hash'] = hashlib.md5(line.encode('utf-8')).hexdigest()
try:
rfile['year'] = int(parts[7])
except Exception as e:
# specific ftp case issues at getting date info
curdate = datetime.now()
rfile['year'] = curdate.year
# Year not precised, month feater than current means previous year
if rfile['month'] > curdate.month:
rfile['year'] = curdate.year - 1
# Same month but later day => previous year
if rfile['month'] == curdate.month and int(rfile['day']) > curdate.day:
rfile['year'] = curdate.year - 1
rfile['name'] = parts[8]
if len(parts) >= 10 and parts[9] == '->':
# Symlink, add to files AND dirs as we don't know the type of the link
rdirs.append(rfile)
is_dir = False
if re.match('^d', rfile['permissions']):
is_dir = True
if not is_dir:
rfiles.append(rfile)
else:
rdirs.append(rfile)
return (rfiles, rdirs)
def chroot(self, cwd):
logging.debug('Download: change dir '+cwd)
def close(self):
if self.crl is not None:
self.crl.close()
self.crl = None
from future import standard_library
standard_library.install_aliases()
import logging
import pycurl
import io
import re
import os
import hashlib
import datetime
from biomaj.utils import Utils
from biomaj.download.ftp import FTPDownload
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
class HTTPDownload(FTPDownload):
'''
Base class to download files from HTTP
Makes use of http.parse.dir.line etc.. regexps to extract page information
protocol=http
server=ftp.ncbi.nih.gov
remote.dir=/blast/db/FASTA/
remote.files=^alu.*\\.gz$
'''
def __init__(self, protocol, host, rootdir, config):
FTPDownload.__init__(self, protocol, host, rootdir)
self.config = config
def list(self, directory=''):
'''
List FTP directory
:return: tuple of file and dirs in current directory with details
'''
logging.debug('Download:List:'+self.url+self.rootdir+directory)
#self.crl.setopt(pycurl.URL, self.url+self.rootdir+directory)
try:
self.crl.setopt(pycurl.URL, self.url+self.rootdir+directory)
except Exception as a:
self.crl.setopt(pycurl.URL, (self.url+self.rootdir+directory).encode('ascii', 'ignore'))
if self.proxy is not None:
self.crl.setopt(pycurl.PROXY, self.proxy)
if self.proxy_auth is not None:
self.crl.setopt(pycurl.PROXYUSERPWD, self.proxy_auth)
if self.credentials is not None:
self.crl.setopt(pycurl.USERPWD, self.credentials)
output = BytesIO()
# lets assign this buffer to pycurl object
self.crl.setopt(pycurl.WRITEFUNCTION, output.write)
self.crl.setopt(pycurl.HEADERFUNCTION, self.header_function)
self.crl.perform()
# Figure out what encoding was sent with the response, if any.
# Check against lowercased header name.
encoding = None
if 'content-type' in self.headers:
content_type = self.headers['content-type'].lower()
match = re.search('charset=(\S+)', content_type)
if match:
encoding = match.group(1)
if encoding is None:
# Default encoding for HTML is iso-8859-1.
# Other content types may have different default encoding,
# or in case of binary data, may have no encoding at all.
encoding = 'iso-8859-1'
# lets get the output in a string
result = output.getvalue().decode(encoding)
'''
'http.parse.dir.line': r'<a[\s]+href="([\S]+)/".*alt="\[DIR\]">.*([\d]{2}-[\w\d]{2,5}-[\d]{4}\s[\d]{2}:[\d]{2})',
'http.parse.file.line': r'<a[\s]+href="([\S]+)".*([\d]{2}-[\w\d]{2,5}-[\d]{4}\s[\d]{2}:[\d]{2})[\s]+([\d\.]+[MKG]{0,1})',
'http.group.dir.name': 1,
'http.group.dir.date': 2,
'http.group.file.name': 1,
'http.group.file.date': 2,
'http.group.file.size': 3,
'''
rfiles = []
rdirs = []
dirs = re.findall(self.config.get('http.parse.dir.line'), result)
if dirs is not None and len(dirs) > 0:
for founddir in dirs:
rfile = {}
rfile['permissions'] = ''
rfile['group'] = ''
rfile['user'] = ''
rfile['size'] = '0'
date = founddir[int(self.config.get('http.group.dir.date'))-1]
dirdate = date.split()
parts = dirdate[0].split('-')
#19-Jul-2014 13:02
rfile['month'] = Utils.month_to_num(parts[1])
rfile['day'] = parts[0]
rfile['year'] = parts[2]
rfile['name'] = founddir[int(self.config.get('http.group.dir.name'))-1]
rdirs.append(rfile)
files = re.findall(self.config.get('http.parse.file.line'), result)
if files is not None and len(files)>0:
for foundfile in files:
rfile = {}
rfile['permissions'] = ''
rfile['group'] = ''
rfile['user'] = ''
rfile['size'] = foundfile[int(self.config.get('http.group.file.size'))-1]
date = foundfile[int(self.config.get('http.group.file.date'))-1]
if self.config.get('http.parse.file.date.format'):
date_object = datetime.datetime.strptime(date, self.config.get('http.parse.file.date.format').replace('%%', '%'))
rfile['month'] = date_object.month
rfile['day'] = date_object.day
rfile['year'] = date_object.year
else:
dirdate = date.split()
parts = dirdate[0].split('-')
#19-Jul-2014 13:02
rfile['month'] = Utils.month_to_num(parts[1])
rfile['day'] = parts[0]
rfile['year'] = parts[2]
rfile['name'] = foundfile[int(self.config.get('http.group.file.name'))-1]
filehash = (rfile['name']+str(date)+str(rfile['size'])).encode('utf-8')
rfile['hash'] = hashlib.md5(filehash).hexdigest()
rfiles.append(rfile)
print("###OSALLOU "+str(rfile))
return (rfiles, rdirs)
from builtins import str
from builtins import object
import os
import logging
import datetime
import time
import re
import tarfile
import zipfile
from biomaj.utils import Utils
from biomaj.mongo_connector import MongoConnector
class _FakeLock(object):
'''
Fake lock for downloaders not called by a Downloadthread
'''
def __init__(self):
pass
def acquire(self):
pass
def release(self):
pass
class DownloadInterface(object):
'''
Main interface that all downloaders must extend
'''
files_num_threads = 4
def __init__(self):
self.config = None
self.files_to_download = []
self.files_to_copy = []
self.error = False
self.credentials = None
#bank name
self.bank = None
self.mkdir_lock = _FakeLock()
self.kill_received = False
self.proxy = None
# 24h timeout
self.timeout = 3600 * 24
# Optional save target for single file downloaders
self.save_as = None
def set_proxy(self, proxy, proxy_auth=None):
'''
Use a proxy to connect to remote servers
:param proxy: proxy to use (see http://curl.haxx.se/libcurl/c/CURLOPT_PROXY.html for format)
:type proxy: str
:param proxy_auth: proxy authentication if any (user:password)
:type proxy_auth: str
'''
self.proxy = proxy
self.proxy_auth = proxy_auth
def set_progress(self, val, max):
'''
Update progress on download
:param val: number of downloaded files since last progress
:type val: int
:param max: number of files to download
:type max: int
'''
logging.debug('Download:progress:'+str(val)+'/'+str(max))
if not self.bank:
logging.debug('bank not specified, skipping record of download progress')
return
MongoConnector.banks.update({'name': self.bank},
{'$inc': {'status.download.progress': val},
'$set': {'status.download.total': max}})
def match(self, patterns, file_list, dir_list=None, prefix='', submatch=False):
'''
Find files matching patterns. Sets instance variable files_to_download.
:param patterns: regexps to match
:type patterns: list
:param file_list: list of files to match
:type file_list: list
:param dir_list: sub directories in current dir
:type dir_list: list
:param prefix: directory prefix
:type prefix: str
:param submatch: first call to match, or called from match
:type submatch: bool
'''
logging.debug('Download:File:RegExp:'+str(patterns))
if dir_list is None:
dir_list = []
if not submatch:
self.files_to_download = []
for pattern in patterns:
subdirs_pattern = pattern.split('/')
if len(subdirs_pattern) > 1:
# Pattern contains sub directories
subdir = subdirs_pattern[0]
if subdir == '^':
subdirs_pattern = subdirs_pattern[1:]
subdir = subdirs_pattern[0]
if not dir_list and pattern == '**/*':
# Take all and no more dirs, take all files
for rfile in file_list:
rfile['root'] = self.rootdir
if prefix != '':
rfile['name'] = prefix + '/' +rfile['name']
self.files_to_download.append(rfile)
logging.debug('Download:File:MatchRegExp:'+rfile['name'])
return
for direlt in dir_list:
subdir = direlt['name']
logging.debug('Download:File:Subdir:Check:'+subdir)
if pattern == '**/*':
(subfile_list, subdirs_list) = self.list(prefix+'/'+subdir+'/')
self.match([pattern], subfile_list, subdirs_list, prefix+'/'+subdir, True)
for rfile in file_list:
if pattern == '**/*' or re.match(pattern, rfile['name']):
rfile['root'] = self.rootdir
if prefix != '':
rfile['name'] = prefix + '/' +rfile['name']
self.files_to_download.append(rfile)
logging.debug('Download:File:MatchRegExp:'+rfile['name'])
else:
if re.match(subdirs_pattern[0], subdir):
logging.debug('Download:File:Subdir:Match:'+subdir)
# subdir match the beginning of the pattern
# check match in subdir
(subfile_list, subdirs_list) = self.list(prefix+'/'+subdir+'/')
self.match(['/'.join(subdirs_pattern[1:])], subfile_list, subdirs_list, prefix+'/'+subdir, True)
else:
for rfile in file_list:
if re.match(pattern, rfile['name']):
rfile['root'] = self.rootdir
if prefix != '':
rfile['name'] = prefix + '/' +rfile['name']
self.files_to_download.append(rfile)
logging.debug('Download:File:MatchRegExp:'+rfile['name'])
if not submatch and len(self.files_to_download) == 0:
raise Exception('no file found matching expressions')
def set_permissions(self, file_path, file_info):
'''
Sets file attributes to remote ones
'''
ftime = datetime.date(int(file_info['year']), int(file_info['month']), int(file_info['day']))
settime = time.mktime(ftime.timetuple())
os.utime(file_path, (settime, settime))
def download_or_copy(self, available_files, root_dir, check_exists=True):
'''
If a file to download is available in available_files, copy it instead of downloading it.
Update the instance variables files_to_download and files_to_copy
:param available_files: list of files available in root_dir
:type available files: list
:param root_dir: directory where files are available
:type root_dir: str
:param check_exists: checks if file exists locally
:type check_exists: bool
'''
self.files_to_copy = []
# In such case, it forces the download again
if not available_files:
return
available_files.sort(key=lambda x: x['name'])
self.files_to_download.sort(key=lambda x: x['name'])
new_files_to_download = []
test1_tuples = ((d['name'], d['year'], d['month'], d['day'], d['size']) for d in self.files_to_download)
test2_tuples = set((d['name'], d['year'], d['month'], d['day'], d['size']) for d in available_files)
new_or_modified_files = [t for t in test1_tuples if t not in test2_tuples]
index = 0
if len(new_or_modified_files) > 0:
for dfile in self.files_to_download:
if index < len(new_or_modified_files) and \
dfile['name'] == new_or_modified_files[index][0]:
new_files_to_download.append(dfile)
index += 1
else:
if not check_exists or os.path.exists(os.path.join(root_dir, dfile['name'])):
dfile['root'] = root_dir
self.files_to_copy.append(dfile)
else:
new_files_to_download.append(dfile)
else:
# Copy everything
for dfile in self.files_to_download:
if not check_exists or os.path.exists(os.path.join(root_dir, dfile['name'])):
dfile['root'] = root_dir
self.files_to_copy.apppend(dfile)
else:
new_files_to_download.append(dfile)
self.files_to_download = new_files_to_download
def download(self, local_dir):
'''
Download remote files to local_dir
:param local_dir: Directory where files should be downloaded
:type local_dir: str
:return: list of downloaded files
'''
pass
def list(self):
'''
List directory
:return: tuple of file list and dir list
'''
pass
def chroot(self, cwd):
'''
Change directory
'''
pass
def set_credentials(self, userpwd):
'''
Set credentials in format user:pwd
:param userpwd: credentials
:type userpwd: str
'''
self.credentials = userpwd
def close(self):
'''
Close connection
'''
pass
from future import standard_library
standard_library.install_aliases()
from builtins import str
import logging
import pycurl
import io
import re
import os
import datetime
import hashlib
from biomaj.utils import Utils
from biomaj.download.interface import DownloadInterface
class LocalDownload(DownloadInterface):
'''
Base class to copy file from local system
protocol=cp
server=localhost
remote.dir=/blast/db/FASTA/
remote.files=^alu.*\\.gz$
'''
def __init__(self, rootdir):
DownloadInterface.__init__(self)
logging.debug('Download')
self.rootdir = rootdir
def download(self, local_dir):
'''
Copy local files to local_dir
:param local_dir: Directory where files should be copied
:type local_dir: str
:return: list of downloaded files
'''
logging.debug('Local:Download')
Utils.copy_files(self.files_to_download, local_dir, lock=self.mkdir_lock)
return self.files_to_download
def list(self, directory=''):
'''
List FTP directory
:return: tuple of file and dirs in current directory with details
'''
logging.debug('Download:List:'+self.rootdir+directory)
# lets walk through each line
rfiles = []
rdirs = []
files = [f for f in os.listdir(self.rootdir + directory)]
for file_in_files in files:
rfile = {}
fstat = os.stat(os.path.join(self.rootdir + directory,file_in_files))
rfile['permissions'] = str(fstat.st_mode)
rfile['group'] = str(fstat.st_gid)
rfile['user'] = str(fstat.st_uid)
rfile['size'] = str(fstat.st_size)
fstat_mtime = datetime.datetime.fromtimestamp(fstat.st_mtime)
rfile['month'] = fstat_mtime.month
rfile['day'] = fstat_mtime.day
rfile['year'] = fstat_mtime.year
rfile['name'] = file_in_files
filehash = (rfile['name']+str(fstat.st_mtime)+rfile['size']).encode('utf-8')
rfile['hash'] = hashlib.md5(filehash).hexdigest()
is_dir = False
if os.path.isdir(os.path.join(self.rootdir + directory, file_in_files)):
is_dir = True
if not is_dir:
rfiles.append(rfile)
else:
rdirs.append(rfile)
return (rfiles, rdirs)
def chroot(self, cwd):
logging.debug('Download: change dir '+cwd)
os.chdir(cwd)
# Biological file mime types
application/fasta fasta fa fsa
application/bam bam bai
application/gff gff gff3
application/bed bed
application/fastq fastq
application/gtf gtf
application/octet-stream ab1 scf
application/axt axt
application/csFasta csfasta
application/FasttqSolexa fastqsolexa
application/Interval interval
application/Laj laj
application/Lav lav
application/Maf maf
application/QualityScore qual
application/BlastXml blastxml
application/Wiggle wig
......@@ -18,3 +18,4 @@ class MongoConnector(object):
MongoConnector.banks = MongoConnector.db.banks
MongoConnector.users = MongoConnector.db.users
MongoConnector.db_schema = MongoConnector.db.db_schema
MongoConnector.history = MongoConnector.db.history
......@@ -2,14 +2,14 @@ from builtins import str
from builtins import object
import smtplib
import email.utils
from biomaj.workflow import Workflow
import logging
import sys
if sys.version < '3':
from email.MIMEText import MIMEText
else:
from email.mime.text import MIMEText
from biomaj.workflow import Workflow
import logging
class Notify(object):
"""
......@@ -21,28 +21,33 @@ class Notify(object):
if not bank.config.get('mail.smtp.host') or bank.session is None:
logging.info('Notify:none')
return
admins = bank.config.get('mail.admin')
if not admins:
logging.info('Notify: no mail.admin defined')
return
admin_list = admins.split(',')
logging.info('Notify:' + bank.config.get('mail.admin'))
mfrom = bank.config.get('mail.from')
mto = bank.config.get('mail.admin')
log_file = bank.config.log_file
msg = MIMEText('')
if log_file:
fp = None
if sys.version < '3':
fp = open(log_file, 'rb')
msg = MIMEText(fp.read())
else:
fp = open(log_file, 'r')
msg = MIMEText(fp.read(2000000))
fp.close()
msg['To'] = email.utils.formataddr(('Recipient', mto))
msg['From'] = email.utils.formataddr(('Author', mfrom))
#msg['Subject'] = 'BANK['+bank.name+'] - STATUS['+str(bank.session.get_status(Workflow.FLOW_OVER))+'] - UPDATE['+str(bank.session.get('update'))+'] - REMOVE['+str(bank.session.get('remove'))+']'
msg['Subject'] = 'BANK[' + bank.name + '] - STATUS[' + str(bank.session.get_status(Workflow.FLOW_OVER)) + '] - UPDATE[' + str(bank.session.get('update')) + '] - REMOVE[' + str(bank.session.get('remove')) + ']' + ' - RELEASE[' + str(bank.session.get('release')) + ']'
#if bank.session.get('action') == 'update':
# msg['Subject'] = 'BANK['+bank.name+'] - STATUS['+str(bank.session.get_status(Workflow.FLOW_OVER))+'] - UPDATE['+str(bank.session.get('update'))+'] - REMOVE['+str(bank.session.get('remove'))+']' + ' - RELEASE['+str(bank.session.get('release'))+']'
#else:
# msg['Subject'] = 'BANK['+bank.name+'] - STATUS['+str(bank.session.get_status(Workflow.FLOW_OVER))+'] - UPDATE['+str(bank.session.get('update'))+'] - REMOVE['+str(bank.session.get('remove'))+']'
logging.info(msg['subject'])
server = None
for mto in admin_list:
msg['To'] = email.utils.formataddr(('Recipient', mto))
try:
server = smtplib.SMTP(bank.config.get('mail.smtp.host'))
#server.set_debuglevel(1)
if bank.config.get('mail.tls') is not None and str(bank.config.get('mail.tls')) == 'true':
server.starttls()
if bank.config.get('mail.user') is not None and str(bank.config.get('mail.user')) != '':
......
......@@ -13,17 +13,8 @@ class Options(object):
"""
Gets an option if present, else return None
"""
#if self.options is None:
# return None
#if hasattr(self.options, option):
# return getattr(self.options, option)
if hasattr(self, option):
return getattr(self, option)
#if option in self.options:
# return self.options[option]
return None
UPDATE = 'update'
......
......@@ -3,8 +3,11 @@ import threading
import logging
import os
from biomaj.process.process import Process, DrmaaProcess, DockerProcess
from biomaj_process.process import Process, DrmaaProcess, DockerProcess
from biomaj_process.process import RemoteProcess
from biomaj.mongo_connector import MongoConnector
from biomaj_zipkin.zipkin import Zipkin
class MetaProcess(threading.Thread):
'''
......@@ -46,9 +49,6 @@ class MetaProcess(threading.Thread):
self._stopevent = threading.Event()
self.bmaj_env = os.environ.copy()
#self.bmaj_env = {}
# Copy all config from bank
self.bmaj_only_env = {}
# The root directory where all databases are stored.
......@@ -108,7 +108,6 @@ class MetaProcess(threading.Thread):
self.bmaj_env['logfile'] = log_file
self.bmaj_only_env['logfile'] = log_file
self.bmaj_env['offlinedir'] = self.bank.session.get_offline_directory()
self.bmaj_only_env['offlinedir'] = self.bmaj_env['offlinedir']
......@@ -138,7 +137,6 @@ class MetaProcess(threading.Thread):
self.bmaj_env[key] = ''
self.bmaj_only_env[key] = ''
def set_progress(self, name, status=None):
'''
Update progress on download
......@@ -150,8 +148,10 @@ class MetaProcess(threading.Thread):
'''
logging.debug('Process:progress:' + name + "=" + str(status))
if self.workflow is not None:
MongoConnector.banks.update({'name': self.bank.name},
{'$set': {'status.'+self.workflow+'.progress.'+name: status}})
MongoConnector.banks.update(
{'name': self.bank.name},
{'$set': {'status.' + self.workflow + '.progress.' + name: status}}
)
def run(self):
# Run meta processes
......@@ -173,7 +173,6 @@ class MetaProcess(threading.Thread):
continue
logging.info("PROC:META:RUN:PROCESS:" + bprocess)
# bprocess.name may not be unique
#name = self.bank.config.get(bprocess+'.name')
name = bprocess
desc = self.bank.config.get(bprocess + '.desc')
cluster = self.bank.config.get_bool(bprocess + '.cluster', default=False)
......@@ -187,14 +186,58 @@ class MetaProcess(threading.Thread):
bmaj_process = DrmaaProcess(meta + '_' + name, exe, args, desc, proc_type, native,
expand, self.bmaj_env,
os.path.dirname(self.bank.config.log_file))
elif docker:
else:
if self.bank.config.get('micro.biomaj.service.process', default=None) == '1':
logging.info("PROC:META:RUN:REMOTEPROCESS: " + bprocess)
# (self, name, exe, args, desc=None, proc_type=None, expand=True,
# bank_env=None, log_dir=None,
# rabbit_mq=None, rabbit_mq_port=5672, rabbit_mq_user=None, rabbit_mq_password=None, rabbit_mq_virtualhost=None,
# proxy=None, bank=None):
proxy = self.bank.config.get('micro.biomaj.proxy.process')
if not proxy:
proxy = self.bank.config.get('micro.biomaj.proxy')
use_sudo = self.bank.config.get_bool('docker.sudo', default=True)
bmaj_process = DockerProcess(meta+'_'+name, exe, args, desc, proc_type, docker,
expand, self.bmaj_only_env,
os.path.dirname(self.bank.config.log_file), use_sudo)
bmaj_process = RemoteProcess(
meta + '_' + name,
exe,
args,
desc=desc,
proc_type=proc_type,
expand=expand,
docker=docker,
docker_sudo=use_sudo,
bank_env=self.bmaj_only_env,
log_dir=os.path.dirname(self.bank.config.log_file),
rabbit_mq=self.bank.config.get('micro.biomaj.rabbit_mq'),
rabbit_mq_port=int(self.bank.config.get('micro.biomaj.rabbit_mq_port', default='5672')),
rabbit_mq_user=self.bank.config.get('micro.biomaj.rabbit_mq_user'),
rabbit_mq_password=self.bank.config.get('micro.biomaj.rabbit_mq_password'),
rabbit_mq_virtualhost=self.bank.config.get('micro.biomaj.rabbit_mq_virtualhost', default='/'),
proxy=proxy,
bank=self.bank.name
)
else:
bmaj_process = Process(meta+'_'+name, exe, args, desc, proc_type,
expand, self.bmaj_env, os.path.dirname(self.bank.config.log_file))
if docker:
use_sudo = self.bank.config.get_bool('docker.sudo', default=True)
bmaj_process = DockerProcess(
meta + '_' + name, exe, args,
desc=desc,
proc_type=proc_type,
docker=docker,
expand=expand,
bank_env=self.bmaj_only_env,
log_dir=os.path.dirname(self.bank.config.log_file),
use_sudo=use_sudo)
else:
bmaj_process = Process(
meta + '_' + name, exe, args,
desc=desc,
proc_type=proc_type,
expand=expand,
bank_env=self.bmaj_env,
log_dir=os.path.dirname(self.bank.config.log_file)
)
self.set_progress(bmaj_process.name, None)
if self.bank.config.get(bprocess + '.format'):
bmaj_process.format = self.bank.config.get(bprocess + '.format')
......@@ -204,7 +247,18 @@ class MetaProcess(threading.Thread):
bmaj_process.tags = self.bank.config.get(bprocess + '.tags')
if self.bank.config.get(bprocess + '.files'):
bmaj_process.files = self.bank.config.get(bprocess + '.files')
span = None
if self.bank.config.get('zipkin_trace_id'):
span = Zipkin('biomaj-process', bmaj_process.name, trace_id=self.bank.config.get('zipkin_trace_id'), parent_id=self.bank.config.get('zipkin_span_id'))
bmaj_process.set_trace(span.get_trace_id(), span.get_span_id())
res = bmaj_process.run(self.simulate)
if span:
span.add_binary_annotation('status', str(res))
span.trace()
processes_status[bprocess] = res
self.set_progress(bmaj_process.name, res)
if not res:
......@@ -250,7 +304,7 @@ class MetaProcess(threading.Thread):
if meta_tags == '':
meta_tags = proc.tags
meta_files = metas[3]
if not meta_format in self.meta_data[proc_name]:
if meta_format not in self.meta_data[proc_name]:
self.meta_data[proc_name][meta_format] = []
tags = meta_tags.split(',')
tag_list = {}
......@@ -258,9 +312,11 @@ class MetaProcess(threading.Thread):
for tag in tags:
t = tag.split(':')
tag_list[t[0]] = t[1]
self.meta_data[proc_name][meta_format].append({'tags': tag_list,
self.meta_data[proc_name][meta_format].append({
'tags': tag_list,
'types': meta_type.split(','),
'files': meta_files.split(',')})
'files': meta_files.split(',')}
)
if proc.files and proc.format:
tag_list = {}
if proc.tags != '':
......@@ -268,10 +324,11 @@ class MetaProcess(threading.Thread):
t = tag.split(':')
tag_list[t[0]] = t[1]
self.meta_data[proc_name][proc.format] = []
self.meta_data[proc_name][proc.format].append({'tags': tag_list,
self.meta_data[proc_name][proc.format].append({
'tags': tag_list,
'types': proc.types.split(','),
'files': proc.files.split(',')})
'files': proc.files.split(',')}
)
def stop(self):
self._stopevent.set()