Commits on Source (4)
......@@ -24,7 +24,6 @@ install:
- conda install --yes nomkl
- conda install --yes -c conda-forge -c bioconda bcbio-nextgen
- conda install --yes -c conda-forge -c bioconda bcbio-nextgen-vm
- conda install --yes -c conda-forge -c bioconda cwltool toil rabix-bunny
# Clean up space with external tools we don't need for tests
- conda clean --yes --tarballs --index-cache
- conda remove --yes --force qt
......@@ -44,14 +43,16 @@ script:
# Update to latest bcbio-nextgen code within the container
- bcbio_vm.py devel setup_install -i quay.io/bcbio/bcbio-vc
# -- Standard bcbio variant tests
- docker run -v `pwd`:`pwd` quay.io/bcbio/bcbio-vc bash -c "cd `pwd` && /usr/local/share/bcbio-nextgen/anaconda/bin/py.test -p no:cacheprovider tests/unit --cov=bcbio"
- py.test -p no:cacheprovider tests/bcbio_vm -v -m docker_multicore
- docker run -v `pwd`:`pwd` quay.io/bcbio/bcbio-vc bash -c "cd `pwd` && /usr/local/share/bcbio-nextgen/anaconda/bin/py.test -p no:cacheprovider -p no:stepwise tests/unit --cov=bcbio"
- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -m docker_multicore
# -- bcbio variant CWL tests
- py.test -p no:cacheprovider tests/bcbio_vm -v -s -m cwl_docker_joint
- py.test -p no:cacheprovider tests/bcbio_vm -v -s -m cwl_docker_somatic
# XXX Currently commented out joint test, taking too long and causing Travis timeouts
# - py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_docker_joint
- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_docker_somatic
# -- platform integration
- sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt
- py.test -p no:cacheprovider tests/bcbio_vm -v -s -m cwl_arvados
# XXX Arvados tests failing with 404 when contacting from Travis
#- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_arvados
# -- Cleanup variant docker image
- docker ps -a -q | xargs --no-run-if-empty docker rm
- docker rmi -f quay.io/bcbio/bcbio-vc
......@@ -62,7 +63,7 @@ script:
- docker images
- df -h
- bcbio_vm.py devel setup_install -i quay.io/bcbio/bcbio-rnaseq
- py.test -p no:cacheprovider tests/bcbio_vm -v -s -m cwl_docker_rnaseq
- py.test -p no:cacheprovider -p no:stepwise tests/bcbio_vm -v -s -m cwl_docker_rnaseq
# -- Cleanup RNA-seq docker image
- docker ps -a -q | xargs --no-run-if-empty docker rm
- docker rmi -f quay.io/bcbio/bcbio-rnaseq
......
## 1.1.2 (12 December 2018)
- VarDict low frequency somatic filters: generalize the strand- and mismatch-based
filters using cross-validation to avoid over-filtering on high depth panels.
- strelka2 joint calling: switch to improved gvcfgenotyper approach for calling
from gVCFs.
- VarDict RNA-seq variant calling: avoid structural variants with recent vardict-java.
- RNA-seq variation: filter RNA-seq variants close to splice junctions,
supporting STAR and hisat2.
- RNA-seq variation: add snpEff effects to output variant calls. Thanks to Manasa Surakala.
- RNA-seq: gzip/bgzip FASTQ files in `work/fastq` instead of the original directory.
- use biobambam2 BAM to FASTQ conversion instead of Picard in all cases.
- Trimming: add built-in support for adapters from the SMARTer Universal Low Input RNA Kit
(truseq2) and the Illumina NEXTera DNA prep kit from NEB (nextera2).
- ChIP/ATAC-seq: allow skipping duplicate marking.
- joint calling: ensure correct upload to the final directory when no annotations are present.
- Logging: fix logging in parallel runs with new joblib loky backend. Thanks to
Ben Liesfeld and Roland Ewald.
## 1.1.1 (6 November 2018)
- single-cell RNA-seq: add built-in support for 10x_v2.
- Fix UMI support for small RNA. Compatible with Qiagen UMI small RNA protocol.
- Ignore .Renviron when running Rscript to head-off PATH conflicts.
- Support SRR IDs for downloading samples with the bcbio_prepare_samples script.
- tumor-only prioritization: do not apply LowPriority filter by default, instead
annotate with external databases. Use `tumoronly_germline_filter` to re-enable
previous behavior.
- UMIs: apply default filtering based on de-duplicated read depth. Uses
`--min-reads 2` with raw de-duplicated coverage of 800 or more or `--min-reads 1`
otherwise. Allows error correction with UMIs for higher depth samples.
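  The cutoff amounts to a couple of lines of logic. The sketch below is illustrative only (the helper name `umi_min_reads` is hypothetical); the 800x threshold and `--min-reads` values come from this entry.

```python
def umi_min_reads(dedup_coverage):
    """Choose the fgbio --min-reads consensus setting from de-duplicated coverage.

    Samples with 800x or more de-duplicated coverage can require two supporting
    reads per UMI family; lower-depth samples keep single-read families.
    """
    return 2 if dedup_coverage >= 800 else 1

assert umi_min_reads(1200) == 2  # deep panel: error-correct with --min-reads 2
assert umi_min_reads(300) == 1   # standard exome: keep --min-reads 1
```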
- gemini: databases no longer created by default. Use `tools_on: [gemini]` or
`tools_on: [gemini_orig]` to create a database. We now use a reduced database
for build 37 to match build 38 and make this forward compatible with CWL.
- vcfanno: run gemini and somatic annotations by default, producing annotated
VCFs with external information.
- alignment preparation: support a list of split files from multiple sequencing
lanes, merging them into a single FASTQ.
- variant: support octopus variant caller for germline and somatic samples.
- peddy: fix bug where not all files were uploaded on the first pipeline run.
- peddy: For somatic analyses use separate germline calls for tumor/normal, if
available, or extracted germline calls from supported callers, instead of
somatic variants.
- GATK: support ploidy specification during joint calling.
- GATK BQSR: bin qualities into static groups (10, 20, 30) to match GATK4
recommendations. Thanks to Severine Catreux.
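  As a rough illustration of static quality binning (the rounding-to-nearest rule here is an assumption; GATK's exact quantization and edge handling may differ):

```python
def bin_quality(qual, bins=(10, 20, 30)):
    """Round a reported base quality to the nearest static bin."""
    return min(bins, key=lambda b: abs(b - qual))

# e.g. qualities 7, 14, 22 and 38 collapse to 10, 10, 20 and 30
assert [bin_quality(q) for q in (7, 14, 22, 38)] == [10, 10, 20, 30]
```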
- GATK: support 4.0.10.0, which does not use UCSC 2bit references for Spark tools.
- variant calling: support bcftools 1.9 which is more strict about duplicated
key names in INFO and FORMAT.
- seq2c: Upload global calls, coverage and read_mapping files to project directory.
- RNA-seq variant calling: Apply annotations after joint calling for GATK to
avoid import errors with GenomicsDB. Thanks to Komal Rathi.
- CWL: add `--cwl` target to bcbio_nextgen.py upgrade to add and maintain bcbio-vm.
- CWL: use standard null instead of string "null" for representing None values.
- CWL: support for heterogeneity and structural variant callers that make
use of variant inputs.
- CWL: support ensemble calling for combining multiple variant callers.
- ensemble: remove no-ALT ref calls that contribute to incorrect ensemble outputs
- RNA-seq: output a matrix of un-deduped UMI counts when doing single-cell/DGE
for quality control purposes. This is called `tagcounts-dupes.mtx` in the
final directory.
- single-cell RNA-seq: allow pre-transformed FASTQ files as input to DGE/single-cell
pipeline.
- single-cell RNA-seq: only create one index per specified genome instead of per
sample
- fgbio: backwards compatibility for the older quality setting `--min-consensus-base-quality`.
- RNA-seq: fix for `fusion_caller` getting interpreted as a path, leading to
memoization/upload issues.
- RNA-seq: memoize rRNA quality calculations, speeding up reruns.
- RNA-seq: prefix `description` with an X if it starts with a number, for R
compatibility. Thanks to Avinash Reddy and Dan Stetson at AstraZeneca.
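  The rename is essentially a one-line check; a minimal sketch (the function name is hypothetical):

```python
def r_safe_description(description):
    """Prefix descriptions starting with a digit so they are valid R names."""
    return "X%s" % description if description[:1].isdigit() else description

assert r_safe_description("123-tumor") == "X123-tumor"
assert r_safe_description("sample1") == "sample1"
```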
- single-cell RNA-seq: respect `--positional` flag with the new tag counting. Thanks to
Babak Alaei at AstraZeneca.
- RNA-seq: turn on `--seqBias` flag by default for Salmon as early-version overfitting
issues have been fixed.
- RNA-seq: report insert size from Salmon fragment distribution, not samtools stats.
- RNA-seq: when processing explant samples, produce a combined tx2gene.csv file from
all organisms processed.
## 1.1.0 (11 July 2018)
- Germline calls: rename outputs to `samplename-germline` to provide
easier-to-understand outputs in the final directory.
- Add bcbioRNASeq object creation and automatic quality report generation
with `tools_on: [bcbiornaseq]`
- CWL: Support germline/somatic calling for tumor samples.
- CNVkit: improve whole genome runs. Better speed in normalize_sv_coverage
through parallelization and avoiding logging. Avoid memory errors in segmentation.
- UMI: upload prepared UMI bam file (pre-consensus) to final output directory
- Add support for bbmap as an aligner
- RNA-seq variant calling: parallelize GATK HaplotypeCaller over regions to
avoid memory and timeout issues.
- Support joint calling with GATK using pre-prepared gVCF inputs.
- RNA-seq variant calling: allow annotation of output variants with vcfanno
- Support hg38 builds with peddy QC
- QC: support VerifyBamID2 for contamination detection
- CWL: adjust defaults for align_split_size and nomap_split_targets to match
different parallelization and overhead for these runs
- CWL: support for Cromwell runner
- custom genomes: Unzip GTF file prior to installation.
- Avoid making variant_regions required during processing (by filling with
coverage) to differentiate targeted and non-targeted analyses downstream.
- Avoid attempts to download pre-installed S3 genomes, providing better
errors with missing genome installs.
- Trimming: add explicit `polyg` option for removing 3' G stretches in NovaSeq
and NextSeq data. Now defaults to no polyG trimming unless turned on.
- ChIP-seq: add RiP calculation for ChIP-seq data.
- DeepVariant and Strelka2 support for customized targeted/genome calling models
per region to handle heterogeneous inputs.
- STAR: enable passing custom options for alignment.
- Add `tools_off: [coverage_qc]` option to skip calculating coverage stats (samtools-stats and picard).
- Add a BAM file for each sample in the small RNA-seq pipeline, and add samtools
and qualimap QC metrics to the MultiQC report.
- Allow arbitrary genomes for ChIP-seq. Thanks to @evchambers for pointing out the issue.
## 1.0.9 (10 April 2018)
- Use smoove for lumpy variant calling and genotyping, replacing custom lumpyexpress
......
include *.txt
include *.md
include bcbio/data/umis/*.txt
include bcbio/data/umis/*.txt.gz
include bcbio/data/umis/*.json
include config/*.yaml
include config/*.ini
......
......@@ -24,9 +24,8 @@ def is_empty(bam_file):
"""Determine if a BAM file is empty
"""
bam_file = objectstore.cl_input(bam_file)
sambamba = config_utils.get_program("sambamba", {})
cmd = ("set -o pipefail; "
"{sambamba} view {bam_file} | head -1 | wc -l")
"samtools view {bam_file} | head -1 | wc -l")
p = subprocess.Popen(cmd.format(**locals()), shell=True,
executable=do.find_bash(),
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
......@@ -46,10 +45,9 @@ def is_paired(bam_file):
http://stackoverflow.com/a/12451083/252589
"""
bam_file = objectstore.cl_input(bam_file)
sambamba = config_utils.get_program("sambamba", {})
cmd = ("set -o pipefail; "
"{sambamba} view {bam_file} | head -50000 | "
"{sambamba} view -S -F paired /dev/stdin | head -1 | wc -l")
"samtools view -h {bam_file} | head -50000 | "
"samtools view -S -f 1 /dev/stdin | head -1 | wc -l")
p = subprocess.Popen(cmd.format(**locals()), shell=True,
executable=do.find_bash(),
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
......@@ -151,13 +149,18 @@ def ref_file_from_bam(bam_file, data):
def get_downsample_pct(in_bam, target_counts, data):
"""Retrieve percentage of file to downsample to get to target counts.
Avoids minimal downsampling, which is not especially useful for
improving QC times; skips downsampling when it would keep 90% or more of reads.
"""
total = sum(x.aligned for x in idxstats(in_bam, data))
with pysam.Samfile(in_bam, "rb") as work_bam:
n_rgs = max(1, len(work_bam.header.get("RG", [])))
rg_target = n_rgs * target_counts
if total > rg_target:
return float(rg_target) / float(total)
pct = float(rg_target) / float(total)
if pct < 0.9:
return pct
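A quick worked example of the new 90% cutoff, with made-up read counts (not part of the diff):

```python
# One read group, 100 million aligned reads.
total, n_rgs = 100000000, 1
for target in (50000000, 95000000):
    rg_target = n_rgs * target
    pct = float(rg_target) / float(total)
    # Only fractions under 0.9 are returned; 0.95 now falls through to None,
    # since removing <10% of reads is not worth the extra downsampling pass.
    print(target, pct if pct < 0.9 else None)  # 0.5, then None
```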
def get_aligned_reads(in_bam, data):
index(in_bam, data["config"], check_timestamp=False)
......@@ -445,6 +448,16 @@ def _get_sort_stem(in_bam, order, out_dir):
sort_base = sort_base.split(suffix)[0]
return sort_base + SUFFIXES[order]
def aligner_from_header(in_bam):
"""Identify aligner from the BAM header; handling pre-aligned inputs.
"""
from bcbio.pipeline.alignment import TOOLS
with pysam.Samfile(in_bam, "rb") as bamfile:
for pg in bamfile.header.get("PG", []):
for ka in TOOLS.keys():
if pg.get("PN", "").lower().find(ka) >= 0:
return ka
def sample_name(in_bam):
"""Get sample name from BAM file.
"""
......@@ -476,27 +489,19 @@ def estimate_fragment_size(bam_file, nreads=5000):
return 0
return int(numpy.median(lengths))
def filter_stream_cmd(bam_file, data, filter_flag):
"""
return a command to keep only alignments matching the filter flag
see https://github.com/lomereiter/sambamba/wiki/%5Bsambamba-view%5D-Filter-expression-syntax for examples
"""
sambamba = config_utils.get_program("sambamba", data["config"])
num_cores = dd.get_num_cores(data)
cmd = ('{sambamba} view -t {num_cores} -f bam -F "{filter_flag}" {bam_file}')
return cmd.format(**locals())
def filter_primary_stream_cmd(bam_file, data):
return filter_stream_cmd(bam_file, data, "not secondary_alignment")
def filter_primary(bam_file, data):
"""Filter reads to primary only BAM.
Removes:
- not primary alignment (0x100) 256
- supplementary alignment (0x800) 2048
"""
stem, ext = os.path.splitext(bam_file)
out_file = stem + ".primary" + ext
if utils.file_exists(out_file):
return out_file
if not utils.file_exists(out_file):
with file_transaction(data, out_file) as tx_out_file:
cmd = filter_primary_stream_cmd(bam_file, data)
cmd += "> {tx_out_file}"
cores = dd.get_num_cores(data)
cmd = ("samtools view -@ {cores} -F 2304 -b {bam_file} > {tx_out_file}")
do.run(cmd.format(**locals()), ("Filtering primary alignments in %s." %
os.path.basename(bam_file)))
return out_file
......
......@@ -25,6 +25,7 @@ from bcbio.pipeline import shared
from bcbio.pipeline import datadict as dd
from bcbio.variation import coverage
from bcbio.variation import multi as vmulti
from bcbio.structural import regions
def sample_callable_bed(bam_file, ref_file, data):
......@@ -39,7 +40,8 @@ def sample_callable_bed(bam_file, ref_file, data):
return r.name == "CALLABLE" and (not noalt_calling or chromhacks.is_nonalt(r.chrom))
out_file = "%s-callable_sample.bed" % os.path.splitext(bam_file)[0]
with shared.bedtools_tmpdir(data):
callable_bed, depth_files = coverage.calculate(bam_file, data)
sv_bed = regions.get_sv_bed(data)
callable_bed, depth_files = coverage.calculate(bam_file, data, sv_bed)
input_regions_bed = dd.get_variant_regions(data)
if not utils.file_uptodate(out_file, callable_bed):
with file_transaction(data, out_file) as tx_out_file:
......@@ -260,7 +262,7 @@ def combine_sample_regions(*samples):
producing a global set of callable regions.
"""
samples = utils.unpack_worlds(samples)
samples = [cwlutils.unpack_tarballs(x, x) for x in samples]
samples = cwlutils.unpack_tarballs(samples, samples[0])
# back compatibility -- global file for entire sample set
global_analysis_file = os.path.join(samples[0]["dirs"]["work"], "analysis_blocks.bed")
if utils.file_exists(global_analysis_file) and not _needs_region_update(global_analysis_file, samples):
......@@ -338,3 +340,30 @@ def _combine_sample_regions_batch(batch, items):
return analysis_file, no_analysis_file
else:
return None, None
def get_split_regions(bed_file, data):
"""Retrieve a set of split regions using the input BED for callable regions.
Provides a less inclusive hook for parallelizing over multiple regions.
"""
out_file = "%s-analysis_blocks.bed" % utils.splitext_plus(bed_file)[0]
with shared.bedtools_tmpdir(data):
if not utils.file_uptodate(out_file, bed_file):
ref_regions = get_ref_bedtool(dd.get_ref_file(data), data["config"])
nblock_regions = ref_regions.subtract(pybedtools.BedTool(bed_file)).saveas()
min_n_size = int(tz.get_in(["config", "algorithm", "nomap_split_size"], data, 250))
block_filter = NBlockRegionPicker(ref_regions, data["config"], min_n_size)
final_nblock_regions = nblock_regions.filter(
block_filter.include_block).saveas().each(block_filter.expand_block).saveas()
with file_transaction(data, out_file) as tx_out_file:
final_regions = ref_regions.subtract(final_nblock_regions, nonamecheck=True).\
saveas().merge(d=min_n_size).saveas(tx_out_file)
chroms = set([])
with shared.bedtools_tmpdir(data):
for r in pybedtools.BedTool(bed_file):
chroms.add(r.chrom)
out = []
for r in pybedtools.BedTool(out_file):
if r.chrom in chroms:
out.append((r.chrom, r.start, r.stop))
return out
from Bio import SeqIO
from bcbio.utils import file_exists
from bcbio.distributed.transaction import file_transaction
def sequence_length(fasta):
"""
......@@ -15,3 +17,26 @@ def sequence_names(fasta):
sequences = SeqIO.parse(fasta, "fasta")
records = [record.id for record in sequences]
return records
def total_sequence_length(fasta):
"""
return the total length of all sequences in a FASTA file
"""
return sum([x for x in sequence_length(fasta).values()])
def strip_transcript_versions(fasta, out_file):
"""
strip transcript versions from a FASTA file. these appear like this:
>ENST00000434970.2 cdna chromosome:GRCh38:14:22439007:22439015:1 etc
"""
if file_exists(out_file):
return out_file
with file_transaction(out_file) as tx_out_file:
with open(tx_out_file, "w") as out_handle:
with open(fasta) as in_handle:
for line in in_handle:
if line.startswith(">"):
out_handle.write(line.split(" ")[0].split(".")[0] + "\n")
else:
out_handle.write(line)
return out_file
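Applied to the header shown in the docstring, the split logic keeps only the versionless transcript id:

```python
line = ">ENST00000434970.2 cdna chromosome:GRCh38:14:22439007:22439015:1 etc"
print(line.split(" ")[0].split(".")[0])  # >ENST00000434970
```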
......@@ -23,6 +23,8 @@ def groom(in_file, data, in_qual="illumina", out_dir=None, out_file=None):
Grooms a FASTQ file from Illumina 1.3/1.5 quality scores into
sanger format, if it is not already in that format.
"""
if not out_file.endswith("gz"):
out_file = "%s.gz" % out_file
seqtk = config_utils.get_program("seqtk", data["config"])
if in_qual == "fastq-sanger":
logger.info("%s is already in Sanger format." % in_file)
......
"""Calculation of mapped reads by BAM counting, currently implemented with samtools.
"""
import contextlib
import json
import os
import time
import toolz as tz
from bcbio import bam, utils
......@@ -41,8 +43,7 @@ def number_of_mapped_reads(data, bam_file, keep_dups=True, bed_file=None, target
Uses a global cache file to store counts, making it possible to pass this single
file for CWL runs. For parallel processes it can have concurrent append writes,
with the hopes this will avoid race conditions. Need to revisit with some kind
of global file locking if it becomes an issue.
so we have a simple file locking mechanism to avoid this.
"""
# Flag explainer https://broadinstitute.github.io/picard/explain-flags.html
callable_flags = ["not unmapped", "not mate_is_unmapped", "not secondary_alignment",
......@@ -95,7 +96,26 @@ def number_of_mapped_reads(data, bam_file, keep_dups=True, bed_file=None, target
for line in in_handle:
count += int(line.rstrip().split()[-1])
# Update cache
with _simple_lock(cache_file):
with open(cache_file, "a") as out_handle:
out_handle.write("%s\t%s\n" % (key, count))
return count
@contextlib.contextmanager
def _simple_lock(f):
"""Simple file lock, times out after 20 second assuming lock is stale
"""
lock_file = f + ".lock"
timeout = 20
curtime = 0
interval = 2
while os.path.exists(lock_file):
time.sleep(interval)
curtime += interval
if curtime > timeout:
os.remove(lock_file)
with open(lock_file, "w") as out_handle:
out_handle.write("locked")
yield
if os.path.exists(lock_file):
os.remove(lock_file)
......@@ -18,7 +18,10 @@ SUPPORTED_ADAPTERS = {
"illumina": ["AACACTCTTTCCCT", "AGATCGGAAGAGCG"],
"truseq": ["AGATCGGAAGAG"],
"polya": ["AAAAAAAAAAAAA"],
"nextera": ["AATGATACGGCGA", "CAAGCAGAAGACG"]}
"nextera": ["AATGATACGGCGA", "CAAGCAGAAGACG"],
"truseq2": ["GATCGGAAGAGCACACGTCTGAACTCCAGTCAC", "GATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTAGATCTCGGTGGTCGCCGTATCATT"], # 3'only: first read, second read
"nextera2": ["CTGTCTCTTATACACATCT", "AGATGTGTATAAGAGACAG"] # Second read in pair 3', 5
}
def trim_adapters(data):
to_trim = [x for x in data["files"] if x is not None and is_fastq(x)]
......@@ -70,7 +73,7 @@ def _atropos_trim(fastq_files, adapters, out_dir, data):
tx_out2 = tx_out[2]
# polyX trimming, anchored to the 3' ends of reads
if "polyx" in dd.get_adapters(data):
adapters += ["A{200}$", "C{200}$", "G{200}$", "T{200}$"]
adapters += ["A{200}", "C{200}", "G{200}", "T{200}"]
adapters_args = " ".join(["-a '%s'" % a for a in adapters])
adapters_args += " --overlap 8" # Avoid very short internal matches (default is 3)
adapters_args += " --no-default-adapters --no-cache-adapters" # Prevent GitHub queries and saving pickles
......@@ -93,10 +96,12 @@ def _atropos_trim(fastq_files, adapters, out_dir, data):
ropts = " ".join(str(x) for x in
config_utils.get_resources("atropos", data["config"]).get("options", []))
extra_opts = []
for k, alt_ks, v in [("--quality-cutoff", ["-q "], "5"),
("--minimum-length", ["-m "], str(dd.get_min_read_length(data))),
("--nextseq-trim", [], "25")]:
for k, alt_ks, v, want in [("--quality-cutoff", ["-q "], "5", True),
("--minimum-length", ["-m "], str(dd.get_min_read_length(data)), True),
("--nextseq-trim", [], "25", ("polyx" in dd.get_adapters(data) or
"polyg" in dd.get_adapters(data)))]:
if k not in ropts and not any(alt_k in ropts for alt_k in alt_ks):
if want:
extra_opts.append("%s=%s" % (k, v))
extra_opts = " ".join(extra_opts)
thread_args = ("--threads %s" % cores if cores > 1 else "")
......@@ -125,12 +130,13 @@ def _fastp_trim(fastq_files, adapters, out_dir, data):
cmd += ["-i", inf, "-o", outf]
else:
cmd += ["-I", inf, "-O", outf]
cmd += ["--trim_poly_g", "--poly_g_min_len", "8",
"--cut_by_quality3", "--cut_mean_quality", "5",
cmd += ["--cut_by_quality3", "--cut_mean_quality", "5",
"--length_required", str(dd.get_min_read_length(data)),
"--disable_quality_filtering"]
if "polyx" in dd.get_adapters(data):
cmd += ["--trim_poly_x", "--poly_x_min_len", "8"]
if "polyx" in dd.get_adapters(data) or "polyg" in dd.get_adapters(data):
cmd += ["--trim_poly_g", "--poly_g_min_len", "8"]
for a in adapters:
cmd += ["--adapter_sequence", a]
if not adapters:
......
......@@ -141,6 +141,7 @@ def fix_missing_spark_user(cl, prog, params):
https://blog.openshift.com/jupyter-on-openshift-part-6-running-as-an-assigned-user-id/
"""
if prog.find("Spark") >= 0 or "--spark-master" in params:
user = None
try:
user = getpass.getuser()
except KeyError:
......@@ -148,7 +149,11 @@ def fix_missing_spark_user(cl, prog, params):
with open("/etc/passwd", "a") as out_handle:
out_handle.write("sparkanon:x:{uid}:{uid}:sparkanon:/nonexistent:/usr/sbin/nologin\n"
.format(uid=os.getuid()))
try:
user = getpass.getuser()
except KeyError:
pass
if user:
cl = "export SPARK_USER=%s && " % (user) + cl
return cl
......@@ -527,6 +532,8 @@ def gatk_cmd(name, jvm_opts, params, config=None):
data = config
if not data or "gatk4" not in dd.get_tools_off(data):
return _gatk4_cmd(jvm_opts, params, data)
else:
name = "gatk3"
gatk_cmd = utils.which(os.path.join(os.path.dirname(os.path.realpath(sys.executable)), name))
# if we can't find via the local executable, fallback to being in the path
if not gatk_cmd:
......@@ -537,10 +544,10 @@ def gatk_cmd(name, jvm_opts, params, config=None):
" ".join(jvm_opts), " ".join([str(x) for x in params]))
def _gatk4_cmd(jvm_opts, params, data):
"""Retrieve unified command for GATK4, using gatk-launch.
"""Retrieve unified command for GATK4, using 'gatk'. GATK3 is 'gatk3'.
"""
gatk_cmd = utils.which(os.path.join(os.path.dirname(os.path.realpath(sys.executable)), "gatk-launch"))
return "%s && export PATH=%s:$PATH && gatk-launch --java-options '%s' %s" % \
gatk_cmd = utils.which(os.path.join(os.path.dirname(os.path.realpath(sys.executable)), "gatk"))
return "%s && export PATH=%s:$PATH && gatk --java-options '%s' %s" % \
(utils.clear_java_home(), utils.get_java_binpath(gatk_cmd),
" ".join(jvm_opts), " ".join([str(x) for x in params]))
......
......@@ -364,7 +364,7 @@ class PicardMetrics(object):
("INPUT", dup_bam),
("OUTPUT", tx_metrics)]
try:
self._picard.run("CalculateHsMetrics", opts)
self._picard.run("CollectHsMetrics", opts)
# HsMetrics fails regularly with memory errors
# so we catch and skip instead of aborting the
# full process
......@@ -451,12 +451,17 @@ def bed_to_interval(orig_bed, bam_file):
with open(tmp_bed, "w") as out_handle:
out_handle.write(header)
with open(orig_bed) as in_handle:
for line in in_handle:
for i, line in enumerate(in_handle):
parts = line.rstrip().split("\t")
if len(parts) == 3:
parts.append("+")
parts.append("a")
out_handle.write("\t".join(parts) + "\n")
if len(parts) == 4:
chrom, start, end, name = parts
strand = "+"
elif len(parts) >= 3:
chrom, start, end = parts[:3]
strand = "+"
name = "r%s" % i
out = [chrom, start, end, strand, name]
out_handle.write("\t".join(out) + "\n")
yield tmp_bed
......
......@@ -3,8 +3,6 @@
import os
import collections
import pysam
from bcbio.utils import file_exists
from bcbio.distributed.transaction import file_transaction, tx_tmpdir
......@@ -288,7 +286,7 @@ def bed2interval(align_file, bed, out_file=None):
http://genome.ucsc.edu/FAQ/FAQformat.html#format1.5
"""
import pysam
base, ext = os.path.splitext(align_file)
if out_file is None:
out_file = base + ".interval"
......
......@@ -13,6 +13,7 @@ from bcbio.log import logger
def clean_chipseq_alignment(data):
aligner = dd.get_aligner(data)
data["align_bam"] = dd.get_work_bam(data)
if dd.get_mark_duplicates(data):
if aligner:
if aligner == "bowtie2":
filterer = bowtie2.filter_multimappers
......@@ -27,13 +28,35 @@ def clean_chipseq_alignment(data):
logger.info("Warning: When BAM file is given as input, bcbio skips multimappers removal."
"If BAM is not cleaned for peak calling, can result in downstream errors.")
# lcr_bed = utils.get_in(data, ("genome_resources", "variation", "lcr"))
data["work_bam"] = _keep_assembled_chrom(dd.get_work_bam(data), dd.get_ref_file(data),
data["config"])
encode_bed = tz.get_in(["genome_resources", "variation", "encode_blacklist"], data)
if encode_bed:
data["work_bam"] = _prepare_bam(data["work_bam"], encode_bed, data['config'])
data["work_bam"] = _prepare_bam(dd.get_work_bam(data), encode_bed, data['config'])
bam.index(data["work_bam"], data['config'])
data["bigwig"] = _bam_coverage(dd.get_sample_name(data), dd.get_work_bam(data), data)
return [[data]]
def _keep_assembled_chrom(bam_file, genome, config):
"""Remove contigs from the BAM file"""
fai = "%s.fai" % genome
chrom = []
with open(fai) as inh:
for line in inh:
c = line.split("\t")[0]
if c.find("_") < 0:
chrom.append(c)
chroms = " ".join(chrom)
out_file = utils.append_stem(bam_file, '_chrom')
samtools = config_utils.get_program("samtools", config)
if not utils.file_exists(out_file):
with file_transaction(out_file) as tx_out:
cmd = "{samtools} view -b {bam_file} {chroms} > {tx_out}"
do.run(cmd.format(**locals()), "Remove contigs from %s" % bam_file)
bam.index(out_file, config)
return out_file
def _prepare_bam(bam_file, bed_file, config):
"""Remove regions from bed files"""
if not bam_file or not bed_file:
......@@ -43,21 +66,29 @@ def _prepare_bam(bam_file, bed_file, config):
if not utils.file_exists(out_file):
with file_transaction(out_file) as tx_out:
cmd = "{bedtools} subtract -nonamecheck -A -a {bam_file} -b {bed_file} > {tx_out}"
do.run(cmd.format(**locals()), "Clean %s" % bam_file)
do.run(cmd.format(**locals()), "Remove blacklist regions from %s" % bam_file)
return out_file
def get_genome(genome):
def get_genome(data):
"""
get the effective length of the genome, falling back to the length of the genome
if the effective length is not precomputed
"""
from bcbio.chipseq import macs2
from bcbio.bam import fasta
genome = dd.get_genome_build(data)
loaded = macs2.HS
if genome in loaded:
return loaded[genome]
else:
return sum([x for x in fasta.sequence_length(dd.get_ref_file(data)).values()])
def _bam_coverage(name, bam_input, data):
"""Run bamCoverage from deeptools"""
cmd = ("{bam_coverage} -b {bam_input} -o {bw_output} "
"--binSize 20 --effectiveGenomeSize {size} "
"--smoothLength 60 --extendReads 150 --centerReads -p {cores}")
size = int(get_genome(dd.get_genome_build(data)))
size = int(get_genome(data))
cores = dd.get_num_cores(data)
try:
bam_coverage = config_utils.get_program("bamCoverage", data)
......
import os
import subprocess
import glob
from bcbio import utils
from bcbio.provenance import do
from bcbio.pipeline import config_utils
from bcbio.pipeline import datadict as dd
from bcbio import bam
HS = {"hg19": 2.7e9,
......@@ -14,12 +14,13 @@ HS = {"hg19": 2.7e9,
"mm10": 1.87e9,
"dm3": 1.2e8}
def run(name, chip_bam, input_bam, genome_build, out_dir, method, resources, config):
def run(name, chip_bam, input_bam, genome_build, out_dir, method, resources, data):
"""
Run macs2 for chip and input samples avoiding
errors due to samples.
"""
# output file name needs to include the caller name
config = dd.get_config(data)
out_file = os.path.join(out_dir, name + "_peaks_macs2.xls")
macs2_file = os.path.join(out_dir, name + "_peaks.xls")
if utils.file_exists(out_file):
......@@ -27,14 +28,8 @@ def run(name, chip_bam, input_bam, genome_build, out_dir, method, resources, con
return _get_output_files(out_dir)
macs2 = config_utils.get_program("macs2", config)
options = " ".join(resources.get("macs2", {}).get("options", ""))
if genome_build not in HS and options.find("-g") == -1:
raise ValueError("This %s genome doesn't have a pre-set value."
"You can add specific values using resources "
"option for macs2 in the YAML file (-g genome_size)."
"Check Chip-seq configuration in "
"bcbio-nextgen documentation.")
genome_size = "" if options.find("-g") > -1 else "-g %s" % HS[genome_build]
genome_size = HS.get(genome_build, bam.fasta.total_sequence_length(dd.get_ref_file(data)))
genome_size = "" if options.find("-g") > -1 else "-g %s" % genome_size
paired = "-f BAMPE" if bam.is_paired(chip_bam) else ""
with utils.chdir(out_dir):
cmd = _macs2_cmd(method)
......@@ -52,7 +47,16 @@ def run(name, chip_bam, input_bam, genome_build, out_dir, method, resources, con
return _get_output_files(out_dir)
def _get_output_files(out_dir):
return {"macs2": [os.path.abspath(fn) for fn in glob.glob(os.path.join(out_dir, "*"))]}
fns = [os.path.abspath(fn) for fn in glob.glob(os.path.join(out_dir, "*"))]
peaks = None
for fn in fns:
if fn.endswith("narrowPeak"):
peaks = fn
break
elif fn.endswith("broadPeak"):
peaks = fn
break
return {"main": peaks, "macs2": fns}
def _compres_bdg_files(out_dir):
for fn in glob.glob(os.path.join(out_dir, "*bdg")):
......
......@@ -50,7 +50,7 @@ def calling(data):
name = dd.get_sample_name(data)
out_dir = utils.safe_makedir(os.path.join(dd.get_work_dir(data), data["peak_fn"], name))
out_files = caller_fn(name, chip_bam, input_bam, dd.get_genome_build(data), out_dir,
dd.get_chip_method(data), data["resources"], data["config"])
dd.get_chip_method(data), data["resources"], data)
greylistdir = greylisting(data)
data.update({"peaks_files": out_files})
# data["input_bam_filter"] = input_bam
......
......@@ -17,10 +17,11 @@ import yaml
from bcbio import utils
from bcbio.cwl import defs, workflow
from bcbio.distributed import objectstore, resources
from bcbio.distributed.transaction import file_transaction
from bcbio.pipeline import alignment
INTEGRATION_MAP = {"keep:": "arvados", "s3:": "s3", "sbg:": "sbgenomics",
"dx:": "dnanexus"}
"dx:": "dnanexus", "gs:": "gs"}
def from_world(world, run_info_file, integrations=None, add_container_tag=None):
base = utils.splitext_plus(os.path.basename(run_info_file))[0]
......@@ -91,7 +92,7 @@ def _get_disk_estimates(name, parallel, inputs, file_estimates, samples, disk,
tmp_disk = int(math.ceil(out_disk * 0.5))
out_disk = int(math.ceil(out_disk))
bcbio_docker_disk = 1 * 1024 # Minimum requirements for bcbio Docker image
bcbio_docker_disk = (10 if cur_remotes else 1) * 1024 # Minimum requirements for bcbio Docker image
disk_hint = {"outdirMin": bcbio_docker_disk + out_disk, "tmpdirMin": tmp_disk}
# Skip input disk for steps which require only transformation (and thus no staging)
if no_files:
......@@ -169,7 +170,7 @@ def _write_tool(step_dir, name, inputs, outputs, parallel, image, programs,
if any(p.startswith(("gatk", "sentieon")) for p in programs):
out["hints"] += [{"class": "arv:APIRequirement"}]
# Multi-process methods that read heavily from BAM files need extra keep cache for Arvados
if name in ["pipeline_summary", "variantcall_batch_region"]:
if name in ["pipeline_summary", "variantcall_batch_region", "detect_sv"]:
out["hints"] += [{"class": "arv:RuntimeConstraints", "keep_cache": 4096}]
def add_to_namespaces(k, v, out):
if "$namespaces" not in out:
......@@ -194,7 +195,8 @@ def _write_tool(step_dir, name, inputs, outputs, parallel, image, programs,
"sentinel_inputs=%s" % ",".join(["%s:%s" %
(workflow.get_base_id(v["id"]),
"record" if workflow.is_cwl_record(v) else "var")
for v in inputs])]
for v in inputs]),
"run_number=0"]
out = _add_inputs_to_tool(inputs, out, parallel, use_commandline_args)
out = _add_outputs_to_tool(outputs, out)
_tool_to_file(out, out_file)
......@@ -324,7 +326,7 @@ def _place_secondary_files(inp_tool, inp_binding=None):
"""
def _is_file(val):
return (val == "File" or (isinstance(val, (list, tuple)) and
("File" in val or any(isinstance(x, dict) and x["items"] == "File") for x in val)))
("File" in val or any(isinstance(x, dict) and _is_file(val)) for x in val)))
secondary_files = inp_tool.pop("secondaryFiles", None)
if secondary_files:
key = []
......@@ -440,6 +442,9 @@ def prep_cwl(samples, workflow_fn, out_dir, out_file, integrations=None,
if "outputSource" not in wf_output:
wf_output["outputSource"] = wf_output.pop("source")
wf_output = _clean_record(wf_output)
# Avoid input/output naming clashes
if wf_output["id"] in used_inputs:
wf_output["id"] = "%s_out" % wf_output["id"]
out["outputs"].append(wf_output)
elif cur[0] == "wf_start":
parent_wfs.append(out)
......@@ -504,7 +509,7 @@ def _indexes_to_secondary_files(gresources, genome_build):
if isinstance(val, dict) and "indexes" in val:
# list of indexes -- aligners
if len(val.keys()) == 1:
indexes = val["indexes"]
indexes = sorted(val["indexes"])
if len(indexes) == 0:
if refname not in alignment.allow_noindices():
raise ValueError("Did not find indexes for %s: %s" % (refname, val))
......@@ -545,7 +550,7 @@ def _get_secondary_files(val):
for s in _get_secondary_files(x):
s_counts[s] += 1
for s, count in s_counts.items():
if s and s not in out and count == len(val):
if s and s not in out and count == len([x for x in val if x]):
out.append(s)
elif isinstance(val, dict) and (val.get("class") == "File" or "File" in val.get("class")):
if "secondaryFiles" in val:
......@@ -602,16 +607,19 @@ def _get_avro_type(val):
types.append(x)
elif ctype not in types:
types.append(ctype)
# handle empty types, allow null or a string "null" sentinel
# handle empty types, allow null
if len(types) == 0:
types = ["null", "string"]
types = ["null"]
# empty lists
if isinstance(val, (list, tuple)) and len(val) == 0:
types.append({"type": "array", "items": ["null", "string"]})
types.append({"type": "array", "items": ["null"]})
types = _avoid_duplicate_arrays(types)
# Avoid empty null only arrays which confuse some runners
if len(types) == 1 and types[0] == "null":
types.append("string")
return {"type": "array", "items": (types[0] if len(types) == 1 else types)}
elif val is None:
return ["null", "string"]
return ["null"]
# encode booleans as string True/False and unencode on other side
elif isinstance(val, bool) or isinstance(val, basestring) and val.lower() in ["true", "false", "none"]:
return ["string", "null", "boolean"]
......@@ -662,8 +670,16 @@ def _to_cwldata(key, val, get_retriever):
else:
out.append((key, _to_cwlfile_with_indexes(val, get_retriever)))
# Dump shared nested keys like resources as a JSON string
elif key in workflow.ALWAYS_AVAILABLE:
elif key in workflow.ALWAYS_AVAILABLE or key in workflow.STRING_DICT:
out.append((key, _item_to_cwldata(json.dumps(val), get_retriever)))
elif key in workflow.FLAT_DICT:
flat = []
for k, vs in val.items():
if not isinstance(vs, (list, tuple)):
vs = [vs]
for v in vs:
flat.append("%s:%s" % (k, v))
out.append((key, _item_to_cwldata(flat, get_retriever)))
else:
remain_val = {}
for nkey, nval in val.items():
......@@ -681,21 +697,36 @@ def _to_cwldata(key, val, get_retriever):
out.append((key, _item_to_cwldata(val, get_retriever)))
return out
def _remove_remote_prefix(f):
"""Remove any remote references to allow object lookups by file paths.
"""
return f.split(":")[-1] if objectstore.is_remote(f) else f
def _to_cwlfile_with_indexes(val, get_retriever):
"""Convert reads with ready to go indexes into the right CWL object.
Identifies the top level directory and creates a tarball, avoiding
trying to handle complex secondary setups which are not cross platform.
Skips doing this for reference files, which take up too much time and
space to unpack multiple times.
Skips doing this for reference files and standard setups like bwa, which
take up too much time and space to unpack multiple times.
"""
if val["base"].endswith(".fa") and any([x.endswith(".fa.fai") for x in val["indexes"]]):
return _item_to_cwldata(val["base"], get_retriever)
else:
dirname = os.path.dirname(val["base"])
assert all([x.startswith(dirname) for x in val["indexes"]])
return {"class": "File", "path": _directory_tarball(dirname)}
tval = {"base": _remove_remote_prefix(val["base"]),
"indexes": [_remove_remote_prefix(f) for f in val["indexes"]]}
# Standard named set of indices, like bwa
# Do not include snpEff, which we need to isolate inside a nested directory
# hisat2 indices also do not localize cleanly due to complicated naming
cp_dir, cp_base = os.path.split(os.path.commonprefix([tval["base"]] + tval["indexes"]))
if (cp_base and cp_dir == os.path.dirname(tval["base"]) and
not ("/snpeff/" in cp_dir or "/hisat2" in cp_dir)):
return _item_to_cwldata(val["base"], get_retriever, val["indexes"])
else:
dirname = os.path.dirname(tval["base"])
assert all([x.startswith(dirname) for x in tval["indexes"]])
return {"class": "File", "path": directory_tarball(dirname)}
def _add_secondary_if_exists(secondary, out, get_retriever):
"""Add secondary files only if present locally or remotely.
......@@ -706,7 +737,7 @@ def _add_secondary_if_exists(secondary, out, get_retriever):
out["secondaryFiles"] = [{"class": "File", "path": f} for f in secondary]
return out
def _item_to_cwldata(x, get_retriever):
def _item_to_cwldata(x, get_retriever, indexes=None):
""""Markup an item with CWL specific metadata.
"""
if isinstance(x, (list, tuple)):
......@@ -716,7 +747,9 @@ def _item_to_cwldata(x, get_retriever):
objectstore.is_remote(x))):
if _file_local_or_remote(x, get_retriever):
out = {"class": "File", "path": x}
if x.endswith(".bam"):
if indexes:
out = _add_secondary_if_exists(indexes, out, get_retriever)
elif x.endswith(".bam"):
out = _add_secondary_if_exists([x + ".bai"], out, get_retriever)
elif x.endswith((".vcf.gz", ".bed.gz")):
out = _add_secondary_if_exists([x + ".tbi"], out, get_retriever)
......@@ -730,7 +763,7 @@ def _item_to_cwldata(x, get_retriever):
elif x.endswith(".gtf"):
out = _add_secondary_if_exists([x + ".db"], out, get_retriever)
else:
out = {"class": "File", "path": _directory_tarball(x)}
out = {"class": "File", "path": directory_tarball(x)}
return out
elif isinstance(x, bool):
return str(x)
......@@ -746,22 +779,25 @@ def _file_local_or_remote(f, get_retriever):
if integration:
return integration.file_exists(f, config)
def _directory_tarball(dirname):
def directory_tarball(dirname):
"""Create a tarball of a complex directory, avoiding complex secondaryFiles.
Complex secondary files do not work on multiple platforms and are not portable
to WDL, so for now we create a tarball that workers will unpack.
"""
assert os.path.isdir(dirname)
assert os.path.isdir(dirname), dirname
base_dir, tarball_dir = os.path.split(dirname)
while base_dir and not os.path.exists(os.path.join(base_dir, "seq")):
while not os.path.exists(os.path.join(base_dir, "seq")) and base_dir and base_dir != "/":
base_dir, extra_tarball = os.path.split(base_dir)
tarball_dir = os.path.join(extra_tarball, tarball_dir)
if base_dir == "/" and not os.path.exists(os.path.join(base_dir, "seq")):
raise ValueError("Did not find relative directory to create tarball for %s" % dirname)
tarball = os.path.join(base_dir, "%s-wf.tar.gz" % (tarball_dir.replace(os.path.sep, "--")))
if not utils.file_exists(tarball):
print("Preparing CWL input tarball: %s" % tarball)
with file_transaction({}, tarball) as tx_tarball:
with utils.chdir(base_dir):
with tarfile.open(tarball, "w:gz") as tar:
with tarfile.open(tx_tarball, "w:gz") as tar:
tar.add(tarball_dir)
return tarball
......@@ -769,16 +805,9 @@ def _clean_final_outputs(keyvals, get_retriever):
def clean_path(get_retriever, x):
integration, config = get_retriever.integration_and_config(x)
if integration:
return integration.clean_file(x)
return integration.clean_file(x, config)
else:
return x
def null_to_string(x):
"""Convert None values into the string 'null'
Required for platforms like SevenBridges without null support from inputs.
"""
return "null" if x is None else x
keyvals = _adjust_items(keyvals, null_to_string)
keyvals = _adjust_files(keyvals, functools.partial(clean_path, get_retriever))
return keyvals
......
......@@ -33,7 +33,7 @@ def to_rec_single(samples, default_keys=None):
return out
def is_cwl_run(data):
return data.get("is_cwl") or data.get("cwl_keys")
return data.get("is_cwl") or data.get("cwl_keys") or data.get("output_cwl_keys")
def handle_combined_input(args):
"""Check for cases where we have a combined input nested list.
......@@ -123,6 +123,7 @@ def _get_all_cwlkeys(items, default_keys=None):
"validate__tp", "validate__fp", "validate__fn",
"config__algorithm__coverage", "config__algorithm__coverage_merged",
"genome_resources__variation__cosmic", "genome_resources__variation__dbsnp",
"genome_resources__variation__clinvar"
])
all_keys = set([])
for data in items:
......@@ -237,6 +238,10 @@ def _get_vcf_samples(calls, items):
else:
for data in items:
for i, test_name in enumerate([dd.get_sample_name(data)] + dd.get_batches(data)):
# For tumor/normal batches, want to attach germline VCFs to normals
# Standard somatics go to tumors
if dd.get_phenotype(data) == "normal":
test_name += "-germline"
if os.path.basename(f).startswith(("%s-" % test_name,
"%s." % test_name)):
# Prefer matches to single samples (gVCF) over joint batches
......
"""Setup configurations for running on HPC clusters with CWL.
Contains support for setting up configuration inputs for Cromwell.
"""
import json
import os
def create_cromwell_config(args, work_dir, sample_file):
"""Prepare a cromwell configuration within the current working directory.
"""
docker_attrs = ["String? docker", "String? docker_user"]
cwl_attrs = ["Int? cpuMin", "Int? cpuMax", "Int? memoryMin", "Int? memoryMax", "String? outDirMin",
"String? outDirMax", "String? tmpDirMin", "String? tmpDirMax"]
out_file = os.path.join(work_dir, "bcbio-cromwell.conf")
run_config = _load_custom_config(args.runconfig) if args.runconfig else {}
# Avoid overscheduling jobs for local runs by limiting concurrent jobs
# Longer term would like to keep these within defined core window
joblimit = args.joblimit
if joblimit == 0 and not args.scheduler:
joblimit = 1
file_types = _get_filesystem_types(args, sample_file)
std_args = {"docker_attrs": "" if args.no_container else "\n ".join(docker_attrs),
"submit_docker": 'submit-docker: ""' if args.no_container else "",
"joblimit": "concurrent-job-limit = %s" % (joblimit) if joblimit > 0 else "",
"cwl_attrs": "\n ".join(cwl_attrs),
"filesystem": _get_filesystem_config(file_types),
"database": run_config.get("database", DATABASE_CONFIG % {"work_dir": work_dir}),
"engine": _get_engine_filesystem_config(file_types, args)}
cl_args, conf_args, scheduler, cloud_type = _args_to_cromwell(args)
conf_args.update(std_args)
main_config = {"hpc": (HPC_CONFIGS[scheduler] % conf_args) if scheduler else "",
"cloud": (CLOUD_CONFIGS[cloud_type] % conf_args) if cloud_type else "",
"work_dir": work_dir}
main_config.update(std_args)
# Local run always seems to need docker set because of submit-docker in default configuration
# Can we unset submit-docker based on configuration so it doesn't inherit?
# main_config["docker_attrs"] = "\n ".join(docker_attrs)
with open(out_file, "w") as out_handle:
out_handle.write(CROMWELL_CONFIG % main_config)
return out_file
def _get_file_paths(cur):
"""Retrieve a list of file paths, recursively traversing the
"""
out = []
if isinstance(cur, (list, tuple)):
for x in cur:
new = _get_file_paths(x)
if new:
out.extend(new)
elif isinstance(cur, dict):
if "class" in cur:
out.append(cur["path"])
else:
for k, v in cur.items():
new = _get_file_paths(v)
if new:
out.extend(new)
return out
def _load_custom_config(run_config):
"""Load custom configuration input HOCON file for cromwell.
"""
from pyhocon import ConfigFactory, HOCONConverter, ConfigTree
conf = ConfigFactory.parse_file(run_config)
out = {}
if "database" in conf:
out["database"] = HOCONConverter.to_hocon(ConfigTree({"database": conf.get_config("database")}))
return out
def args_to_cromwell_cl(args):
"""Convert input bcbio arguments into cromwell command line arguments.
"""
cl_args, conf_args, scheduler, cloud = _args_to_cromwell(args)
return cl_args
def _args_to_cromwell(args):
"""Convert input arguments into cromwell inputs for config and command line.
"""
default_config = {"slurm": {"timelimit": "1-00:00", "account": ""},
"sge": {"memtype": "mem_type", "pename": "smp"},
"lsf": {"walltime": "24:00", "account": ""},
"htcondor": {},
"torque": {"walltime": "24:00:00", "account": ""},
"pbspro": {"walltime": "24:00:00", "account": "",
"cpu_and_mem": "-l select=1:ncpus=${cpu}:mem=${memory_mb}mb"}}
prefixes = {("account", "slurm"): "-A ", ("account", "pbspro"): "-A "}
custom = {("noselect", "pbspro"): ("cpu_and_mem", "-l ncpus=${cpu} -l mem=${memory_mb}mb")}
cl = []
config = {}
# HPC scheduling
if args.scheduler:
if args.scheduler not in default_config:
raise ValueError("Scheduler not yet supported by Cromwell: %s" % args.scheduler)
if not args.queue and args.scheduler not in ["htcondor"]:
raise ValueError("Need to set queue (-q) for running with an HPC scheduler")
config = default_config[args.scheduler]
cl.append("-Dbackend.default=%s" % args.scheduler.upper())
config["queue"] = args.queue
for rs in args.resources:
for r in rs.split(";"):
parts = r.split("=")
if len(parts) == 2:
key, val = parts
config[key] = prefixes.get((key, args.scheduler), "") + val
elif len(parts) == 1 and (parts[0], args.scheduler) in custom:
key, val = custom[(parts[0], args.scheduler)]
config[key] = val
cloud_type = None
if args.cloud_project:
if args.cloud_root and args.cloud_root.startswith("gs:"):
cloud_type = "PAPI"
else:
raise ValueError("Unexpected inputs for Cromwell Cloud support: %s %s" %
(args.cloud_project, args.cloud_root))
config = {"cloud_project": args.cloud_project, "cloud_root": args.cloud_root}
cl.append("-Dbackend.default=%s" % cloud_type)
return cl, config, args.scheduler, cloud_type
def _get_filesystem_types(args, sample_file):
"""Retrieve the types of inputs and staging based on sample JSON and arguments.
"""
out = set([])
ext = "" if args.no_container else "_container"
with open(sample_file) as in_handle:
for f in _get_file_paths(json.load(in_handle)):
if f.startswith("gs:"):
out.add("gcp%s" % ext)
elif f.startswith(("https:", "http:")):
out.add("http%s" % ext)
else:
out.add("local%s" % ext)
return out
def _get_filesystem_config(file_types):
"""Retrieve filesystem configuration, including support for specified file types.
"""
out = " filesystems {\n"
for file_type in sorted(list(file_types)):
out += _FILESYSTEM_CONFIG[file_type]
out += " }\n"
return out
_FILESYSTEM_CONFIG = {
"gcp": """
gcs {
auth = "gcp-auth"
caching {
duplication-strategy = "reference"
}
}
""",
"gcp_container": """
gcs {
auth = "gcp-auth"
caching {
duplication-strategy = "copy"
}
}
""",
"http": """
http { }
""",
"http_container": """
http { }
""",
"local": """
local {
localization: ["soft-link"]
caching {
duplication-strategy: ["soft-link"]
hashing-strategy: "path"
}
}
""",
"local_container": """
local {
localization: ["hard-link", "copy"]
caching {
duplication-strategy: ["hard-link", "copy"]
hashing-strategy: "file"
}
}
"""
}
DATABASE_CONFIG = """
database {
profile = "slick.jdbc.HsqldbProfile$"
db {
driver = "org.hsqldb.jdbcDriver"
url = "jdbc:hsqldb:file:%(work_dir)s/persist/metadata;shutdown=false;hsqldb.tx=mvcc"
connectionTimeout = 200000
}
}
"""
def _get_engine_filesystem_config(file_types, args):
"""Retriever authorization and engine filesystem configuration.
"""
file_types = [x.replace("_container", "") for x in list(file_types)]
out = ""
if "gcp" in file_types:
out += _AUTH_CONFIG_GOOGLE
if "gcp" in file_types or "http" in file_types:
out += "engine {\n"
out += " filesystems {\n"
if "gcp" in file_types:
out += ' gcs {\n'
out += ' auth = "gcp-auth"\n'
if args.cloud_project:
out += ' project = "%s"\n' % args.cloud_project
out += ' }\n'
if "http" in file_types:
out += ' http {}\n'
out += " }\n"
out += "}\n"
return out
_AUTH_CONFIG_GOOGLE = """
google {
application-name = "cromwell"
auths = [
{
name = "gcp-auth"
scheme = "service_account"
json-file = ${?GOOGLE_APPLICATION_CREDENTIALS}
}
]
}
"""
CROMWELL_CONFIG = """
include required(classpath("application"))
system {
workflow-restart = true
}
call-caching {
enabled = true
}
load-control {
# Avoid watching memory, since the load-controller stops jobs on local runs
memory-threshold-in-mb = 1
}
cwltool-runner {
# Use external cwltool to avoid slow runtimes with java embedded pre-processing
class = "cwl.CwltoolProcess"
}
%(database)s
%(engine)s
backend {
providers {
Local {
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int? cpu
Int? memory_mb
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
%(submit_docker)s
%(filesystem)s
}
}
%(hpc)s
%(cloud)s
}
}
"""
HPC_CONFIGS = {
"slurm": """
SLURM {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Int memory_mb = 2048
String queue = "%(queue)s"
String timelimit = "%(timelimit)s"
String account = "%(account)s"
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
sbatch -J ${job_name} -D ${cwd} -o ${out} -e ${err} -t ${timelimit} -p ${queue} \
${"--cpus-per-task=" + cpu} --mem=${memory_mb} ${account} \
--wrap "/usr/bin/env bash ${script}"
\"\"\"
kill = "scancel ${job_id}"
check-alive = "squeue -j ${job_id}"
job-id-regex = "Submitted batch job (\\\\d+).*"
%(filesystem)s
}
}
""",
"sge": """
SGE {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Int memory_mb = 2048
String queue = "%(queue)s"
String pename = "%(pename)s"
String memtype = "%(memtype)s"
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
qsub -V -w w -j y -N ${job_name} -wd ${cwd} \
-o ${out} -e ${err} -q ${queue} \
-pe ${pename} ${cpu} ${"-l " + memtype + "=" + memory_mb + "m"} \
/usr/bin/env bash ${script}
\"\"\"
kill = "qdel ${job_id}"
check-alive = "qstat -j ${job_id}"
job-id-regex = "(\\\\d+)"
%(filesystem)s
}
}
""",
"lsf": """
LSF {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Int memory_gb = 2
String queue = "%(queue)s"
String account = "%(account)s"
String walltime = "%(walltime)s"
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
bsub -J ${job_name} -cwd ${cwd} -o ${out} -e ${err} -q ${queue} -W ${walltime} \
-R rusage[mem=${memory_gb}] -n ${cpu} \
/usr/bin/env bash ${script}
\"\"\"
kill = "bkill ${job_id}"
check-alive = "bjobs ${job_id}"
job-id-regex = "Job <(\\\\d+)>.*"
%(filesystem)s
}
}
""",
"pbspro": """
PBSPRO {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Int memory_mb = 2048
String queue = "%(queue)s"
String account = "%(account)s"
String walltime = "%(walltime)s"
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
qsub -V -l wd -N ${job_name} -o ${out} -e ${err} -q ${queue} -l walltime=${walltime} \
%(cpu_and_mem)s \
-- /usr/bin/env bash ${script}
\"\"\"
kill = "qdel ${job_id}"
check-alive = "qstat -j ${job_id}"
job-id-regex = "(\\\\d+).*"
%(filesystem)s
}
}
""",
"torque": """
TORQUE {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Int memory_mb = 2048
String queue = "%(queue)s"
String account = "%(account)s"
String walltime = "%(walltime)s"
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
qsub -V -d ${cwd} -N ${job_name} -o ${out} -e ${err} -q ${queue} \
-l nodes=1:ppn=${cpu} -l mem=${memory_mb}mb -l walltime=${walltime} \
${script}
\"\"\"
kill = "qdel ${job_id}"
check-alive = "qstat ${job_id}"
job-id-regex = "(\\\\d+).*"
%(filesystem)s
}
}
""",
"htcondor": """
HTCONDOR {
actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
config {
%(joblimit)s
runtime-attributes = \"\"\"
Int cpu = 1
Float memory_mb = 512.0
Float disk_kb = 256000.0
String? nativeSpecs
%(docker_attrs)s
%(cwl_attrs)s
\"\"\"
submit = \"\"\"
chmod 755 ${script}
cat > ${cwd}/execution/submitFile <<EOF
Iwd=${cwd}/execution
requirements=${nativeSpecs}
leave_in_queue=true
request_memory=${memory_mb}
request_disk=${disk_kb}
error=${err}
output=${out}
log_xml=true
request_cpus=${cpu}
executable=${script}
log=${cwd}/execution/execution.log
description=${job_name}
getenv=true
queue
EOF
condor_submit ${cwd}/execution/submitFile
\"\"\"
# submit-docker = \"\"\"
# chmod 755 ${script}
# cat > ${cwd}/execution/dockerScript <<EOF
# #!/bin/bash
# docker run --rm -i -v ${cwd}:${docker_cwd} ${docker} /bin/bash ${script}
# EOF
# chmod 755 ${cwd}/execution/dockerScript
# cat > ${cwd}/execution/submitFile <<EOF
# Iwd=${cwd}/execution
# requirements=${nativeSpecs}
# leave_in_queue=true
# request_memory=${memory_mb}
# request_disk=${disk_kb}
# error=${cwd}/execution/stderr
# output=${cwd}/execution/stdout
# log_xml=true
# request_cpus=${cpu}
# executable=${cwd}/execution/dockerScript
# log=${cwd}/execution/execution.log
# queue
# EOF
# condor_submit ${cwd}/execution/submitFile
# \"\"\"
kill = "condor_rm ${job_id}"
check-alive = "condor_q ${job_id}"
job-id-regex = "(?sm).*cluster (\\\\d+)..*"
%(filesystem)s
}
}
"""
}
CLOUD_CONFIGS = {
"PAPI": """
PAPI {
actor-factory = "cromwell.backend.google.pipelines.v2alpha1.PipelinesApiLifecycleActorFactory"
config {
project = "%(cloud_project)s"
root = "%(cloud_root)s"
genomics {
auth = "gcp-auth"
endpoint-url = "https://genomics.googleapis.com/"
}
filesystems {
gcs {
auth = "gcp-auth"
project = "%(cloud_project)s"
}
}
}
}
"""
}
......@@ -8,5 +8,5 @@ def run(args):
"""
dirs, config, run_info_yaml = run_info.prep_system(args.sample_config, args.systemconfig)
integrations = args.integrations if hasattr(args, "integrations") else {}
world = run_info.organize(dirs, config, run_info_yaml, add_provenance=False, integrations=integrations)
world = run_info.organize(dirs, config, run_info_yaml, is_cwl=True, integrations=integrations)
create.from_world(world, run_info_yaml, integrations=integrations, add_container_tag=args.add_container_tag)