Skip to content
Commits on Source (5)
3.0.18:
Rename protobuf and use specific package to avoid conflicts
3.0.17:
Regenerate protobuf message desc, failing on python3
3.0.16:
Add missing req in setup.py
3.0.15:
Fix download progress check that could loop forever
Add irods download
3.0.14:
Allow setup of local_endpoint per service, else use default local_endpoint
......
......@@ -41,3 +41,5 @@ If you cloned the repository and installed it via python setup.py install, just
gunicorn -c gunicorn_conf.py biomaj_download.biomaj_download_web:app
Web processes should be behind a proxy/load balancer, API base url /api/download
Prometheus endpoint metrics are exposed via /metrics on web server
......@@ -16,7 +16,7 @@ from prometheus_client import multiprocess
from prometheus_client import CollectorRegistry
import consul
from biomaj_download.message import message_pb2
from biomaj_download.message import downmessage_pb2
from biomaj_download.downloadservice import DownloadService
from biomaj_core.utils import Utils
......@@ -86,7 +86,7 @@ def list_status(bank, session):
Check if listing request is over
'''
dserv = DownloadService(config_file, rabbitmq=False)
biomaj_file_info = message_pb2.DownloadFile()
biomaj_file_info = downmessage_pb2.DownloadFile()
biomaj_file_info.bank = bank
biomaj_file_info.session = session
biomaj_file_info.local_dir = '/tmp'
......@@ -100,7 +100,7 @@ def download_status(bank, session):
Get number of downloads and errors for bank and session. Progress includes successful download and errored downloads.
'''
dserv = DownloadService(config_file, rabbitmq=False)
biomaj_file_info = message_pb2.DownloadFile()
biomaj_file_info = downmessage_pb2.DownloadFile()
biomaj_file_info.bank = bank
biomaj_file_info.session = session
biomaj_file_info.local_dir = '/tmp'
......@@ -114,7 +114,7 @@ def download_error(bank, session):
Get errors info for bank and session
'''
dserv = DownloadService(config_file, rabbitmq=False)
biomaj_file_info = message_pb2.DownloadFile()
biomaj_file_info = downmessage_pb2.DownloadFile()
biomaj_file_info.bank = bank
biomaj_file_info.session = session
biomaj_file_info.local_dir = '/tmp'
......@@ -128,7 +128,7 @@ def list_result(bank, session):
Get file listing for bank and session, using FileList protobuf serialized string
'''
dserv = DownloadService(config_file, rabbitmq=False)
biomaj_file_info = message_pb2.DownloadFile()
biomaj_file_info = downmessage_pb2.DownloadFile()
biomaj_file_info.bank = bank
biomaj_file_info.session = session
biomaj_file_info.local_dir = '/tmp'
......@@ -146,7 +146,7 @@ def create_session(bank):
@app.route('/api/download/session/<bank>/<session>', methods=['DELETE'])
def clean_session(bank, session):
dserv = DownloadService(config_file, rabbitmq=False)
biomaj_file_info = message_pb2.DownloadFile()
biomaj_file_info = downmessage_pb2.DownloadFile()
biomaj_file_info.bank = bank
biomaj_file_info.session = session
dserv.clean(biomaj_file_info)
......
import logging
import os
from datetime import datetime
import time
from biomaj_download.download.interface import DownloadInterface
from irods.session import iRODSSession
from irods.models import Collection, DataObject, User
class IRODSDownload(DownloadInterface):
    '''
    Downloader that lists and fetches files from an iRODS server using the
    python-irodsclient session API.

    The connection settings (port, user, password, zone) are not known at
    construction time: call set_param() before list() or download().
    '''
    # To connect to irods session : sess = iRODSSession(host='localhost', port=1247, user='rods', password='rods', zone='tempZone')
    # password : self.credentials

    def __init__(self, protocol, server, remote_dir):
        '''
        :param protocol: protocol name (accepted for signature parity, not stored here)
        :type protocol: str
        :param server: iRODS host name
        :type server: str
        :param remote_dir: directory on the remote server (zone)
        :type remote_dir: str
        '''
        DownloadInterface.__init__(self)
        self.port = None
        self.remote_dir = remote_dir  # directory on the remote server : zone
        self.rootdir = remote_dir
        self.user = None
        self.password = None
        self.server = server
        self.zone = None

    def set_param(self, param):
        '''
        Store the connection parameters used to open iRODS sessions.

        :param param: dictionary providing the 'port', 'user', 'password' and 'zone' keys,
                      e.g. {'password': u'biomaj', 'protocol': u'iget', 'user': u'biomaj', 'port': u'port', 'zone': u'zone'}
        :type param: dict
        '''
        self.param = param
        self.port = int(param['port'])
        self.user = str(param['user'])
        self.password = str(param['password'])
        self.zone = str(param['zone'])

    def list(self, directory=''):
        '''
        List the data objects returned by the iRODS catalog query for the configured user.

        :param directory: unused by this implementation
        :type directory: str
        :return: tuple (list of file description dicts, empty list of directories)
        '''
        session = iRODSSession(host=self.server, port=self.port, user=self.user, password=self.password, zone=self.zone)
        rfiles = []
        rdirs = []
        rfile = {}
        date = None
        try:
            query = session.query(Collection.name, DataObject.name, DataObject.size, DataObject.owner_name, DataObject.modify_time).filter(User.name == self.user)
            for result in query.get_results():
                # if the user is biomaj : he will have access to all the irods data (biomaj resource) : drwxr-xr-x
                name = str(result[DataObject.name])
                # modify_time is rendered as "YYYY-MM-DD hh:mm:ss": keep [year, month, day]
                result_date = str(result[DataObject.modify_time]).split(" ")[0].split('-')
                # Avoid duplication: skip a row repeating the previous name and date
                if rfile and rfile['name'] == name and date == result_date:
                    continue
                date = result_date
                rfile = {
                    'permissions': "-rwxr-xr-x",
                    'size': int(result[DataObject.size]),
                    'month': int(date[1]),
                    'day': int(date[2]),
                    'year': int(date[0]),
                    'name': name,
                    'download_path': str(result[Collection.name])
                }
                rfiles.append(rfile)
        finally:
            # Release the connection even if the query or the parsing fails
            session.cleanup()
        return (rfiles, rdirs)

    def download(self, local_dir, keep_dirs=True):
        '''
        Download remote files to local_dir

        :param local_dir: Directory where files should be downloaded
        :type local_dir: str
        :param keep_dirs: keep file name directory structure or copy file in local_dir directly
        :type keep_dirs: bool
        :return: list of downloaded files
        :raises Exception: on kill request or when a file could not be downloaded
        '''
        logging.debug('IRODS:Download')
        try:
            os.chdir(local_dir)
        except TypeError:
            logging.error("IRODS:list:Could not find offline_dir")
        nb_files = len(self.files_to_download)
        cur_files = 1
        # self.remote_dir is overwritten per file below: remember it to restore it afterwards
        remote_dir = self.remote_dir
        try:
            for rfile in self.files_to_download:
                if self.kill_received:
                    raise Exception('Kill request received, exiting')
                file_dir = local_dir
                if 'save_as' not in rfile or rfile['save_as'] is None:
                    rfile['save_as'] = rfile['name']
                if keep_dirs:
                    file_dir = local_dir + os.path.dirname(rfile['save_as'])
                file_path = file_dir + '/' + os.path.basename(rfile['save_as'])
                # For unit tests only, workflow will take in charge directory creation before to avoid thread multi access
                if not os.path.exists(file_dir):
                    os.makedirs(file_dir)
                progress = 'IRODS:Download:Progress:' + str(cur_files) + '/' + str(nb_files)
                logging.debug(progress + ' downloading file ' + rfile['name'])
                logging.debug(progress + ' save as ' + rfile['save_as'])
                cur_files += 1
                start_time = time.mktime(datetime.now().timetuple())
                self.remote_dir = rfile['root']
                error = self.irods_download(file_dir, str(self.remote_dir), str(rfile['name']))
                if error:
                    rfile['download_time'] = 0
                    rfile['error'] = True
                    raise Exception("IRODS:Download:Error:" + rfile['root'] + '/' + rfile['name'])
                end_time = time.mktime(datetime.now().timetuple())
                rfile['download_time'] = end_time - start_time
                self.set_permissions(file_path, rfile)
        finally:
            # Restore the configured remote directory, including when a download failed
            self.remote_dir = remote_dir
        return self.files_to_download

    def irods_download(self, file_dir, file_path, file_to_download):
        '''
        Fetch one data object from iRODS into file_dir.

        :param file_dir: local directory receiving the file
        :type file_dir: str
        :param file_path: remote collection path, concatenated as-is with file_to_download
        :type file_path: str
        :param file_to_download: name of the remote data object
        :type file_to_download: str
        :return: True if an ExceptionIRODS was caught, False otherwise
        '''
        error = False
        logging.debug('IRODS:IRODS DOWNLOAD')
        session = iRODSSession(host=self.server, port=self.port, user=self.user, password=self.password, zone=self.zone)
        # NOTE(review): no separator is inserted here; assumes file_path ends with '/' - confirm against callers
        file_to_get = str(file_path) + str(file_to_download)
        try:
            # Write the file to download in the wanted file_dir : with the python-irods iget
            session.data_objects.get(file_to_get, file_dir)
        except ExceptionIRODS as e:
            logging.error("IRODS:Download:Error:" + str(e))
            logging.error("IRODS:Download:Error: irods object " + file_to_get)
            # Report the failure so that download() can flag the file
            error = True
        finally:
            # Release the connection whatever the outcome
            session.cleanup()
        return error
class ExceptionIRODS(Exception):
    '''
    Error reported by the iRODS downloader.

    :param exception_reason: description (or underlying error) explaining the failure
    '''

    def __init__(self, exception_reason):
        super(ExceptionIRODS, self).__init__(exception_reason)
        self.exception_reason = exception_reason

    def __str__(self):
        # __str__ must return a string: the reason may be another exception or any object
        return str(self.exception_reason)
......@@ -7,7 +7,7 @@ import sys
import pika
from biomaj_download.download.downloadthreads import DownloadThread
from biomaj_download.message import message_pb2
from biomaj_download.message import downmessage_pb2
if sys.version_info[0] < 3:
from Queue import Queue
......@@ -92,15 +92,15 @@ class DownloadClient(DownloadService):
'''
for downloader in downloaders:
for file_to_download in downloader.files_to_download:
operation = message_pb2.Operation()
operation = downmessage_pb2.Operation()
operation.type = 1
message = message_pb2.DownloadFile()
message = downmessage_pb2.DownloadFile()
message.bank = self.bank
message.session = self.session
message.local_dir = offline_dir
remote_file = message_pb2.DownloadFile.RemoteFile()
remote_file = downmessage_pb2.DownloadFile.RemoteFile()
protocol = downloader.protocol
remote_file.protocol = message_pb2.DownloadFile.Protocol.Value(protocol.upper())
remote_file.protocol = downmessage_pb2.DownloadFile.Protocol.Value(protocol.upper())
remote_file.server = downloader.server
if cf.get('remote.dir'):
remote_file.remote_dir = cf.get('remote.dir')
......@@ -135,7 +135,7 @@ class DownloadClient(DownloadService):
if 'md5' in file_to_download and file_to_download['md5']:
biomaj_file.metadata.md5 = file_to_download['md5']
message.http_method = message_pb2.DownloadFile.HTTP_METHOD.Value(downloader.method.upper())
message.http_method = downmessage_pb2.DownloadFile.HTTP_METHOD.Value(downloader.method.upper())
timeout_download = cf.get('timeout.download', None)
if timeout_download:
......@@ -216,7 +216,7 @@ class DownloadClient(DownloadService):
self.ask_download(operation)
nb_submitted += 1
if progress == nb_files_to_download:
if progress >= nb_files_to_download:
over = True
logging.info("Workflow:wf_download:RemoteDownload:Completed:" + str(progress))
logging.info("Workflow:wf_download:RemoteDownload:Errors:" + str(error))
......
......@@ -18,11 +18,11 @@ from biomaj_download.download.http import HTTPDownload
from biomaj_download.download.direct import DirectFTPDownload
from biomaj_download.download.direct import DirectHttpDownload
from biomaj_download.download.localcopy import LocalDownload
from biomaj_download.message import message_pb2
from biomaj_download.message import downmessage_pb2
from biomaj_download.download.rsync import RSYNCDownload
from biomaj_core.utils import Utils
from biomaj_zipkin.zipkin import Zipkin
from biomaj_download.download.protocolirods import IRODSDownload
app = Flask(__name__)
app_log = logging.getLogger('werkzeug')
......@@ -123,7 +123,7 @@ class DownloadService(object):
credentials=None, http_parse=None, http_method=None, param=None,
proxy=None, proxy_auth='',
save_as=None, timeout_download=None, offline_dir=None):
protocol = message_pb2.DownloadFile.Protocol.Value(protocol_name.upper())
protocol = downmessage_pb2.DownloadFile.Protocol.Value(protocol_name.upper())
downloader = None
if protocol in [0, 1]:
downloader = FTPDownload(protocol_name, server, remote_dir)
......@@ -139,13 +139,14 @@ class DownloadService(object):
downloader = DirectHttpDownload('https', server, '/')
if protocol == 8:
downloader = RSYNCDownload('rsync', server, remote_dir)
if protocol == 9:
downloader = IRODSDownload('irods', server, remote_dir)
if downloader is None:
return None
for remote_file in remote_files:
if remote_file['save_as']:
save_as = remote_file['save_as']
# For direct protocol, we only keep base name
if protocol in [4, 5, 6]:
tmp_remote = []
......@@ -170,7 +171,6 @@ class DownloadService(object):
if save_as:
downloader.set_save_as(save_as)
if param:
downloader.set_param(param)
......@@ -191,7 +191,7 @@ class DownloadService(object):
server = biomaj_file_info.remote_file.server
remote_dir = biomaj_file_info.remote_file.remote_dir
protocol_name = message_pb2.DownloadFile.Protocol.Name(protocol).lower()
protocol_name = downmessage_pb2.DownloadFile.Protocol.Name(protocol).lower()
self.logger.debug('%s request to download from %s://%s' % (biomaj_file_info.bank, protocol_name, server))
remote_files = []
......@@ -220,7 +220,7 @@ class DownloadService(object):
remote_files=remote_files,
credentials=biomaj_file_info.remote_file.credentials,
http_parse=biomaj_file_info.remote_file.http_parse,
http_method=message_pb2.DownloadFile.HTTP_METHOD.Name(biomaj_file_info.http_method),
http_method=downmessage_pb2.DownloadFile.HTTP_METHOD.Name(biomaj_file_info.http_method),
param=params,
proxy=proxy,
proxy_auth=proxy_auth,
......@@ -294,7 +294,7 @@ class DownloadService(object):
file_list = self.redis_client.get(self.config['redis']['prefix'] + ':' + biomaj_file_info.bank + ':session:' + biomaj_file_info.session + ':files')
if protobuf_decode:
file_list_pb2 = message_pb2.FileList()
file_list_pb2 = downmessage_pb2.FileList()
file_list_pb2.ParseFromString(file_list_pb2)
return file_list_pb2
......@@ -306,7 +306,7 @@ class DownloadService(object):
'''
file_list = []
dir_list = []
file_list_pb2 = message_pb2.FileList()
file_list_pb2 = downmessage_pb2.FileList()
try:
(file_list, dir_list) = download_handler.list()
......@@ -318,7 +318,6 @@ class DownloadService(object):
else:
self.logger.debug('End of download for %s session %s' % (biomaj_file_info.bank, biomaj_file_info.session))
for file_elt in download_handler.files_to_download:
# file_pb2 = message_pb2.File()
file_pb2 = file_list_pb2.files.add()
file_pb2.name = file_elt['name']
file_pb2.root = file_elt['root']
......@@ -331,7 +330,7 @@ class DownloadService(object):
param = file_list_pb2.param.add()
param.name = key
param.value = file_elt['param'][key]
metadata = message_pb2.File.MetaData()
metadata = downmessage_pb2.File.MetaData()
metadata.permissions = file_elt['permissions']
metadata.group = file_elt['group']
metadata.size = int(file_elt['size'])
......@@ -438,7 +437,7 @@ class DownloadService(object):
Manage download and send ACK message
'''
try:
operation = message_pb2.Operation()
operation = downmessage_pb2.Operation()
operation.ParseFromString(body)
message = operation.download
span = None
......
package biomaj;
package biomaj.download;
message File {
// Name of the file
......@@ -72,6 +72,7 @@ message DownloadFile {
DIRECTHTTPS = 6;
LOCAL = 7;
RSYNC = 8;
IRODS = 9;
}
message Param {
......
biomaj3-download (3.0.14-2) UNRELEASED; urgency=medium
biomaj3-download (3.0.18-1) unstable; urgency=medium
[ Jelmer Vernooij ]
* Use secure copyright file specification URI.
* Trim trailing whitespace.
-- Jelmer Vernooij <jelmer@debian.org> Sat, 20 Oct 2018 13:15:44 +0000
[ Olivier Sallou ]
* New upstream release
-- Olivier Sallou <osallou@debian.org> Thu, 25 Oct 2018 08:52:07 +0000
biomaj3-download (3.0.14-1) unstable; urgency=medium
......
......@@ -25,8 +25,8 @@ Build-Depends: debhelper (>= 9), dh-python,
python3-biomaj3-zipkin
Standards-Version: 4.1.3
Homepage: https://github.com/genouest/biomaj-download
Vcs-Browser: https://anonscm.debian.org/cgit/debian-med/biomaj3-download.git
Vcs-Git: https://anonscm.debian.org/git/debian-med/biomaj3-download.git
Vcs-Browser: https://salsa.debian.org/med-team/biomaj3-download
Vcs-Git: https://salsa.debian.org/med-team/biomaj3-download.git
Package: python3-biomaj3-download
Architecture: all
......
......@@ -7,7 +7,7 @@ export PYBUILD_NAME=biomaj-download
dh $@ --with python3 --buildsystem=pybuild
override_dh_auto_build:
cd biomaj_download/message && protoc --python_out=. message.proto
cd biomaj_download/message && protoc --python_out=. downmessage.proto
dh_auto_build
override_dh_install:
......
......@@ -21,7 +21,7 @@ config = {
'url': 'http://biomaj.genouest.org',
'download_url': 'http://biomaj.genouest.org',
'author_email': 'olivier.sallou@irisa.fr',
'version': '3.0.14',
'version': '3.0.18',
'classifiers': [
# How mature is this project? Common values are
# 3 - Alpha
......@@ -54,7 +54,8 @@ config = {
'prometheus_client>=0.0.18',
'protobuf',
'requests',
'humanfriendly'
'humanfriendly',
'python-irodsclient'
],
'tests_require': ['nose', 'mock'],
'test_suite': 'nose.collector',
......
......@@ -23,6 +23,9 @@ from biomaj_download.download.http import HTTPDownload, HTTPParse
from biomaj_download.download.localcopy import LocalDownload
from biomaj_download.download.downloadthreads import DownloadThread
from biomaj_download.download.rsync import RSYNCDownload
from biomaj_download.download.protocolirods import IRODSDownload
import pprint
import unittest
......@@ -552,3 +555,106 @@ class TestBiomajRSYNCDownload(unittest.TestCase):
rsyncd.match([r'^/bank/test*'], file_list, dir_list, prefix='')
rsyncd.download(self.utils.data_dir)
self.assertTrue(len(rsyncd.files_to_download) == 3)
class iRodsResult(object):
    '''
    Fake iRODS query result row, indexable by irods.models columns.

    :param collname: collection name returned for the COLL_NAME column
    :param dataname: data object name returned for the DATA_NAME column
    :param datasize: size returned for the DATA_SIZE column
    :param owner: owner returned for the D_OWNER_NAME column
    :param modify: modification time returned for DataObject.modify_time
    '''

    def __init__(self, collname, dataname, datasize, owner, modify):
        self.Collname = collname
        self.Dataname = dataname
        self.Datasize = datasize
        self.Dataowner_name = owner
        self.Datamodify_time = modify

    def __getitem__(self, index):
        '''
        Return the stored value matching the requested column, None if the column is unknown.
        '''
        # Imported here so that the class can be defined without python-irodsclient installed
        from irods.models import DataObject
        if index.icat_id == DataObject.modify_time.icat_id:
            return self.Datamodify_time
        # Other columns are recognized from their string representation
        elif "DATA_SIZE" in str(index):
            return self.Datasize
        elif "DATA_NAME" in str(index):
            return self.Dataname
        elif "COLL_NAME" in str(index):
            return self.Collname
        elif "D_OWNER_NAME" in str(index):
            return self.Dataowner_name
class MockiRODSSession(object):
    '''
    Stand-in for the python irods client session.

    Every chaining call returns the mock itself so that the expression below can be replayed:
    for result in session.query(Collection.name, DataObject.name, DataObject.size, DataObject.owner_name, DataObject.modify_time).filter(User.name == self.user).get_results():
    '''

    def __init__(self):
        (self.Collname, self.Dataname, self.Datasize,
         self.Dataowner_name, self.Datamodify_time) = ("1", "2", "3", "4", "5")
        self.Collid = ""

    def __getitem__(self, index):
        '''
        Give the collection id or name depending on the requested column, else None.
        '''
        from irods.data_object import iRODSDataObject
        from irods.models import Collection, DataObject, User
        print(index)
        column = str(index)
        # COLL_ID is looked up first, as in a plain if/if chain
        for marker, value in (("COLL_ID", self.Collid), ("COLL_NAME", self.Collname)):
            if marker in column:
                return value
        return None

    def configure(self):
        '''Build a brand new mock session.'''
        return MockiRODSSession()

    def query(self, Collname, Dataname, Datasize, Dataowner_name, Datamodify_time):
        '''Ignore the requested columns and keep chaining on this mock.'''
        return self

    def all(self):
        '''Chainable no-op.'''
        return self

    def one(self):
        '''Chainable no-op.'''
        return self

    def filter(self, boo):
        '''Ignore the criterion and keep chaining on this mock.'''
        return self

    def get_results(self):
        '''Provide a single canned result row.'''
        row = iRodsResult('tests/', 'test.fasta.gz', 45, 'biomaj', '2017-04-10 00:00:00')
        return [row]

    def cleanup(self):
        '''Chainable no-op.'''
        return self

    def open(self, r):
        '''Open the local test archive, whatever r is.'''
        return open("tests/test.fasta.gz", "r+")
@attr('irods')
@attr('roscoZone')
@attr('network')
class TestBiomajIRODSDownload(unittest.TestCase):
    '''
    Test IRODS downloader
    '''

    def setUp(self):
        self.utils = UtilsForTest()
        self.curdir = os.path.dirname(os.path.realpath(__file__))
        self.examples = os.path.join(self.curdir, 'bank') + '/'
        BiomajConfig.load_config(self.utils.global_properties, allow_user_config=False)

    def tearDown(self):
        self.utils.clean()

    # patch decorators are applied bottom-up: the mock of the decorator closest
    # to the function (cleanup) is passed first, the top-most one (configure) last.
    @patch('irods.session.iRODSSession.configure')
    @patch('irods.session.iRODSSession.query')
    @patch('irods.session.iRODSSession.cleanup')
    def test_irods_list(self, cleanup_mock, query_mock, initialize_mock):
        '''
        Listing through a mocked iRODS session returns at least one file.
        '''
        mock_session = MockiRODSSession()
        initialize_mock.return_value = mock_session.configure()
        query_mock.return_value = mock_session.query(None, None, None, None, None)
        cleanup_mock.return_value = mock_session.cleanup()
        irodsd = IRODSDownload('irods', self.examples, "")
        irodsd.set_credentials(None)
        irodsd.set_offline_dir(self.utils.data_dir)
        (files_list, dir_list) = irodsd.list()
        self.assertTrue(len(files_list) != 0)