Commits on Source (2)
* @johanneskoester
......@@ -22,6 +22,13 @@ jobs:
source activate black
black --check snakemake tests/*.py
- name: Comment PR
if: github.event_name == 'pull_request' && failure()
uses: marocchino/sticky-pull-request-comment@v1.1.0
with:
message: 'Please format your code with [black](https://black.readthedocs.io): `black snakemake tests/*.py`.'
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
testing:
runs-on: ubuntu-latest
needs: formatting
......@@ -47,7 +54,7 @@ jobs:
CI: true
run: |
# enable coverage recording for subprocesses
echo -e "try:\n import coverage\n coverage.process_startup()\nexcept:\n pass" > sitecustomize.py
echo -e "try:\n import coverage\n coverage.process_startup()\nexcept:\n pass" > sitecustomize.py
export COVERAGE_PROCESS_START=.coveragerc
# activate conda env
......
[5.9.1] - 2019-12-20
====================
Changed
-------
- Added a missing module.
[5.9.0] - 2019-12-20
====================
Added
-----
- Support for per-rule environment module definitions to enable HPC-specific software deployment (see docs).
- Allow custom log handler definitions via --log-handler-script (e.g. post errors and progress to a Slack channel or send emails).
- Allow setting threads as a function of the given cores (see docs).
Changed
-------
- Various minor fixes.
[5.8.2] - 2019-12-16
====================
Added
-----
- Implemented a ``multiext`` helper, allowing one to define a set of output files that differ only by extension.
Changed
-------
- Fixed a failure when caching jobs with conda environments.
- Fixed various minor bugs.
- Caching now supports the output of rules that use ``multiext``.
[5.8.1] - 2019-11-15
====================
Changed
......
==========================================================
Caching and reusing intermediate results between workflows
==========================================================
.. _caching:
========================
Between workflow caching
========================
Within certain data analysis fields, some intermediate results recur in exactly the same way across many analyses.
For example, in bioinformatics, reference genomes or annotations are downloaded, and read mapping indexes are built.
......@@ -23,7 +25,8 @@ The environment variable definition that happens in the first line (defining the
When Snakemake is executed without a shared filesystem (e.g., in the cloud, see :ref:`cloud`), the environment variable has to point to a location compatible with the given remote provider (e.g. an S3 or Google Storage bucket).
In any case, the provided location should be shared between all workflows of your group, institute or computing environment, in order to benefit from the reuse of previously obtained intermediate results.
Note that only rules with just a single output file are eligible for caching.
Note that only rules with just a single output file (or directory) or with :ref:`multiext output files <snakefiles-multiext>` are eligible for caching.
The reason is that for other rules it would be impossible to unambiguously assign the output files to cache entries while being agnostic of the actual file names.
Also note that the rules need to retrieve all their parameters via the ``params`` directive (except input files).
It is not allowed to directly use ``wildcards``, ``config`` or any global variable in the shell command or script, because these are not captured in the hash (otherwise, reuse would be unnecessarily limited).
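As an illustrative sketch (the rule name, file paths, and URL are hypothetical), an eligible rule passes everything it needs via ``params``, e.g.:

.. code-block:: python

    rule download_reference:
        output:
            "resources/genome.fa"
        params:
            # values are retrieved via params (here from config), not by accessing
            # config or wildcards directly inside the shell command
            release=config["release"]
        shell:
            "curl -L https://example.org/release-{params.release}/genome.fa > {output}"

Caching for such a rule is then requested on the command line, e.g. via ``snakemake --cache download_reference``, with ``SNAKEMAKE_OUTPUT_CACHE`` pointing to the shared cache location described above.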
......
......@@ -82,7 +82,7 @@ Rules describe how to create **output files** from **input files**.
* Input and output files can contain multiple named wildcards.
* Rules can either use shell commands, plain Python code or external Python or R scripts to create output files from input files.
* Snakemake workflows can be easily executed on **workstations**, **clusters**, **the grid**, and **in the cloud** without modification. The job scheduling can be constrained by arbitrary resources such as available CPU cores, memory or GPUs.
* Snakemake can automatically deploy required software dependencies of a workflow using `Conda <https://conda.io>`_ or `Singularity <http://singularity.lbl.gov/>`_.
* Snakemake can automatically deploy required software dependencies of a workflow using `Conda <https://conda.io>`_ or `Singularity <https://sylabs.io/docs/>`_.
* Snakemake can use Amazon S3, Google Storage, Dropbox, FTP, WebDAV, SFTP and iRODS to access input or output files and further access input files via HTTP and HTTPS.
......
......@@ -12,12 +12,22 @@ following structure:
├── .gitignore
├── README.md
├── LICENSE.md
├── config.yaml
├── scripts
│ ├── script1.py
│ └── script2.R
├── envs
│ └── myenv.yaml
├── workflow
│ ├── scripts
| │ ├── script1.py
| │ └── script2.R
│ ├── rules
| │ ├── module1.smk
| │ └── module2.smk
│ ├── report
| │ ├── plot1.rst
| │ └── plot2.rst
│ └── envs
| │ ├── tool1.yaml
| │ └── tool2.yaml
├── config
│ ├── config.yaml
│ └── some-sheet.tsv
└── Snakefile
Then, a workflow can be deployed to a new system via the following steps
......@@ -29,7 +39,7 @@ Then, a workflow can be deployed to a new system via the following steps
cd path/to/workdir
# edit config and workflow as needed
vim config.yaml
vim config/config.yaml
# execute workflow, deploy software dependencies via conda
snakemake -n --use-conda
......@@ -89,6 +99,7 @@ The path to the environment definition is interpreted as **relative to the Snake
Snakemake will store the environment persistently in ``.snakemake/conda/$hash`` with ``$hash`` being the MD5 hash of the environment definition file content. This way, updates to the environment definition are automatically detected.
Note that you need to clean up environments manually for now. However, in many cases they are lightweight and consist of symlinks to your central conda installation.
Conda deployment also works well for offline or air-gapped environments. Running ``snakemake -n --use-conda --create-envs-only`` will only install the required conda environments without running the full workflow. Subsequent runs with ``--use-conda`` will make use of the local environments without requiring internet access.
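A minimal sketch of this two-step pattern (the ``--cores`` value is arbitrary):

.. code:: console

    # with internet access: only create the conda environments defined in the workflow
    $ snakemake -n --use-conda --create-envs-only

    # later, e.g. offline or air-gapped: run the workflow using the locally created environments
    $ snakemake --use-conda --cores 4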
.. _singularity:
......@@ -174,6 +185,34 @@ The user can, upon execution, freely choose the desired level of reproducibility
* Conda based package management (use versions defined by the workflow developer)
* Conda based package management in containerized OS (use versions and OS defined by the workflow developer)
-------------------------
Using environment modules
-------------------------
On high performance computing (HPC) cluster systems, it can be preferable to use environment modules to deploy optimized versions of certain standard tools.
Snakemake allows environment modules to be defined per rule:
.. code-block:: python
rule bwa:
input:
"genome.fa"
"reads.fq"
output:
"mapped.bam"
conda:
"envs/bwa.yaml"
envmodules:
"bio/bwa/0.7.9",
"bio/samtools/1.9"
shell:
"bwa mem {input} | samtools view -Sbh - > {output}"
Here, when Snakemake is executed with ``snakemake --use-envmodules``, it will load the defined modules in the given order instead of using the conda environment that is also defined.
Note that although not mandatory, one should always provide either a conda environment or a container (see above), along with environment module definitions.
The reason is that environment modules are often highly platform specific, and cannot be assumed to be available somewhere else, thereby limiting reproducibility.
By defining an equivalent conda environment or container as a fallback, people outside of the HPC system where the workflow has been designed can still execute it, e.g. by running ``snakemake --use-conda`` instead of ``snakemake --use-envmodules``.
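For example (the core count is arbitrary), the same workflow can then be run in either mode:

.. code:: console

    # on the HPC system: use the platform specific environment modules
    $ snakemake --use-envmodules --cores 8

    # anywhere else: fall back to the conda environments defined alongside them
    $ snakemake --use-conda --cores 8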
--------------------------------------
Sustainable and reproducible archiving
--------------------------------------
......
......@@ -120,74 +120,121 @@ Finally, you can also define global wildcard constraints that apply for all rule
See the `Python documentation on regular expressions <https://docs.python.org/3/library/re.html>`_ for detailed information on regular expression syntax.
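For instance, a global constraint restricting a hypothetical ``dataset`` wildcard to numeric identifiers can be declared once at the top level of the Snakefile and then applies to all rules:

.. code-block:: python

    wildcard_constraints:
        dataset="\d+"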
.. _snakefiles-targets:
Targets
-------
Aggregation
-----------
By default, Snakemake executes the first rule in the Snakefile. This gives rise to pseudo-rules at the beginning of the file that can be used to define build targets similar to GNU Make:
Input files can be Python lists, allowing you to easily aggregate over parameters or samples:
.. code-block:: python
rule all:
input: ["{dataset}/file.A.txt".format(dataset=dataset) for dataset in DATASETS]
rule aggregate:
input:
["{dataset}/a.txt".format(dataset=dataset) for dataset in DATASETS]
output:
"aggregated.txt"
shell:
...
Here, for each dataset in a python list ``DATASETS`` defined before, the file ``{dataset}/file.A.txt`` is requested. In this example, Snakemake recognizes automatically that these can be created by multiple applications of the rule ``complex_conversion`` shown above.
The above expression can be simplified in two ways.
The above expression can be simplified to the following:
The expand function
~~~~~~~~~~~~~~~~~~~
.. code-block:: python
rule all:
input: expand("{dataset}/file.A.txt", dataset=DATASETS)
This may be used for "aggregation" rules for which files from multiple or all datasets are needed to produce a specific output (say, *allSamplesSummary.pdf*).
Note that *dataset* is NOT a wildcard here because it is resolved by Snakemake due to the ``expand`` statement (see below also for more information).
rule aggregate:
input:
expand("{dataset}/a.txt", dataset=DATASETS)
output:
"aggregated.txt"
shell:
...
Note that *dataset* is NOT a wildcard here because it is resolved by Snakemake due to the ``expand`` statement.
The ``expand`` function also allows combining different variables, e.g.
.. code-block:: python
rule all:
input: expand("{dataset}/file.A.{ext}", dataset=DATASETS, ext=PLOTFORMATS)
rule aggregate:
input:
expand("{dataset}/a.{ext}", dataset=DATASETS, ext=FORMATS)
output:
"aggregated.txt"
shell:
...
If ``PLOTFORMATS=["pdf", "png"]`` contains a list of desired output formats, then ``expand`` will automatically combine every dataset with each of these extensions.
If ``FORMATS=["txt", "csv"]`` contains a list of desired output formats, then ``expand`` will automatically combine every dataset with each of these extensions.
Further, the first argument can also be a list of strings. In that case, the transformation is applied to all elements of the list. E.g.
.. code-block:: python
expand(["{dataset}/plot1.{ext}", "{dataset}/plot2.{ext}"], dataset=DATASETS, ext=PLOTFORMATS)
expand(["{dataset}/a.{ext}", "{dataset}/b.{ext}"], dataset=DATASETS, ext=FORMATS)
leads to
.. code-block:: python
["ds1/plot1.pdf", "ds1/plot2.pdf", "ds2/plot1.pdf", "ds2/plot2.pdf", "ds1/plot1.png", "ds1/plot2.png", "ds2/plot1.png", "ds2/plot2.png"]
["ds1/a.txt", "ds1/b.txt", "ds2/a.txt", "ds2/b.txt", "ds1/a.csv", "ds1/b.csv", "ds2/a.csv", "ds2/b.csv"]
By default, ``expand`` uses the Python itertools function ``product``, which yields all combinations of the provided wildcard values. However, by inserting a second positional argument, this can be replaced by any combinatoric function, e.g. ``zip``:
.. code-block:: python
expand("{dataset}/plot1.{ext} {dataset}/plot2.{ext}".split(), zip, dataset=DATASETS, ext=PLOTFORMATS)
expand(["{dataset}/a.{ext}", "{dataset}/b.{ext}"], zip, dataset=DATASETS, ext=FORMATS)
leads to
.. code-block:: python
["ds1/plot1.pdf", "ds1/plot2.pdf", "ds2/plot1.png", "ds2/plot2.png"]
["ds1/a.txt", "ds1/b.txt", "ds2/a.csv", "ds2/b.csv"]
You can also mask a wildcard expression in expand such that it will be kept, e.g.
.. code-block:: python
expand("{{dataset}}/plot1.{ext}", ext=PLOTFORMATS)
expand("{{dataset}}/a.{ext}", ext=FORMATS)
will create strings with all values for ext but starting with the wildcard ``"{dataset}"``.
.. _snakefiles-multiext:
The multiext function
~~~~~~~~~~~~~~~~~~~~~
``multiext`` provides a simplified variant of ``expand`` that allows defining a set of output or input files that differ only by their extension:
.. code-block:: python
rule plot:
input:
...
output:
multiext("some/plot", ".pdf", ".svg", ".png")
shell:
...
The effect is the same as if you wrote ``expand("some/plot{ext}", ext=[".pdf", ".svg", ".png"])``, but with a simpler syntax.
Moreover, defining output with ``multiext`` is the only way to use :ref:`between workflow caching <caching>` for rules with multiple output files.
will create strings with all values for ext but starting with ``"{dataset}"``.
.. _snakefiles-targets:
Targets and aggregation
-----------------------
By default, Snakemake executes the first rule in the Snakefile. This gives rise to pseudo-rules at the beginning of the file that can be used to define build targets similar to GNU Make:
.. code-block:: python
rule all:
input: ["{dataset}/file.A.txt".format(dataset=dataset) for dataset in DATASETS]
Here, for each dataset in a python list ``DATASETS`` defined before, the file ``{dataset}/file.A.txt`` is requested. In this example, Snakemake recognizes automatically that these can be created by multiple applications of the rule ``complex_conversion`` shown above.
.. _snakefiles-threads:
......@@ -205,8 +252,30 @@ Further, a rule can be given a number of threads to use, i.e.
threads: 8
shell: "somecommand --threads {threads} {input} {output}"
.. sidebar:: Note
On a cluster node, Snakemake uses as many cores as available on that node.
Hence, the number of threads used by a rule never exceeds the number of physically available cores on the node.
Note: This behavior is not affected by ``--local-cores``, which only applies to jobs running on the master node.
Snakemake can alter the number of cores available based on command line options. Therefore, it is useful to propagate it via the built-in variable ``threads`` rather than hardcoding it into the shell command.
In particular, it should be noted that the specified threads have to be seen as a maximum. When Snakemake is executed with fewer cores, the number of threads will be adjusted, i.e. ``threads = min(threads, cores)`` with ``cores`` being the number of cores specified at the command line (option ``--cores``). On a cluster node, Snakemake uses as many cores as available on that node. Hence, the number of threads used by a rule never exceeds the number of physically available cores on the node. Note: This behavior is not affected by ``--local-cores``, which only applies to jobs running on the master node.
In particular, it should be noted that the specified threads have to be seen as a maximum. When Snakemake is executed with fewer cores, the number of threads will be adjusted, i.e. ``threads = min(threads, cores)`` with ``cores`` being the number of cores specified at the command line (option ``--cores``).
Hardcoding a particular maximum number of threads as above is useful when a tool has a natural maximum beyond which parallelization does not speed it up further.
This is often the case, and should be evaluated carefully for production workflows.
If it is certain that no such maximum exists for a tool, one can instead define threads as a function of the number of cores given to Snakemake:
.. code-block:: python
rule NAME:
input: "path/to/inputfile", "path/to/other/inputfile"
output: "path/to/outputfile", "path/to/another/outputfile"
threads: workflow.cores * 0.75
shell: "somecommand --threads {threads} {input} {output}"
The number of given cores is globally available in the Snakefile as an attribute of the workflow object: ``workflow.cores``.
Any arithmetic operation can be performed to derive a number of threads from this. E.g., in the above example, we reserve 75% of the given cores for the rule.
Snakemake will always round the calculated value down (while enforcing a minimum of 1 thread).
Starting from version 3.7, threads can also be a callable that returns an ``int`` value. The signature of the callable should be ``callable(wildcards[, input])`` (``input`` is an optional parameter). It is also possible to refer to a predefined variable (e.g., ``threads: threads_max``) so that the number of cores for a set of rules can be changed in a single place, by altering the value of the variable ``threads_max``.
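A minimal sketch of such a callable (``get_threads`` is a hypothetical name, and the heuristic of one thread per input file, capped at 8, is purely illustrative):

.. code-block:: python

    def get_threads(wildcards, input):
        # hypothetical heuristic: one thread per input file, at most 8
        return min(len(input), 8)

    rule NAME:
        input: "path/to/inputfile", "path/to/other/inputfile"
        output: "path/to/outputfile"
        threads: get_threads
        shell: "somecommand --threads {threads} {input} {output}"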
......
......@@ -123,7 +123,7 @@ First, we download some example data on which the workflow shall be executed:
.. code:: console
$ wget https://github.com/snakemake/snakemake-tutorial-data/archive/v5.4.5.tar.gz
$ tar -xf v5.2.3.tar.gz --strip 1
$ tar -xf v5.4.5.tar.gz --strip 1
This will create a folder ``data`` and a file ``environment.yaml`` in the working directory.
......
......@@ -30,7 +30,7 @@ e.g., via
mkdir snakemake-demo
cd snakemake-demo
wget https://github.com/snakemake/snakemake-tutorial-data/archive/v5.4.5.tar.gz
tar --wildcards -xf v5.2.3.tar.bz2 --strip 1 "*/data"
tar --wildcards -xf v5.4.5.tar.gz --strip 1 "*/data"
Step 1
------
......
" Vim syntax file
" Language: Snakemake (extended from python.vim)
" Maintainer: Jay Hesselberth (jay.hesselberth@gmail.com)
" Last Change: 2019 Jul 26
" Language: Snakemake (extended from python.vim)
" Maintainer: Jay Hesselberth (jay.hesselberth@gmail.com)
" Last Change: 2019 Nov 22
"
" Usage
"
......@@ -19,12 +19,11 @@
" load settings from system python.vim (7.4)
source $VIMRUNTIME/syntax/python.vim
source $VIMRUNTIME/indent/python.vim
"
" Snakemake rules, as of version 3.3
" Snakemake rules, as of version 5.8
"
" XXX N.B. several of the new defs are missing from this table i.e.
" subworkflow, touch etc
"
" rule = "rule" (identifier | "") ":" ruleparams
" include = "include:" stringliteral
......@@ -43,24 +42,25 @@ source $VIMRUNTIME/syntax/python.vim
" singularity = "singularity" ":" stringliteral
" conda = "conda" ":" stringliteral
" shadow = "shadow" ":" stringliteral
" group = "group" ":" stringliteral
" group = "group" ":" stringliteral
syn keyword pythonStatement include workdir onsuccess onerror
syn keyword pythonStatement ruleorder localrules configfile group
syn keyword pythonStatement touch protected temp wrapper conda shadow
syn keyword pythonStatement input output params message threads resources singularity wildcard_constraints
syn keyword pythonStatement version run shell benchmark snakefile log script
syn keyword pythonStatement rule subworkflow nextgroup=pythonFunction skipwhite
syn keyword pythonStatement include workdir onsuccess onerror onstart
syn keyword pythonStatement ruleorder localrules configfile group
syn keyword pythonStatement wrapper conda shadow
syn keyword pythonStatement input output params wildcards priority message threads resources singularity wildcard_constraints
syn keyword pythonStatement version run shell benchmark snakefile log script
syn keyword pythonStatement rule subworkflow checkpoint nextgroup=pythonFunction skipwhite
syn keyword pythonBuiltinObj config checkpoints rules
syn keyword pythonBuiltinFunc directory ancient pipe unpack expand temp touch protected
" similar to special def and class treatment from python.vim, except
" parenthetical part of def and class
syn match pythonFunction
\ "\%(\%(rule\s\|subworkflow\s\)\s*\)\@<=\h\w*" contained
syn match pythonFunction
\ "\%(\%(rule\s\|subworkflow\s\|checkpoint\s\)\s*\)\@<=\h\w*" contained
syn sync match pythonSync grouphere NONE "^\s*\%(rule\|subworkflow\)\s\+\h\w*\s*"
syn sync match pythonSync grouphere NONE "^\s*\%(rule\|subworkflow\|checkpoint\)\s\+\h\w*\s*"
let b:current_syntax = "snakemake"
" vim:set sw=2 sts=2 ts=8 noet:
......@@ -37,7 +37,7 @@ setup(
zip_safe=False,
license="MIT",
url="https://snakemake.readthedocs.io",
packages=["snakemake", "snakemake.remote", "snakemake.report", "snakemake.caching"],
packages=["snakemake", "snakemake.remote", "snakemake.report", "snakemake.caching", "snakemake.deployment"],
entry_points={
"console_scripts": [
"snakemake = snakemake:main",
......
......@@ -16,6 +16,7 @@ import webbrowser
from functools import partial
import importlib
import shutil
from importlib.machinery import SourceFileLoader
from snakemake.workflow import Workflow
from snakemake.dag import Batch
......@@ -125,6 +126,7 @@ def snakemake(
force_use_threads=False,
use_conda=False,
use_singularity=False,
use_env_modules=False,
singularity_args="",
conda_prefix=None,
list_conda_envs=False,
......@@ -228,8 +230,9 @@ def snakemake(
restart_times (int): number of times to restart failing jobs (default 0)
attempt (int): initial value of Job.attempt. This is intended for internal use only (default 1).
force_use_threads: whether to force use of threads over processes. helpful if shared memory is full or unavailable (default False)
use_conda (bool): create conda environments for each job (defined with conda directive of rules)
use_conda (bool): use conda environments for each job (defined with conda directive of rules)
use_singularity (bool): run jobs in singularity containers (if defined with singularity directive)
use_env_modules (bool): load environment modules if defined in rules
singularity_args (str): additional arguments to pass to singularity
conda_prefix (str): the directory in which conda environments will be created (default None)
singularity_prefix (str): the directory to which singularity images will be pulled (default None)
......@@ -294,7 +297,6 @@ def snakemake(
bool: True if workflow execution was successful.
"""
assert not immediate_submit or (
immediate_submit and notemp
), "immediate_submit has to be combined with notemp (it does not support temp file handling)"
......@@ -340,6 +342,7 @@ def snakemake(
use_threads = (
force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
)
if not keep_logger:
stdout = (
(
......@@ -350,6 +353,7 @@ def snakemake(
or list_target_rules
or list_resources
)
setup_logger(
handler=log_handler,
quiet=quiet,
......@@ -446,6 +450,7 @@ def snakemake(
verbose=verbose,
use_conda=use_conda or list_conda_envs or cleanup_conda,
use_singularity=use_singularity,
use_env_modules=use_env_modules,
conda_prefix=conda_prefix,
singularity_prefix=singularity_prefix,
shadow_prefix=shadow_prefix,
......@@ -460,6 +465,9 @@ def snakemake(
run_local=run_local,
default_resources=default_resources,
cache=cache,
cores=cores,
nodes=nodes,
resources=resources,
)
success = True
workflow.include(
......@@ -479,11 +487,8 @@ def snakemake(
# handle subworkflows
subsnakemake = partial(
snakemake,
cores=cores,
nodes=nodes,
local_cores=local_cores,
cache=cache,
resources=resources,
default_resources=default_resources,
dryrun=dryrun,
touch=touch,
......@@ -527,6 +532,7 @@ def snakemake(
force_use_threads=use_threads,
use_conda=use_conda,
use_singularity=use_singularity,
use_env_modules=use_env_modules,
conda_prefix=conda_prefix,
singularity_prefix=singularity_prefix,
shadow_prefix=shadow_prefix,
......@@ -551,8 +557,6 @@ def snakemake(
targets=targets,
dryrun=dryrun,
touch=touch,
cores=cores,
nodes=nodes,
local_cores=local_cores,
forcetargets=forcetargets,
forceall=forceall,
......@@ -601,7 +605,6 @@ def snakemake(
detailed_summary=detailed_summary,
nolock=not lock,
unlock=unlock,
resources=resources,
notemp=notemp,
keep_remote_local=keep_remote_local,
nodeps=nodeps,
......@@ -844,7 +847,7 @@ def get_argument_parser(profile=None):
nargs="?",
metavar="N",
help=(
"Use at most N cores in parallel (default: 1). "
"Use at most N cores in parallel. "
"If N is omitted or 'all', the limit is set to the number of "
"available cores."
),
......@@ -1463,6 +1466,14 @@ def get_argument_parser(profile=None):
action="store_true",
help="Automatically display logs of failed jobs.",
)
group_behavior.add_argument(
"--log-handler-script",
metavar="FILE",
default=None,
help="Provide a custom script containing a function 'def log_handler(msg):'. "
"Snakemake will call this function for every logging output (given as a dictionary msg)"
"allowing to e.g. send notifications in the form of e.g. slack messages or emails.",
)
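# Illustrative sketch (not part of this module): a script passed via
# --log-handler-script only needs to define `log_handler(msg)`. The keys
# inspected in `msg` and the webhook URL below are assumptions for the example.
#
#     import json
#     from urllib import request
#
#     def log_handler(msg):
#         # msg is a dictionary describing a single logging event
#         if msg.get("level") == "error":
#             data = json.dumps({"text": str(msg)}).encode()
#             req = request.Request(
#                 "https://hooks.slack.com/services/XXX",  # hypothetical webhook
#                 data=data,
#                 headers={"Content-Type": "application/json"},
#             )
#             request.urlopen(req)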
group_cluster = parser.add_argument_group("CLUSTER")
......@@ -1699,6 +1710,18 @@ def get_argument_parser(profile=None):
metavar="ARGS",
help="Pass additional args to singularity.",
)
group_env_modules = parser.add_argument_group("ENVIRONMENT MODULES")
group_env_modules.add_argument(
"--use-envmodules",
action="store_true",
help="If defined in the rule, run job within the given environment "
"modules, loaded in the given order. This can be combined with "
"--use-conda and --use-singularity, which will then be only used as a "
"fallback for rules which don't define environment modules.",
)
return parser
......@@ -1781,7 +1804,8 @@ def main(argv=None):
)
sys.exit(1)
elif args.cores is None:
args.cores = 1
# if nothing specified, use all available cores
args.cores = available_cpu_count()
if args.drmaa_log_dir is not None:
if not os.path.isabs(args.drmaa_log_dir):
......@@ -1913,6 +1937,29 @@ def main(argv=None):
# silently close
pass
else:
if args.log_handler_script is not None:
if not os.path.exists(args.log_handler_script):
print(
"Error: no log handler script found, {}.".format(
args.log_handler_script
),
file=sys.stderr,
)
sys.exit(1)
log_script = SourceFileLoader("log", args.log_handler_script).load_module()
try:
log_handler = log_script.log_handler
except AttributeError:
print(
'Error: Invalid log handler script, {}. Expected a Python function "log_handler(msg)".'.format(
args.log_handler_script
),
file=sys.stderr,
)
sys.exit(1)
else:
log_handler = None
success = snakemake(
args.snakefile,
batch=batch,
......@@ -2004,6 +2051,7 @@ def main(argv=None):
conda_prefix=args.conda_prefix,
list_conda_envs=args.list_conda_envs,
use_singularity=args.use_singularity,
use_env_modules=args.use_envmodules,
singularity_prefix=args.singularity_prefix,
shadow_prefix=args.shadow_prefix,
singularity_args=args.singularity_args,
......@@ -2016,6 +2064,7 @@ def main(argv=None):
cluster_status=args.cluster_status,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
log_handler=log_handler,
)
if args.runtime_profile:
......
......@@ -22,9 +22,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
git_refnames = " (HEAD -> master, tag: v5.8.1)"
git_full = "ee0d5b17311c9126d89b49eec70045a9fdd6bd9c"
git_date = "2019-11-15 15:33:42 +0100"
git_refnames = " (tag: v5.9.1)"
git_full = "138720fbca54f88a962c564f2adc5b43149e206a"
git_date = "2019-12-20 18:18:00 +0100"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
......
......@@ -7,6 +7,7 @@ from abc import ABCMeta, abstractmethod
import os
from snakemake.jobs import Job
from snakemake.io import is_flagged, get_flag_value, apply_wildcards
from snakemake.exceptions import WorkflowError, CacheMissException
from snakemake.caching.hash import ProvenanceHashMap
......@@ -39,20 +40,14 @@ class AbstractOutputFileCache:
def exists(self, job: Job):
pass
def get_outputfile(self, job: Job, check_exists=True):
self.check_job(job)
outputfile = job.output[0]
if check_exists:
assert os.path.exists(
outputfile
), "Bug: Output file does not exist although it should be cached."
return outputfile
def check_job(self, job: Job):
assert (
not job.dynamic_output
), "Bug: Rules with dynamic output may not be cached."
assert len(job.output) == 1, "Bug: Only single output files are supported."
def get_outputfiles(self, job: Job):
if job.rule.output[0].is_multiext:
prefix_len = len(
apply_wildcards(job.rule.output[0].multiext_prefix, job.wildcards)
)
yield from ((f, f[prefix_len:]) for f in job.output)
else:
yield (job.output[0], "")
def raise_write_error(self, entry, exception=None):
raise WorkflowError(
......
......@@ -26,7 +26,7 @@ class ProvenanceHashMap:
def _get_provenance_hash(self, job: Job):
"""
Recursively calculate hash for the single output file of the given job
Recursively calculate hash for the output of the given job
and all upstream jobs in a blockchain fashion.
This is based on an idea of Sven Nahnsen.
......@@ -38,13 +38,6 @@ class ProvenanceHashMap:
if job in self._hashes:
return self._hashes[job]
if len(job.output) > 1:
raise WorkflowError(
"Cannot generate hash for rule {}: it has more than one output file.".format(
job.rule.name
)
)
workflow = job.dag.workflow
h = hashlib.sha256()
......@@ -63,7 +56,7 @@ class ProvenanceHashMap:
h.update(source.encode())
# Hash params.
for key, value in sorted(job.params.allitems()):
for key, value in sorted(job.params._allitems()):
h.update(key.encode())
# If this raises a TypeError, we cannot calculate a reliable hash.
h.update(json.dumps(value, sort_keys=True).encode())
......@@ -82,7 +75,7 @@ class ProvenanceHashMap:
if workflow.use_conda and job.conda_env:
if workflow.use_singularity and job.conda_env.singularity_img_url:
h.update(job.conda_env.singularity_img_url.encode())
h.update(job.conda_env.content.encode())
h.update(job.conda_env.content)
elif workflow.use_singularity and job.singularity_img_url:
h.update(job.singularity_img_url.encode())
......
......@@ -27,83 +27,84 @@ class OutputFileCache(AbstractOutputFileCache):
super().__init__()
self.path = Path(self.cache_location)
def check_writeable(self, entry):
if not (os.access(self.path, os.W_OK) or os.access(self.path / entry, os.W_OK)):
self.raise_write_error(entry)
def check_writeable(self, cachefile):
if not (os.access(cachefile.parent, os.W_OK) or os.access(cachefile, os.W_OK)):
self.raise_write_error(cachefile)
def check_readable(self, entry):
if not os.access(self.path / entry, os.R_OK):
self.raise_read_error(entry)
def check_readable(self, cachefile):
if not os.access(cachefile, os.R_OK):
self.raise_read_error(cachefile)
def store(self, job: Job):
"""
Store generated job output in the cache.
"""
outputfile = self.get_outputfile(job, check_exists=True)
provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
self.check_writeable(provenance_hash)
path = self.path / provenance_hash
logger.info("Moving output file {} to cache.".format(outputfile))
with TemporaryDirectory(dir=self.path) as tmpdirname:
tmp = Path(tmpdirname) / provenance_hash
# First move is performed into a tempdir (it might involve a copy if not on the same FS).
# This is important, such that network filesystem latency
# does not lead to concurrent writes to the same file.
# We can use the plain copy method of shutil, because we do not care about the metadata.
shutil.move(outputfile, tmp, copy_function=shutil.copy)
# make readable/writeable for all
os.chmod(
tmp,
stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IROTH
| stat.S_IWOTH,
)
tmpdir = Path(tmpdirname)
for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
self.check_writeable(cachefile)
logger.info("Moving output file {} to cache.".format(outputfile))
tmp = tmpdir / cachefile.name
# First move is performed into a tempdir (it might involve a copy if not on the same FS).
# This is important, such that network filesystem latency
# does not lead to concurrent writes to the same file.
# We can use the plain copy method of shutil, because we do not care about the metadata.
shutil.move(outputfile, tmp, copy_function=shutil.copy)
# make readable/writeable for all
os.chmod(
tmp,
stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IROTH
| stat.S_IWOTH,
)
# Move to the actual path (now we are on the same FS, hence move is atomic).
# Here we use the default copy function, also copying metadata (which is important here).
# It will always work, because we are guaranteed to be in the same FS.
shutil.move(tmp, path)
# now restore the outputfile via a symlink
self.symlink(path, outputfile, utime=False)
# Move to the actual path (now we are on the same FS, hence move is atomic).
# Here we use the default copy function, also copying metadata (which is important here).
# It will always work, because we are guaranteed to be in the same FS.
shutil.move(tmp, cachefile)
# now restore the outputfile via a symlink
self.symlink(cachefile, outputfile, utime=False)
def fetch(self, job: Job):
"""
Retrieve cached output file and copy to the place where the job expects its output.
"""
self.check_job(job)
provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
path = self.path / provenance_hash
for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
if not path.exists():
self.raise_cache_miss_exception(job)
if not cachefile.exists():
self.raise_cache_miss_exception(job)
self.check_readable(provenance_hash)
self.check_readable(cachefile)
outputfile = job.output[0]
self.symlink(path, outputfile)
self.symlink(cachefile, outputfile)
def exists(self, job: Job):
"""
Return True if job is already cached
"""
self.check_job(job)
provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
path = self.path / provenance_hash
for outputfile, cachefile in self.get_outputfiles_and_cachefiles(job):
if not path.exists():
return False
if not cachefile.exists():
return False
self.check_readable(provenance_hash)
self.check_readable(cachefile)
return True
def get_outputfiles_and_cachefiles(self, job: Job):
provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
base_path = self.path / provenance_hash
return (
(outputfile, base_path.with_suffix(ext))
for outputfile, ext in self.get_outputfiles(job)
)
def symlink(self, path, outputfile, utime=True):
if os.utime in os.supports_follow_symlinks or not utime:
logger.info("Symlinking output file {} from cache.".format(outputfile))
......
......@@ -21,42 +21,39 @@ class OutputFileCache(AbstractOutputFileCache):
self.remote_provider = remote_provider
def store(self, job: Job):
outputfile = self.get_outputfile(job)
entry = self._get_remote(job, check_output_exists=True)
# upload to remote
try:
entry.upload()
except Exception as e:
self.raise_write_error(entry, exception=e)
for entry in self._get_remotes(job):
# upload to remote
try:
entry.upload()
except Exception as e:
self.raise_write_error(entry, exception=e)
def fetch(self, job: Job):
self.check_job(job)
entry = self._get_remote(job)
if not entry.exists():
self.raise_cache_miss_exception(job)
for entry in self._get_remotes(job):
if not entry.exists():
self.raise_cache_miss_exception(job)
# download to outputfile
try:
entry.download()
except Exception as e:
self.raise_read_error(entry, exception=e)
# download to outputfile
try:
entry.download()
except Exception as e:
self.raise_read_error(entry, exception=e)
def exists(self, job: Job):
self.check_job(job)
entry = self._get_remote(job)
for entry in self._get_remotes(job):
try:
return entry.exists()
except Exception as e:
self.raise_read_error(entry, exception=e)
try:
return entry.exists()
except Exception as e:
self.raise_read_error(entry, exception=e)
def _get_remote(self, job: Job, check_output_exists=False):
def _get_remotes(self, job: Job, check_output_exists=False):
provenance_hash = self.provenance_hash_map.get_provenance_hash(job)
f = self.remote_provider.remote(
"{}/{}".format(self.cache_location, provenance_hash)
)
remote = get_flag_value(f, "remote_object")
remote._iofile = self.get_outputfile(job, check_exists=check_output_exists)
return remote
for outputfile, ext in self.get_outputfiles(job):
f = self.remote_provider.remote(
"{}/{}{}".format(self.cache_location, provenance_hash, ext)
)
remote = get_flag_value(f, "remote_object")
# set local copy of the remote file
remote._iofile = outputfile
yield remote
......@@ -165,7 +165,7 @@ def job_to_cwl(job, dag, outputs, inputs):
},
"in": {
"cores": {"default": job.threads},
"target_files": {"default": job.output.plainstrings()},
"target_files": {"default": job.output._plainstrings()},
"rules": {"default": [job.rule.name]},
},
"out": ["output_files"],
......
......@@ -27,7 +27,7 @@ from snakemake.exceptions import RemoteFileException, WorkflowError, ChildIOExce
from snakemake.exceptions import InputFunctionException
from snakemake.logging import logger
from snakemake.common import DYNAMIC_FILL
from snakemake import conda, singularity
from snakemake.deployment import conda, singularity
from snakemake.output_index import OutputIndex
from snakemake import workflow
......