Commits on Source (9)
......@@ -35,6 +35,10 @@ install:
- rm -rf ~/install/bcbio-vm/anaconda/lib/python2.7/site-packages/bcbio_nextgen-*
- ~/install/bcbio-vm/anaconda/bin/python setup.py install
jobs:
include:
- stage: test
name: "Variant standard"
script:
# -- Prepare variant docker image
- docker pull quay.io/bcbio/bcbio-vc
......@@ -45,20 +49,28 @@ script:
# -- Standard bcbio variant tests
- docker run -v `pwd`:`pwd` quay.io/bcbio/bcbio-vc bash -c "cd `pwd` && /usr/local/share/bcbio-nextgen/anaconda/bin/py.test -p no:cacheprovider -p no:stepwise tests/unit --cov=bcbio"
- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -m docker_multicore
# -- bcbio variant CWL tests
# XXX Arvados tests failing with 404 when contacting from Travis
# - sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt
# - py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_arvados
- stage: test
name: "Variant CWL"
script:
- docker pull quay.io/bcbio/bcbio-vc
- docker images
- df -h
# Update to latest bcbio-nextgen code within the container
- bcbio_vm.py devel setup_install -i quay.io/bcbio/bcbio-vc
# XXX Currently commented out joint test, taking too long and causing Travis timeouts
# - py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_docker_joint
- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_docker_somatic
# -- platform integration
- sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt
# XXX Arvados tests failing with 404 when contacting from Travis
#- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_arvados
# -- Cleanup variant docker image
- docker ps -a -q | xargs --no-run-if-empty docker rm
- docker rmi -f quay.io/bcbio/bcbio-vc
- docker images | grep '<none>' | awk '{print $3}' | xargs --no-run-if-empty docker rmi
- docker images
# -- bcbio RNA-seq CWL tests
- stage: test
name: "RNA-seq CWL"
script:
- docker pull quay.io/bcbio/bcbio-rnaseq
- docker images
- df -h
......
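The container cleanup commands above keep the Travis worker's disk from filling between stages by removing stopped containers and dangling `<none>` images. A minimal Python sketch of the same pruning logic, assuming the `docker` CLI is on PATH (the helper name is illustrative, not part of this change):

    import subprocess

    def prune_dangling_images():
        """Remove docker images whose repository shows as <none>, mirroring the shell pipeline."""
        out = subprocess.check_output(["docker", "images"]).decode()
        for line in out.splitlines()[1:]:            # skip the header row
            fields = line.split()
            # docker images columns: REPOSITORY TAG IMAGE_ID CREATED SIZE
            if fields and fields[0] == "<none>":
                subprocess.call(["docker", "rmi", fields[2]])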
## 1.1.3 (29 January 2019)
- CNV: support background inputs for CNVkit, GATK4 CNV and seq2c. Allows
a pre-computed panel of normals for tumor-only or single-sample CNV calling.
- variant: avoid race condition when processing input BED files for variant
calling and no pre-specified variant_regions are available.
- structural variation upload: avoid uploading multiple batched calls into
sample directories. Lumpy now produces a single output per batch in each
sample folder.
- install: respect pre-specified bioconda and conda-forge channels in condarc
configuration. Allows use of custom package mirrors.
- seq2c: move specialized pre-call calculation upstream to coverage estimation.
Allows use of seq2c in CWL runs.
- MultiQC upload: fix bug where results from a parallel run were not moved to
the final directory.
- GATK4 CNV: fix for standardized VCF output, correcting the number of columns.
- RNA-seq variation: fix for over-filtering variants near splice junctions with
STAR.
- Structural variant gene annotation: simplify and handle issues with
multidirectional comparisons and with out-of-order start/end coordinates from CNVkit.
- Catch and report unicode characters in templating or YAML descriptions.
## 1.1.2 (12 December 2018)
- VarDict low-frequency somatic filters: generalize strand- and mismatch-based
filters, using cross-validation to avoid over-filtering on high-depth panels.
- strelka2 joint calling: switch to improved gvcfgenotyper approach for calling
from gVCFs.
- Heterogeneity: initial support for PureCN and TitanCNA heterogeneity analysis,
including reporting on LOH in HLA for human samples. Work-in-progress validations:
https://github.com/bcbio/bcbio_validations/tree/master/TCGA-heterogeneity
- CNV: initial support for GATK4 CNV calling as an alternative to CNVkit for
tumor/normal analyses.
- VarDict RNA-seq variant calling: avoid structural variants with recent vardict-java.
- RNA-seq variation: filter RNA-seq variants close to splice junctions,
supporting STAR and hisat2.
......
"""Functionality to query and extract information from aligned BAM files.
"""
from __future__ import print_function
import collections
import os
import itertools
......@@ -31,7 +32,8 @@ def is_empty(bam_file):
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL))
stdout, stderr = p.communicate()
stderr = stderr.strip()
stdout = stdout.decode()
stderr = stderr.decode()
if ((p.returncode == 0 or p.returncode == 141) and
(stderr == "" or (stderr.startswith("gof3r") and stderr.endswith("broken pipe")))):
return int(stdout) == 0
......@@ -53,6 +55,8 @@ def is_paired(bam_file):
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL))
stdout, stderr = p.communicate()
stdout = stdout.decode()
stderr = stderr.decode()
stderr = stderr.strip()
if ((p.returncode == 0 or p.returncode == 141) and
(stderr == "" or (stderr.startswith("gof3r") and stderr.endswith("broken pipe")))):
......@@ -108,7 +112,7 @@ def idxstats(in_bam, data):
index(in_bam, data["config"], check_timestamp=False)
AlignInfo = collections.namedtuple("AlignInfo", ["contig", "length", "aligned", "unaligned"])
samtools = config_utils.get_program("samtools", data["config"])
idxstats_out = subprocess.check_output([samtools, "idxstats", in_bam])
idxstats_out = subprocess.check_output([samtools, "idxstats", in_bam]).decode()
out = []
for line in idxstats_out.split("\n"):
if line.strip():
......
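The `.decode()` additions are needed because `subprocess` pipes return `bytes` under Python 3, so comparisons like `stderr == ""` and conversions like `int(stdout)` require text. The pattern in isolation, assuming `samtools` is on PATH:

    import subprocess

    p = subprocess.Popen(["samtools", "--version"],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    # Python 3 returns bytes from the pipes; decode before string comparisons
    stdout = stdout.decode()
    stderr = stderr.decode().strip()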
......@@ -10,6 +10,7 @@ genome and avoid extremes of large blocks or large numbers of
small blocks.
"""
import collections
from functools import reduce
import os
import numpy
......
......@@ -5,7 +5,7 @@ http://www.ebi.ac.uk/ena/about/cram_toolkit
import os
import subprocess
from bcbio import utils
from bcbio import bam, utils
from bcbio.provenance import do
from bcbio.pipeline import datadict as dd
from bcbio.pipeline import config_utils
......@@ -21,7 +21,8 @@ def compress(in_bam, data):
Otherwise does `cram-lossless` which only converts to CRAM.
"""
out_file = "%s.cram" % os.path.splitext(in_bam)[0]
out_dir = utils.safe_makedir(os.path.join(dd.get_work_dir(data), "archive"))
out_file = os.path.join(out_dir, "%s.cram" % os.path.splitext(os.path.basename(in_bam))[0])
cores = dd.get_num_cores(data)
ref_file = dd.get_ref_file(data)
if not utils.file_exists(out_file):
......@@ -61,3 +62,13 @@ def index(in_cram, config):
cmd = "samtools index {tx_in_file}"
do.run(cmd.format(**locals()), "Index CRAM file")
return out_file
def to_bam(in_file, out_file, data):
"""Convert CRAM file into BAM.
"""
if not utils.file_uptodate(out_file, in_file):
with file_transaction(data, out_file) as tx_out_file:
cmd = ["samtools", "view", "-O", "BAM", "-o", tx_out_file, in_file]
do.run(cmd, "Convert CRAM to BAM")
bam.index(out_file, data["config"])
return out_file
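The `compress` change writes the CRAM into an `archive` subdirectory of the work directory instead of next to the input BAM. The path construction, with illustrative file names:

    import os

    work_dir = "/work/project"                     # illustrative paths
    in_bam = "/work/project/align/sample1.bam"

    out_dir = os.path.join(work_dir, "archive")
    out_file = os.path.join(out_dir, "%s.cram" % os.path.splitext(os.path.basename(in_bam))[0])
    # out_file == "/work/project/archive/sample1.cram"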
"""Utilities for working with fastq files.
"""
import six
from six.moves import zip
from itertools import product
import os
import random
import gzip
import sys
from Bio import SeqIO
......@@ -226,7 +226,7 @@ def downsample(f1, f2, data, N, quick=False):
out_files = (outf1, outf2) if outf2 else (outf1)
with file_transaction(out_files) as tx_out_files:
if isinstance(tx_out_files, basestring):
if isinstance(tx_out_files, six.string_types):
tx_out_f1 = tx_out_files
else:
tx_out_f1, tx_out_f2 = tx_out_files
......@@ -258,11 +258,11 @@ def estimate_read_length(fastq_file, quality_format="fastq-sanger", nreads=1000)
"""
in_handle = SeqIO.parse(open_fastq(fastq_file), quality_format)
read = in_handle.next()
read = next(in_handle)
average = len(read.seq)
for _ in range(nreads):
try:
average = (average + len(in_handle.next().seq)) / 2
average = (average + len(next(in_handle).seq)) / 2
except StopIteration:
break
in_handle.close()
......@@ -277,7 +277,7 @@ def estimate_maximum_read_length(fastq_file, quality_format="fastq-sanger",
lengths = []
for _ in range(nreads):
try:
lengths.append(len(in_handle.next().seq))
lengths.append(len(next(in_handle).seq))
except StopIteration:
break
in_handle.close()
......@@ -288,10 +288,5 @@ def open_fastq(in_file):
"""
if objectstore.is_remote(in_file):
return objectstore.open_file(in_file)
_, ext = os.path.splitext(in_file)
if ext == ".gz":
return gzip.open(in_file, 'rb')
if ext in [".fastq", ".fq"]:
return open(in_file, 'r')
# default to just opening it
return open(in_file, "r")
else:
return utils.open_gzipsafe(in_file)
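Several of the fastq changes are straight Python 2/3 compatibility fixes: iterators no longer expose a `.next()` method and `basestring` is gone, so the code moves to the builtin `next()` and `six.string_types`. Both patterns in a standalone sketch (the helper is illustrative):

    import six

    def first_nonempty(lines):
        """Return the first non-empty item, accepting either a single string or an iterable."""
        if isinstance(lines, six.string_types):    # replaces the Python 2 basestring check
            return lines
        it = iter(lines)
        while True:
            try:
                item = next(it)                    # replaces Python 2 it.next()
            except StopIteration:
                return None
            if item:
                return item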
......@@ -341,7 +341,12 @@ class BroadRunner:
["-jar", gatk_jar] + [str(x) for x in params]
def run_gatk(self, params, tmp_dir=None, log_error=True,
data=None, region=None, memscale=None, parallel_gc=False):
data=None, region=None, memscale=None, parallel_gc=False, ld_preload=False):
"""Top level interface to running a GATK command.
ld_preload injects required libraries for Java JNI calls:
https://gatkforums.broadinstitute.org/gatk/discussion/8810/something-about-create-pon-workflow
"""
needs_java7 = LooseVersion(self.get_gatk_version()) < LooseVersion("3.6")
# For old Java requirements use global java 7
if needs_java7:
......@@ -354,6 +359,8 @@ class BroadRunner:
else params.index("--analysis_type")
prog = params[atype_index + 1]
cl = fix_missing_spark_user(cl, prog, params)
if ld_preload:
cl = "export LD_PRELOAD=%s/lib/libopenblas.so && %s" % (os.path.dirname(utils.get_bcbio_bin()), cl)
do.run(cl, "GATK: {0}".format(prog), data, region=region,
log_error=log_error)
if needs_java7:
......
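The new `ld_preload` option prepends an `export LD_PRELOAD=...` to the GATK command so the OpenBLAS shared library is loaded before the JVM starts, which the linked GATK4 panel-of-normals workflow needs for its JNI calls. A sketch of the string manipulation, with an illustrative install prefix:

    import os

    def add_ld_preload(cl, bcbio_bin="/usr/local/bin"):
        """Prefix a shell command with an LD_PRELOAD export pointing at OpenBLAS."""
        lib = os.path.join(os.path.dirname(bcbio_bin), "lib", "libopenblas.so")
        return "export LD_PRELOAD=%s && %s" % (lib, cl)

    # add_ld_preload("gatk CreateReadCountPanelOfNormals ...")
    # -> "export LD_PRELOAD=/usr/local/lib/libopenblas.so && gatk CreateReadCountPanelOfNormals ..."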
"""Create Common Workflow Language (CWL) runnable files and tools from a world object.
"""
from __future__ import print_function
import collections
import copy
import dateutil
......@@ -11,6 +12,7 @@ import os
import tarfile
import requests
import six
import toolz as tz
import yaml
......@@ -19,6 +21,7 @@ from bcbio.cwl import defs, workflow
from bcbio.distributed import objectstore, resources
from bcbio.distributed.transaction import file_transaction
from bcbio.pipeline import alignment
from functools import reduce
INTEGRATION_MAP = {"keep:": "arvados", "s3:": "s3", "sbg:": "sbgenomics",
"dx:": "dnanexus", "gs:": "gs"}
......@@ -399,7 +402,7 @@ def _get_cur_remotes(path):
elif isinstance(path, dict):
for v in path.values():
cur_remotes |= _get_cur_remotes(v)
elif path and isinstance(path, basestring):
elif path and isinstance(path, six.string_types):
if path.startswith(tuple(INTEGRATION_MAP.keys())):
cur_remotes.add(INTEGRATION_MAP.get(path.split(":")[0] + ":"))
return cur_remotes
......@@ -568,9 +571,9 @@ def _get_relative_ext(of, sf):
os.path.basename(orig).count(".") == os.path.basename(prefix).count("."))
# Handle remote files
if of.find(":") > 0:
of = of.split(":")[-1]
of = os.path.basename(of.split(":")[-1])
if sf.find(":") > 0:
sf = sf.split(":")[-1]
sf = os.path.basename(sf.split(":")[-1])
prefix = os.path.commonprefix([sf, of])
while prefix.endswith(".") or (half_finished_trim(sf, prefix) and half_finished_trim(of, prefix)):
prefix = prefix[:-1]
......@@ -578,7 +581,7 @@ def _get_relative_ext(of, sf):
ext_to_add = sf.replace(prefix, "")
# Return extensions relative to original
if not exts_to_remove or exts_to_remove.startswith("."):
return "^" * exts_to_remove.count(".") + ext_to_add
return str("^" * exts_to_remove.count(".") + ext_to_add)
else:
raise ValueError("No cross platform way to reference complex extension: %s %s" % (sf, of))
......@@ -621,7 +624,7 @@ def _get_avro_type(val):
elif val is None:
return ["null"]
# encode booleans as string True/False and unencode on other side
elif isinstance(val, bool) or isinstance(val, basestring) and val.lower() in ["true", "false", "none"]:
elif isinstance(val, bool) or isinstance(val, six.string_types) and val.lower() in ["true", "false", "none"]:
return ["string", "null", "boolean"]
elif isinstance(val, int):
return "long"
......@@ -700,7 +703,11 @@ def _to_cwldata(key, val, get_retriever):
def _remove_remote_prefix(f):
"""Remove any remote references to allow object lookups by file paths.
"""
return f.split(":")[-1] if objectstore.is_remote(f) else f
return f.split(":")[-1].split("/", 1)[1] if objectstore.is_remote(f) else f
def _index_blacklist(xs):
blacklist = ["-resources.yaml"]
return [x for x in xs if not any([x.find(b) >=0 for b in blacklist])]
def _to_cwlfile_with_indexes(val, get_retriever):
"""Convert reads with ready to go indexes into the right CWL object.
......@@ -711,9 +718,7 @@ def _to_cwlfile_with_indexes(val, get_retriever):
Skips doing this for reference files and standard setups like bwa, which
take up too much time and space to unpack multiple times.
"""
if val["base"].endswith(".fa") and any([x.endswith(".fa.fai") for x in val["indexes"]]):
return _item_to_cwldata(val["base"], get_retriever)
else:
val["indexes"] = _index_blacklist(val["indexes"])
tval = {"base": _remove_remote_prefix(val["base"]),
"indexes": [_remove_remote_prefix(f) for f in val["indexes"]]}
# Standard named set of indices, like bwa
......@@ -742,7 +747,7 @@ def _item_to_cwldata(x, get_retriever, indexes=None):
"""
if isinstance(x, (list, tuple)):
return [_item_to_cwldata(subx, get_retriever) for subx in x]
elif (x and isinstance(x, basestring) and
elif (x and isinstance(x, six.string_types) and
(((os.path.isfile(x) or os.path.isdir(x)) and os.path.exists(x)) or
objectstore.is_remote(x))):
if _file_local_or_remote(x, get_retriever):
......@@ -751,6 +756,8 @@ def _item_to_cwldata(x, get_retriever, indexes=None):
out = _add_secondary_if_exists(indexes, out, get_retriever)
elif x.endswith(".bam"):
out = _add_secondary_if_exists([x + ".bai"], out, get_retriever)
elif x.endswith(".cram"):
out = _add_secondary_if_exists([x + ".crai"], out, get_retriever)
elif x.endswith((".vcf.gz", ".bed.gz")):
out = _add_secondary_if_exists([x + ".tbi"], out, get_retriever)
elif x.endswith(".fa"):
......
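CWL `secondaryFiles` patterns use one leading `^` per extension to strip from the primary file before appending the secondary extension; `_get_relative_ext` derives that pattern from the two file names, now comparing basenames so remote paths line up. A simplified sketch of the idea, assuming the common prefix falls on an extension boundary (the real function also trims half-finished prefixes and rejects unresolvable cases):

    import os

    def relative_ext(of, sf):
        """Build a CWL secondaryFiles pattern: one "^" per extension removed, plus the extension added."""
        prefix = os.path.commonprefix([sf, of])
        exts_to_remove = of.replace(prefix, "")
        ext_to_add = sf.replace(prefix, "")
        return "^" * exts_to_remove.count(".") + ext_to_add

    # relative_ext("sample.vcf.gz", "sample.vcf.gz.tbi") -> ".tbi"
    # relative_ext("batch.vcf.gz", "batch.vcf")          -> "^"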
......@@ -10,6 +10,7 @@ import os
import pprint
import tarfile
import six
import toolz as tz
from bcbio import bam, utils
......@@ -57,7 +58,7 @@ def normalize_missing(xs):
xs[k] = normalize_missing(v)
elif isinstance(xs, (list, tuple)):
xs = [normalize_missing(x) for x in xs]
elif isinstance(xs, basestring):
elif isinstance(xs, six.string_types):
if xs.lower() in ["none", "null"]:
xs = None
elif xs.lower() == "true":
......@@ -80,7 +81,7 @@ def unpack_tarballs(xs, data, use_subdir=True):
xs[k] = unpack_tarballs(v, data, use_subdir)
elif isinstance(xs, (list, tuple)):
xs = [unpack_tarballs(x, data, use_subdir) for x in xs]
elif isinstance(xs, basestring):
elif isinstance(xs, six.string_types):
if os.path.isfile(xs.encode("utf-8", "ignore")) and xs.endswith("-wf.tar.gz"):
if use_subdir:
tarball_dir = utils.safe_makedir(os.path.join(dd.get_work_dir(data), "wf-inputs"))
......
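`normalize_missing` is the decode side of the boolean/None round trip noted in `_get_avro_type` above: CWL passes these values through as strings, so they are converted back when records are unpacked. A standalone sketch of the recursion (the `"false"` branch is cut off in the hunk above and assumed symmetric):

    import six

    def normalize_missing(xs):
        """Recursively convert CWL string placeholders back into Python values (sketch)."""
        if isinstance(xs, dict):
            return {k: normalize_missing(v) for k, v in xs.items()}
        elif isinstance(xs, (list, tuple)):
            return [normalize_missing(x) for x in xs]
        elif isinstance(xs, six.string_types):
            if xs.lower() in ["none", "null"]:
                return None
            elif xs.lower() == "true":
                return True
            elif xs.lower() == "false":
                return False
        return xs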
......@@ -242,7 +242,7 @@ def _variant_vc(checkpoints):
cwlout(["validate", "grading_summary"], ["File", "null"]),
cwlout(["validate", "grading_plots"], {"type": "array", "items": ["File", "null"]})],
"bcbio-vc",
disk={"files": 0.5}, cores=1)]
disk={"files": 2.0}, cores=1)]
return vc, [["validate", "grading_summary"], ["variants", "calls"], ["variants", "gvcf"]]
def _variant_ensemble(checkpoints):
......@@ -312,6 +312,7 @@ def _variant_checkpoints(samples):
checkpoints["align_split"] = not all([(dd.get_align_split_size(d) is False or
not dd.get_aligner(d))
for d in samples])
checkpoints["archive"] = any([dd.get_archive(d) for d in samples])
checkpoints["umi"] = any([dd.get_umi_consensus(d) for d in samples])
checkpoints["ensemble"] = any([dd.get_ensemble(d) for d in samples])
checkpoints["cancer"] = any(dd.get_phenotype(d) in ["tumor"] for d in samples)
......@@ -321,6 +322,9 @@ def _postprocess_alignment(checkpoints):
wf = [s("prep_samples_to_rec", "multi-combined",
[["config", "algorithm", "coverage"],
["rgnames", "sample"],
["config", "algorithm", "background", "cnv_reference"],
["config", "algorithm", "svcaller"],
["config", "algorithm", "sv_regions"],
["config", "algorithm", "variant_regions"],
["reference", "fasta", "base"]],
[cwlout("prep_samples_rec", "record")],
......@@ -340,6 +344,7 @@ def _postprocess_alignment(checkpoints):
disk={"files": 0.5}, cores=1),
s("postprocess_alignment_to_rec", "multi-combined",
[["align_bam"],
["config", "algorithm", "archive"],
["config", "algorithm", "coverage_interval"],
["config", "algorithm", "exclude_regions"],
["config", "algorithm", "variant_regions"],
......@@ -394,6 +399,13 @@ def _postprocess_alignment(checkpoints):
"bcbio-vc", ["bedtools", "htslib", "gatk4"],
disk={"files": 0.5}, cores=1)]
out = [["regions", "sample_callable"]]
if checkpoints.get("archive"):
wf += [s("archive_to_cram", "multi-parallel",
[["postprocess_alignment_rec"]],
[cwlout(["archive_bam"], ["File", "null"], [".crai"])],
"bcbio-vc", ["samtools"],
disk={"files": 3.0})]
out += [["archive_bam"]]
return wf, out
def variant(samples):
......@@ -515,6 +527,7 @@ def _variant_sv(checkpoints):
disk={"files": 2.0})]
sv_batch_inputs = [["analysis"], ["genome_build"],
["work_bam_plus", "disc"], ["work_bam_plus", "sr"],
["config", "algorithm", "background", "cnv_reference"],
["config", "algorithm", "tools_on"],
["config", "algorithm", "tools_off"],
["config", "algorithm", "svprioritize"],
......@@ -528,12 +541,14 @@ def _variant_sv(checkpoints):
steps = [s("calculate_sv_bins", "multi-combined",
[["align_bam"], ["reference", "fasta", "base"],
["metadata", "batch"], ["metadata", "phenotype"],
["config", "algorithm", "background", "cnv_reference"],
["config", "algorithm", "callable_regions"],
["config", "algorithm", "coverage_interval"],
["config", "algorithm", "exclude_regions"],
["config", "algorithm", "sv_regions"],
["config", "algorithm", "variant_regions"],
["config", "algorithm", "variant_regions_merged"],
["config", "algorithm", "seq2c_bed_ready"],
["config", "algorithm", "svcaller"],
["depth", "variant_regions", "regions"],
["genome_resources", "variation", "lcr"], ["genome_resources", "variation", "polyx"],
......@@ -552,8 +567,9 @@ def _variant_sv(checkpoints):
[cwlout("sv_rawcoverage_rec", "record",
fields=[cwlout(["depth", "bins", "target"], ["File", "null"]),
cwlout(["depth", "bins", "antitarget"], ["File", "null"]),
cwlout(["depth", "bins", "seq2c"], ["File", "null"]),
cwlout("inherit")])],
"bcbio-vc", ["mosdepth", "cnvkit"],
"bcbio-vc", ["mosdepth", "cnvkit", "seq2c"],
disk={"files": 1.5}),
s("normalize_sv_coverage", "multi-combined",
[["sv_rawcoverage_rec"]],
......@@ -576,7 +592,7 @@ def _variant_sv(checkpoints):
cwlout(["sv", "prioritize", "raw"], {"type": "array", "items": ["File", "null"]}),
cwlout(["svvalidate", "grading_summary"], ["File", "null"]),
cwlout(["svvalidate", "grading_plots"], {"type": "array", "items": ["File", "null"]})],
"bcbio-vc", disk={"files": 1.0}, cores=1)]
"bcbio-vc", ["bcbio-prioritize"], disk={"files": 1.0}, cores=1)]
final_outputs = [["sv", "calls"], ["svvalidate", "grading_summary"], ["sv", "prioritize", "tsv"],
["sv", "prioritize", "raw"], ["sv", "supplemental"]]
return steps, final_outputs
......
......@@ -24,9 +24,9 @@ def create_cromwell_config(args, work_dir, sample_file):
"joblimit": "concurrent-job-limit = %s" % (joblimit) if joblimit > 0 else "",
"cwl_attrs": "\n ".join(cwl_attrs),
"filesystem": _get_filesystem_config(file_types),
"database": run_config.get("database", DATABASE_CONFIG % {"work_dir": work_dir}),
"engine": _get_engine_filesystem_config(file_types, args)}
"database": run_config.get("database", DATABASE_CONFIG % {"work_dir": work_dir})}
cl_args, conf_args, scheduler, cloud_type = _args_to_cromwell(args)
std_args["engine"] = _get_engine_filesystem_config(file_types, args, conf_args)
conf_args.update(std_args)
main_config = {"hpc": (HPC_CONFIGS[scheduler] % conf_args) if scheduler else "",
"cloud": (CLOUD_CONFIGS[cloud_type] % conf_args) if cloud_type else "",
......@@ -110,10 +110,20 @@ def _args_to_cromwell(args):
if args.cloud_project:
if args.cloud_root and args.cloud_root.startswith("gs:"):
cloud_type = "PAPI"
cloud_root = args.cloud_root
cloud_region = None
elif ((args.cloud_root and args.cloud_root.startswith("s3:")) or
(args.cloud_project and args.cloud_project.startswith("arn:"))):
cloud_type = "AWSBATCH"
cloud_root = args.cloud_root
if not cloud_root.startswith("s3://"):
cloud_root = "s3://%s" % cloud_root
# split region from input Amazon Resource Name, ie arn:aws:batch:us-east-1:
cloud_region = args.cloud_project.split(":")[3]
else:
raise ValueError("Unexpected inputs for Cromwell Cloud support: %s %s" %
(args.cloud_project, args.cloud_root))
config = {"cloud_project": args.cloud_project, "cloud_root": args.cloud_root}
config = {"cloud_project": args.cloud_project, "cloud_root": cloud_root, "cloud_region": cloud_region}
cl.append("-Dbackend.default=%s" % cloud_type)
return cl, config, args.scheduler, cloud_type
......@@ -126,6 +136,8 @@ def _get_filesystem_types(args, sample_file):
for f in _get_file_paths(json.load(in_handle)):
if f.startswith("gs:"):
out.add("gcp%s" % ext)
elif f.startswith("s3:"):
out.add("s3%s" % ext)
elif f.startswith(("https:", "http:")):
out.add("http%s" % ext)
else:
......@@ -137,6 +149,7 @@ def _get_filesystem_config(file_types):
"""
out = " filesystems {\n"
for file_type in sorted(list(file_types)):
if file_type in _FILESYSTEM_CONFIG:
out += _FILESYSTEM_CONFIG[file_type]
out += " }\n"
return out
......@@ -196,14 +209,16 @@ database {
}
"""
def _get_engine_filesystem_config(file_types, args):
def _get_engine_filesystem_config(file_types, args, conf_args):
"""Retriever authorization and engine filesystem configuration.
"""
file_types = [x.replace("_container", "") for x in list(file_types)]
out = ""
if "gcp" in file_types:
out += _AUTH_CONFIG_GOOGLE
if "gcp" in file_types or "http" in file_types:
if "s3" in file_types:
out += _AUTH_CONFIG_AWS % conf_args["cloud_region"]
if "gcp" in file_types or "http" in file_types or "s3" in file_types:
out += "engine {\n"
out += " filesystems {\n"
if "gcp" in file_types:
......@@ -214,10 +229,23 @@ def _get_engine_filesystem_config(file_types, args):
out += ' }\n'
if "http" in file_types:
out += ' http {}\n'
if "s3" in file_types:
out += ' s3 { auth = "default" }'
out += " }\n"
out += "}\n"
return out
_AUTH_CONFIG_AWS = """
aws {
application-name = "cromwell"
auths = [{
name = "default"
scheme = "default"
}]
region = "%s"
}
"""
_AUTH_CONFIG_GOOGLE = """
google {
......@@ -496,5 +524,26 @@ CLOUD_CONFIGS = {
}
}
}
""",
"AWSBATCH": """
AWSBATCH {
actor-factory = "cromwell.backend.impl.aws.AwsBatchBackendLifecycleActorFactory"
config {
root = "%(cloud_root)s/cromwell-execution"
auth = "default"
numSubmitAttempts = 3
numCreateDefinitionAttempts = 3
default-runtime-attributes {
queueArn: "%(cloud_project)s"
}
filesystems {
s3 {
auth = "default"
}
}
}
}
"""
}
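For the AWS Batch backend the region is not a separate argument; it is sliced out of the colon-separated queue ARN (`arn:aws:batch:<region>:<account>:...`), and the S3 root is normalized to a full `s3://` URL. A worked example with illustrative values:

    cloud_project = "arn:aws:batch:us-east-1:123456789012:job-queue/bcbio"
    cloud_root = "my-bucket/work"

    # Region is the fourth colon-separated field of the ARN
    cloud_region = cloud_project.split(":")[3]       # -> "us-east-1"

    if not cloud_root.startswith("s3://"):
        cloud_root = "s3://%s" % cloud_root          # -> "s3://my-bucket/work"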
......@@ -253,6 +253,15 @@ def _cromwell_move_outputs(metadata, final_dir):
elif len(vals) > 0:
raise ValueError("Unexpected sample and outputs: %s %s %s" % (k, samples, vals))
def _run_sbgenomics(args):
"""Run CWL on SevenBridges platform and Cancer Genomics Cloud.
"""
assert not args.no_container, "Seven Bridges runs require containers"
main_file, json_file, project_name = _get_main_and_json(args.directory)
flags = []
cmd = ["sbg-cwl-runner"] + flags + args.toolargs + [main_file, json_file]
_run_tool(cmd)
def _run_funnel(args):
"""Run funnel TES server with rabix bunny for CWL.
"""
......@@ -293,6 +302,7 @@ def _run_funnel(args):
_TOOLS = {"cwltool": _run_cwltool,
"cromwell": _run_cromwell,
"arvados": _run_arvados,
"sbg": _run_sbgenomics,
"toil": _run_toil,
"bunny": _run_bunny,
"funnel": _run_funnel,
......
......@@ -3,6 +3,7 @@
import copy
import pprint
import six
import toolz as tz
from bcbio.pipeline import alignment
......@@ -233,7 +234,7 @@ def _flatten_nested_input(v):
for x in v["type"]:
if isinstance(x, dict) and x["type"] == "array":
new_type = x["items"]
elif isinstance(x, basestring) and x == "null":
elif isinstance(x, six.string_types) and x == "null":
want_null = True
else:
new_type = x
......@@ -271,7 +272,7 @@ def _clean_output(v):
return out
def _get_string_vid(vid):
if isinstance(vid, basestring):
if isinstance(vid, six.string_types):
return vid
assert isinstance(vid, (list, tuple)), vid
return "__".join(vid)
......@@ -279,7 +280,7 @@ def _get_string_vid(vid):
def _get_variable(vid, variables):
"""Retrieve an input variable from our existing pool of options.
"""
if isinstance(vid, basestring):
if isinstance(vid, six.string_types):
vid = get_base_id(vid)
else:
vid = _get_string_vid(vid)
......@@ -295,6 +296,7 @@ def _handle_special_inputs(inputs, variables):
XXX Need to better expose this at a top level definition.
"""
from bcbio import structural
optional = [["config", "algorithm", "coverage"],
["config", "algorithm", "variant_regions"],
["config", "algorithm", "sv_regions"],
......@@ -316,6 +318,12 @@ def _handle_special_inputs(inputs, variables):
out.append(vid)
found_indexes = True
assert found_indexes, "Found no snpEff indexes in %s" % [v["id"] for v in variables]
elif input == ["config", "algorithm", "background", "cnv_reference"]:
for v in variables:
vid = get_base_id(v["id"]).split("__")
if (vid[:4] == ["config", "algorithm", "background", "cnv_reference"] and
structural.supports_cnv_reference(vid[4])):
out.append(vid)
elif input in optional:
if _get_string_vid(input) in all_vs:
out.append(input)
......@@ -418,7 +426,7 @@ def _create_variable(orig_v, step, variables):
v = _get_variable(orig_v["id"], variables)
except ValueError:
v = copy.deepcopy(orig_v)
if not isinstance(v["id"], basestring):
if not isinstance(v["id"], six.string_types):
v["id"] = _get_string_vid(v["id"])
for key, val in orig_v.items():
if key not in ["id", "type"]:
......
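Inputs in these workflow definitions are referenced either by a plain string id or by a nested key path; `_get_string_vid` flattens key paths with double underscores, which is why the `cnv_reference` handling above splits ids on `"__"`. For example:

    _get_string_vid(["config", "algorithm", "background", "cnv_reference"])
    # -> "config__algorithm__background__cnv_reference"
    _get_string_vid("align_bam")   # -> "align_bam"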
......@@ -83,7 +83,7 @@ def run_multicore(fn, items, config, parallel=None):
if joblib is None:
raise ImportError("Need joblib for multiprocessing parallelization")
out = []
for data in joblib.Parallel(parallel["num_jobs"], batch_size=1, backend="multiprocessing")(joblib.delayed(fn)(x) for x in items):
for data in joblib.Parallel(parallel["num_jobs"], batch_size=1, backend="multiprocessing")(joblib.delayed(fn)(*x) for x in items):
if data:
out.extend(data)
return out
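Switching from `joblib.delayed(fn)(x)` to `joblib.delayed(fn)(*x)` means each item in `items` is now treated as a tuple of positional arguments rather than a single argument, matching the `fn(*fnargs)` change further down. The difference in isolation:

    import joblib

    def add(a, b):
        return a + b

    items = [(1, 2), (3, 4)]
    # delayed(add)(*x) unpacks each tuple: add(1, 2), add(3, 4)
    results = joblib.Parallel(n_jobs=2)(joblib.delayed(add)(*x) for x in items)
    # results == [3, 7]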
......@@ -23,12 +23,6 @@ SUPPORTED_REMOTES = ("s3://",)
BIODATA_INFO = {"s3": "s3://biodata/prepped/{build}/{build}-{target}.tar.gz"}
REGIONS_NEWPERMS = {"s3": ["eu-central-1"]}
if six.PY3:
BIGNUM = float("inf")
else:
BIGNUM = sys.maxint
@six.add_metaclass(abc.ABCMeta)
class FileHandle(object):
......@@ -87,7 +81,7 @@ class FileHandle(object):
pass
@abc.abstractmethod
def read(self, size=BIGNUM):
def read(self, size=sys.maxsize):
"""Read at most size bytes from the file (less if the read hits EOF
before obtaining size bytes).
"""
......@@ -122,7 +116,7 @@ class S3Handle(FileHandle):
for chunk in self._key:
yield self._decompress(chunk)
def read(self, size=BIGNUM):
def read(self, size=sys.maxsize):
"""Read at most size bytes from the file (less if the read hits EOF
before obtaining size bytes).
"""
......@@ -130,7 +124,7 @@ class S3Handle(FileHandle):
def next(self):
"""Return the next item from the container."""
return self._iter.next()
return next(self._iter)
def close(self):
"""Close the file handle."""
......@@ -207,7 +201,7 @@ class BlobHandle(FileHandle):
blob_name=self._blob_name,
x_ms_range=range_id)
def read(self, size=BIGNUM):
def read(self, size=sys.maxsize):
"""Read at most size bytes from the file (less if the read hits EOF
before obtaining size bytes).
"""
......@@ -220,7 +214,7 @@ class BlobHandle(FileHandle):
def next(self):
"""Return the next item from the container."""
return self._iter.next()
return next(self._iter)
def close(self):
"""Close the file handle."""
......
......@@ -9,6 +9,7 @@ import operator
from bcbio.pipeline import config_utils
from bcbio.log import logger
from functools import reduce
def _get_resource_programs(progs, algs):
"""Retrieve programs used in analysis based on algorithm configurations.
......
......@@ -10,6 +10,7 @@ import os
import pprint
import shutil
import six
import toolz as tz
import yaml
......@@ -53,7 +54,7 @@ def process(args):
with utils.chdir(work_dir):
with contextlib.closing(log.setup_local_logging(parallel={"wrapper": "runfn"})):
try:
out = fn(fnargs)
out = fn(*fnargs)
except:
logger.exception()
raise
......@@ -127,7 +128,7 @@ def _add_resources(data, runtime):
data["config"] = {}
# Convert input resources, which may be a JSON string
resources = data.get("resources", {}) or {}
if isinstance(resources, basestring) and resources.startswith(("{", "[")):
if isinstance(resources, six.string_types) and resources.startswith(("{", "[")):
resources = json.loads(resources)
data["resources"] = resources
assert isinstance(resources, dict), (resources, data)
......@@ -559,11 +560,11 @@ def _file_and_exists(val, input_files):
def _to_cwl(val, input_files):
"""Convert a value into CWL formatted JSON, handling files and complex things.
"""
if isinstance(val, basestring):
if isinstance(val, six.string_types):
if _file_and_exists(val, input_files):
val = {"class": "File", "path": val}
secondary = []
for idx in [".bai", ".tbi", ".gbi", ".fai", ".db"]:
for idx in [".bai", ".tbi", ".gbi", ".fai", ".crai", ".db"]:
idx_file = val["path"] + idx
if _file_and_exists(idx_file, input_files):
secondary.append({"class": "File", "path": idx_file})
......@@ -578,11 +579,12 @@ def _to_cwl(val, input_files):
if cur_file.endswith(cwlutils.DIR_TARGETS):
if os.path.exists(cur_dir):
for fname in os.listdir(cur_dir):
if fname != cur_file:
if fname != cur_file and not os.path.isdir(os.path.join(cur_dir, fname))\
and fname != 'sbg.worker.log':
secondary.append({"class": "File", "path": os.path.join(cur_dir, fname)})
else:
for f in input_files:
if f.startswith(cur_dir) and f != cur_file:
if f.startswith(cur_dir) and f != cur_file and not os.path.isdir(f):
secondary.append({"class": "File", "path": f})
if secondary:
val["secondaryFiles"] = _remove_duplicate_files(secondary)
......@@ -592,7 +594,7 @@ def _to_cwl(val, input_files):
# File representation with secondary files
if "base" in val and "secondary" in val:
out = {"class": "File", "path": val["base"]}
secondary = [{"class": "File", "path": x} for x in val["secondary"]]
secondary = [{"class": "File", "path": x} for x in val["secondary"] if not os.path.isdir(x)]
if secondary:
out["secondaryFiles"] = _remove_duplicate_files(secondary)
val = out
......
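`_to_cwl` wraps bare paths into CWL `File` objects and attaches any existing index files, now including `.crai` for CRAM, as `secondaryFiles`, while skipping directories and the Seven Bridges worker log. The structure handed to the CWL runner looks roughly like this (paths are illustrative):

    {
        "class": "File",
        "path": "/work/align/sample1.cram",
        "secondaryFiles": [
            {"class": "File", "path": "/work/align/sample1.cram.crai"}
        ]
    }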
......@@ -10,8 +10,11 @@ splitting specific code.
"""
import collections
import six
from bcbio import utils
def grouped_parallel_split_combine(args, split_fn, group_fn, parallel_fn,
parallel_name, combine_name,
file_key, combine_arg_keys,
......@@ -57,7 +60,7 @@ def parallel_split_combine(args, split_fn, parallel_fn,
split_args, combine_map, finished_out, extras = _get_split_tasks(args, split_fn, file_key,
split_outfile_i)
split_output = parallel_fn(parallel_name, split_args)
if isinstance(combiner, basestring):
if isinstance(combiner, six.string_types):
combine_args, final_args = _organize_output(split_output, combine_map,
file_key, combine_arg_keys)
parallel_fn(combiner, combine_args)
......@@ -110,7 +113,7 @@ def _organize_output(output, combine_map, file_key, combine_arg_keys):
extras.append([data])
combine_args = [[v, k] + _get_extra_args(extra_args[k], combine_arg_keys)
for (k, v) in out_map.items()]
return combine_args, final_args.values() + extras
return combine_args, list(final_args.values()) + extras
def _get_split_tasks(args, split_fn, file_key, outfile_i=-1):
"""Split up input files and arguments, returning arguments for parallel processing.
......@@ -137,4 +140,4 @@ def _get_split_tasks(args, split_fn, file_key, outfile_i=-1):
extras.append([data])
else:
extras.append([data])
return split_args, combine_map, finished_map.values(), extras
return split_args, combine_map, list(finished_map.values()), extras
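The `list(...)` wrappers are required because Python 3 `dict.values()` returns a view object, which cannot be concatenated to a list with `+`. For instance:

    finished = {"a": 1, "b": 2}
    extras = [3]

    # Python 3: finished.values() + extras raises TypeError
    combined = list(finished.values()) + extras    # [1, 2, 3]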
......@@ -56,7 +56,7 @@ class GalaxyApiAccess:
details = self._get("/nglims/api_run_details", dict(run=run_bc))
except ValueError:
raise ValueError("Could not find information in Galaxy for run: %s" % run_bc)
if details.has_key("error") and run_date is not None:
if "error" in details and run_date is not None:
try:
details = self._get("/nglims/api_run_details", dict(run=run_date))
except ValueError:
......
......@@ -10,6 +10,7 @@ import os
import subprocess
import joblib
import six
import yaml
from bcbio import utils
......@@ -18,6 +19,7 @@ from bcbio.galaxy.api import GalaxyApiAccess
from bcbio.illumina import flowcell
from bcbio.pipeline.run_info import clean_name
from bcbio.workflow import template
from functools import reduce
def prep_samples_and_config(run_folder, ldetails, fastq_dir, config):
"""Prepare sample fastq files and provide global sample configuration for the flowcell.
......@@ -105,7 +107,7 @@ def _select_default_algorithm(analysis):
def _relative_paths(xs, base_path):
"""Adjust paths to be relative to the provided base path.
"""
if isinstance(xs, basestring):
if isinstance(xs, six.string_types):
if xs.startswith(base_path):
return xs.replace(base_path + "/", "", 1)
else:
......