3.1.6:
Fix #100: Catch and log error if biomaj fails to connect to InfluxDB
Add history to update/remove operations
Add logging in case of file deletion errors during bank removal
Check that lock file exists before removing it
Update protobuf to work with biomaj.download 3.0.18
3.1.5:
Fix #97: Wrong offline dir checks
3.1.4:
Fix #88: Unset 'last_update_session' when found in pending sessions using --remove-pending
Add formats in bank info request
BioMAJ3
=======
This project is a complete rewrite of BioMAJ and the documentation is available here: http://biomaj.genouest.org.
BioMAJ (BIOlogie Mise A Jour) is a workflow engine dedicated to data
synchronization and processing. The Software automates the update cycle and the
@@ -176,7 +176,7 @@ Execute unit tests but disable ones needing network access
Monitoring
==========
InfluxDB (optional) can be used to monitor biomaj. The following series are available:
* biomaj.banks.quantity (number of banks)
* biomaj.production.size.total (size of all production directories)
@@ -185,6 +185,8 @@ InfluxDB can be used to monitor biomaj. Following series are available:
* biomaj.bank.update.downloaded_files (number of downloaded files)
* biomaj.bank.update.new (track updates)
*WARNING* The InfluxDB database must be created beforehand; biomaj does not create it (see https://docs.influxdata.com/influxdb/v1.6/query_language/database_management/#create-database)
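
The database can be created once up front, either with the influx CLI (CREATE DATABASE biomaj) or with the Python client. A minimal sketch, assuming a default local InfluxDB and the database name 'biomaj' (match host, port and name to your influxdb.* settings):

    from influxdb import InfluxDBClient

    # One-time setup: create the database biomaj will write metrics into
    client = InfluxDBClient(host='localhost', port=8086)
    client.create_database('biomaj')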
License
=======
@@ -74,6 +74,7 @@ class Bank(object):
                           BiomajConfig.global_config.get('GENERAL', 'db.name'))
        self.banks = MongoConnector.banks
        self.history = MongoConnector.history
        self.bank = self.banks.find_one({'name': self.name})
        if self.bank is None:
@@ -107,6 +108,22 @@ class Bank(object):
        else:
            return False

    @staticmethod
    def get_history(limit=100):
        """
        Get list of bank update/remove operations
        """
        if MongoConnector.db is None:
            MongoConnector(BiomajConfig.global_config.get('GENERAL', 'db.url'),
                           BiomajConfig.global_config.get('GENERAL', 'db.name'))
        hist_list = []
        hist = MongoConnector.history.find({}).sort("start", -1).limit(limit)
        for h in hist:
            del h['_id']
            hist_list.append(h)
        return hist_list
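
    # A usage sketch, assuming BiomajConfig.load_config() has been called:
    #
    #     for op in Bank.get_history(limit=10):
    #         # newest first; fields match the documents inserted by
    #         # update()/remove() below: bank, action, error, start, end, updated
    #         print(op['bank'], op['action'], op['error'])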

    @staticmethod
    def get_banks_disk_usage():
        """
@@ -561,9 +578,17 @@ class Bank(object):
                      'freeze': False}
        self.bank['production'].append(production)
        # $push and $pull are now issued as two separate update calls
        self.banks.update({'name': self.name},
                          {
                              '$push': {'production': production}
                          })
        self.banks.update({'name': self.name},
                          {
                              '$pull': {
                                  'pending': {
                                      'release': self.session.get('release'),
                                      'id': self.session._session['id']
                                  }
                              }
                          })
        self.bank = self.banks.find_one({'name': self.name})
@@ -917,6 +942,8 @@ class Bank(object):
        :type force: bool
        :return: bool
        """
        # Record operation boundaries as epoch seconds for the history collection
        start_time = datetime.now()
        start_time = time.mktime(start_time.timetuple())
        if not force:
            has_freeze = False
            for prod in self.bank['production']:
@@ -932,13 +959,35 @@ class Bank(object):
        bank_data_dir = self.get_data_dir()
        logging.warn('DELETE ' + bank_data_dir)
        if os.path.exists(bank_data_dir):
            try:
                shutil.rmtree(bank_data_dir)
            except Exception:
                logging.exception('Failed to delete bank directory: ' + bank_data_dir)
                logging.error('Bank will be deleted but some files/dirs may still be present on system, you can safely manually delete them')
        bank_offline_dir = os.path.join(self.config.get('data.dir'), self.config.get('offline.dir.name'))
        if os.path.exists(bank_offline_dir):
            try:
                shutil.rmtree(bank_offline_dir)
            except Exception:
                logging.exception('Failed to delete bank offline directory: ' + bank_offline_dir)
                logging.error('Bank will be deleted but some files/dirs may still be present on system, you can safely manually delete them')
        bank_log_dir = os.path.join(self.config.get('log.dir'), self.name)
        if os.path.exists(bank_log_dir) and self.no_log:
            try:
                shutil.rmtree(bank_log_dir)
            except Exception:
                logging.exception('Failed to delete bank log directory: ' + bank_log_dir)
                logging.error('Bank will be deleted but some files/dirs may still be present on system, you can safely manually delete them')
        end_time = datetime.now()
        end_time = time.mktime(end_time.timetuple())
        self.history.insert({
            'bank': self.name,
            'error': False,
            'start': start_time,
            'end': end_time,
            'action': 'remove',
            'updated': None
        })
        return True

    def get_status(self):
@@ -1012,6 +1061,8 @@ class Bank(object):
"""
release = str(release)
logging.warning('Bank:' + self.name + ':Remove')
start_time = datetime.now()
start_time = time.mktime(start_time.timetuple())
if not self.is_owner():
logging.error('Not authorized, bank owned by ' + self.bank['properties']['owner'])
@@ -1047,8 +1098,17 @@ class Bank(object):
        # Reset status, we take an update session
        res = self.start_remove(session)
        self.session.set('workflow_status', res)
        self.save_session()
        end_time = datetime.now()
        end_time = time.mktime(end_time.timetuple())
        self.history.insert({
            'bank': self.name,
            'error': not res,
            'start': start_time,
            'end': end_time,
            'action': 'remove',
            'updated': None
        })
        return res
@@ -1061,6 +1121,8 @@ class Bank(object):
        :return: bool
        """
        logging.warning('Bank:' + self.name + ':Update')
        start_time = datetime.now()
        start_time = time.mktime(start_time.timetuple())
        if not self.is_owner():
            logging.error('Not authorized, bank owned by ' + self.bank['properties']['owner'])
@@ -1108,7 +1170,20 @@ class Bank(object):
        res = self.start_update()
        self.session.set('workflow_status', res)
        self.save_session()
        try:
            self.__stats()
        except Exception:
            logging.exception('Failed to send stats')
        end_time = datetime.now()
        end_time = time.mktime(end_time.timetuple())
        self.history.insert({
            'bank': self.name,
            'error': not res,
            'start': start_time,
            'end': end_time,
            'action': 'update',
            'updated': self.session.get('update')
        })
        return res

    def __stats(self):
@@ -1139,7 +1214,8 @@ class Bank(object):
            else:
                influxdb = InfluxDBClient(host=db_host, port=db_port, database=db_name)
        except Exception as e:
            logging.error('Failed to connect to InfluxDB, database may not be created, skipping the record of statistics')
            logging.exception('InfluxDB connection error: ' + str(e))
            return

        metrics = []
@@ -1230,7 +1306,14 @@ class Bank(object):
            }
            metrics.append(influx_metric)

        res = None
        try:
            res = influxdb.write_points(metrics, time_precision="s")
        except Exception as e:
            logging.error('Failed to connect to InfluxDB, database may not be created, skipping the record of statistics')
            logging.exception('InfluxDB connection error: ' + str(e))
            return
        if not res:
            logging.error('Failed to send metrics to database')
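
        # For reference, write_points() takes a list of point dicts with
        # time_precision="s"; a sketch of one metric, where only the series
        # name comes from the README and the field layout is an assumption:
        #
        #     influx_metric = {
        #         'measurement': 'biomaj.banks.quantity',
        #         'time': int(end_time),           # epoch seconds
        #         'fields': {'value': nb_banks}    # nb_banks: hypothetical count
        #     }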
@@ -18,3 +18,4 @@ class MongoConnector(object):
        MongoConnector.banks = MongoConnector.db.banks
        MongoConnector.users = MongoConnector.db.users
        MongoConnector.db_schema = MongoConnector.db.db_schema
        MongoConnector.history = MongoConnector.db.history
@@ -14,7 +14,7 @@ import hashlib
from biomaj_core.utils import Utils
from biomaj_download.downloadclient import DownloadClient
from biomaj_download.message import downmessage_pb2
from biomaj_download.download.http import HTTPParse
from biomaj_download.download.localcopy import LocalDownload
@@ -251,6 +251,7 @@ class Workflow(object):
        data_dir = self.session.config.get('data.dir')
        lock_dir = self.session.config.get('lock.dir', default=data_dir)
        lock_file = os.path.join(lock_dir, self.name + '.lock')
        # Lock file may already be gone; check before removing it
        if os.path.exists(lock_file):
            os.remove(lock_file)
        return True
@@ -287,7 +288,12 @@ class RemoveWorkflow(Workflow):
            return False
        if os.path.exists(self.session.get_full_release_directory()):
            logging.info('Workflow:wf_remove:delete:' + self.session.get_full_release_directory())
            try:
                shutil.rmtree(self.session.get_full_release_directory())
            except Exception:
                logging.exception('Failed to delete bank release directory: ' + self.session.get_full_release_directory())
                logging.error('Bank will be deleted but some files/dirs may still be present on system, you can safely manually delete them')
        return self.bank.remove_session(self.session.get('update_session_id'))

    def wf_removeprocess(self):
@@ -1082,6 +1088,7 @@ class UpdateWorkflow(Workflow):
        credentials = cf.get('server.credentials')
        remote_dir = cf.get('remote.dir')

        if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
            keys = cf.get('url.params')
            if keys is not None:
@@ -1240,12 +1247,18 @@ class UpdateWorkflow(Workflow):
            keep_files = []
            nb_expected_files += len(downloader.files_to_download)
            if os.path.exists(offline_dir):
                logging.debug('Workflow:wf_download:offline_check_dir:' + offline_dir)
                for file_to_download in downloader.files_to_download:
                    # If file is in offline dir and has same date and size, do not download again
                    # Compare against the save_as name when the file is renamed on download
                    offline_file = file_to_download['name']
                    if file_to_download.get('save_as', None):
                        offline_file = file_to_download['save_as']
                    logging.debug('Workflow:wf_download:offline_check_file:' + offline_file)
                    if os.path.exists(offline_dir + '/' + offline_file):
                        logging.debug('Workflow:wf_download:offline_check_file_identical:' + offline_file)
                        try:
                            file_stat = os.stat(offline_dir + '/' + offline_file)
                            f_stat = datetime.datetime.fromtimestamp(os.path.getmtime(offline_dir + '/' + offline_file))
                            year = str(f_stat.year)
                            month = str(f_stat.month)
                            day = str(f_stat.day)
@@ -1253,16 +1266,16 @@ class UpdateWorkflow(Workflow):
                                    str(year) != str(file_to_download['year']) or \
                                    str(month) != str(file_to_download['month']) or \
                                    str(day) != str(file_to_download['day']):
                                logging.debug('Workflow:wf_download:different_from_offline:' + offline_file)
                                keep_files.append(file_to_download)
                            else:
                                logging.debug('Workflow:wf_download:same_as_offline:' + offline_file)
                                files_in_offline += 1
                                copied_files.append(file_to_download)
                        except Exception as e:
                            # Could not get stats on file
                            logging.debug('Workflow:wf_download:offline:failed to stat file: ' + str(e))
                            os.remove(offline_dir + '/' + offline_file)
                            keep_files.append(file_to_download)
                    else:
                        keep_files.append(file_to_download)
@@ -1352,15 +1365,15 @@ class UpdateWorkflow(Workflow):
        for downloader in downloaders:
            for file_to_download in downloader.files_to_download:
                operation = downmessage_pb2.Operation()
                operation.type = 1
                message = downmessage_pb2.DownloadFile()
                message.bank = self.name
                message.session = session
                message.local_dir = offline_dir
                remote_file = downmessage_pb2.DownloadFile.RemoteFile()
                protocol = downloader.protocol
                remote_file.protocol = downmessage_pb2.DownloadFile.Protocol.Value(protocol.upper())
                if downloader.credentials:
                    remote_file.credentials = downloader.credentials
@@ -1372,7 +1385,7 @@
                    remote_file.remote_dir = ''
                if http_parse:
                    msg_http_parse = downmessage_pb2.DownloadFile.HttpParse()
                    msg_http_parse.dir_line = http_parse.dir_line
                    msg_http_parse.file_line = http_parse.file_line
                    msg_http_parse.dir_name = http_parse.dir_name
@@ -1412,7 +1425,7 @@
                if 'md5' in file_to_download and file_to_download['md5']:
                    biomaj_file.metadata.md5 = file_to_download['md5']
                message.http_method = downmessage_pb2.DownloadFile.HTTP_METHOD.Value(downloader.method.upper())
                timeout_download = cf.get('timeout.download', default=None)
                if timeout_download:
@@ -1422,7 +1435,7 @@
                        logging.error('Wrong timeout type for timeout.download: ' + str(e))
                if self.span:
                    trace = downmessage_pb2.Operation.Trace()
                    trace.trace_id = self.span.get_trace_id()
                    trace.span_id = self.span.get_span_id()
                    operation.trace.MergeFrom(trace)
biomaj3 (3.1.6-1) unstable; urgency=medium

  [ Jelmer Vernooij ]
  * Use secure copyright file specification URI.

  [ Olivier Sallou ]
  * New upstream release

 -- Olivier Sallou <osallou@debian.org>  Thu, 25 Oct 2018 09:18:06 +0000
biomaj3 (3.1.4-1) unstable; urgency=medium
@@ -35,7 +35,7 @@ config = {
    'url': 'http://biomaj.genouest.org',
    'download_url': 'http://biomaj.genouest.org',
    'author_email': 'olivier.sallou@irisa.fr',
    'version': '3.1.6',
    'classifiers': [
        # How mature is this project? Common values are
        #   3 - Alpha