[5.4.0] - 2018-12-18
====================
Added
-----
- Snakemake now allows for data-dependent conditional re-evaluation of the job DAG via checkpoints. This feature also deprecates the ``dynamic`` flag. See `the docs <https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution>`_.
[5.3.1] - 2018-12-06
====================
Changed
-------
- Various bug fixes and papercut removals, e.g., in group handling, Kubernetes execution, Singularity support, wrapper and script usage, benchmarking, and schema validation.
[5.3.0] - 2018-09-18
====================
......@@ -26,8 +41,10 @@ Added
Changed
-------
- fixed permission issue when using script directive
- fixed various minor bugs and papercuts.
- Fixed permission issue when using the script directive. This is a breaking change
for scripts referring to files relative to the script directory (see the
`docs <https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#external-scripts>`__).
- Fixed various minor bugs and papercuts.
- Allow URL to local git repo with wrapper directive
(``git+file:///path/to/your/repo/path_to_file@@version``)
......@@ -126,7 +143,7 @@ Changed
====================
Added
=====
-----
- Group jobs for reduced queuing and network overhead, in particular
with short running jobs.
......@@ -135,7 +152,11 @@ Added
transferred directly without using disk.
- Command line flags to clean output files.
- Command line flag to list files in working directory that are not
tracked by Snakemake. # Changes
tracked by Snakemake.
Changed
-------
- Fix of --default-remote-prefix in case of input functions returning
lists or dicts.
- Scheduler no longer prefers jobs with many downstream jobs.
......@@ -144,7 +165,7 @@ Added
====================
Added
=====
-----
- Allow URLs for the conda directive. # Changed
- Various minor updates in the docs.
......@@ -162,7 +183,7 @@ Added
====================
Added
~~~~~
-----
- Integration with CWL: the ``cwl`` directive allows to use CWL tool
definitions in addition to shell commands or Snakemake wrappers.
......@@ -202,7 +223,7 @@ Changed
====================
Added
~~~~~
-----
- Input and output files can now tag pathlib objects. # ## Changed
- Various minor bug fixes.
......@@ -211,7 +232,7 @@ Added
====================
Added
~~~~~
-----
- iRODS remote provider # ## Changed
- Bug fix in shell usage of scripts and wrappers.
......@@ -221,10 +242,14 @@ Added
--------------------
Added
~~~~~
-----
- A new shadow mode (minimal) that only symlinks input files has been
added. # ## Changed
added.
Changed
-------
- The default shell is now bash on linux and macOS. If bash is not
installed, we fall back to sh. Previously, Snakemake used the default
shell of the user, which defeats the purpose of portability. If the
......@@ -239,10 +264,14 @@ Added
--------------------
Added
~~~~~
-----
- List all conda environments with their location on disk via
--list-conda-envs. # ## Changed
--list-conda-envs.
Changed
-------
- Do not clean up shadow on dry-run.
- Allow R wrappers.
......@@ -250,7 +279,7 @@ Added
--------------------
Added
~~~~~
-----
- GridFTP remote provider. This is a specialization of the GFAL remote
provider that uses globus-url-copy to download or upload files. # ##
......@@ -265,11 +294,15 @@ Added
--------------------
Added
~~~~~
-----
- Support for executing jobs in per-rule singularity images. This is
meant as an alternative to the conda directive (see docs), providing
even more guarantees for reproducibility. # ## Changed
even more guarantees for reproducibility.
Changed
-------
- In cluster mode, jobs that are still running after Snakemake has been
killed are automatically resumed.
- Various fixes to GFAL remote provider.
......@@ -280,7 +313,7 @@ Added
--------------------
Added
~~~~~
-----
- Support for configuration profiles. Profiles allow to specify default
options, e.g., a cluster submission command. They can be used via
......@@ -302,7 +335,7 @@ Added
--------------------
Added
~~~~~
-----
- Cloud computing support via Kubernetes. Snakemake workflows can be
executed transparently in the cloud, while storing input and output
......@@ -363,7 +396,7 @@ Changed
---------------------
Added
~~~~~
-----
- An NCBI remote provider. By this, you can seamlessly integrate any
NCBI resource (reference genome, gene/protein sequences, ...) as input
......@@ -382,7 +415,7 @@ Added
---------------------
Added
~~~~~
-----
- Support for RMarkdown (.Rmd) in script directives.
- New option --debug-dag that prints all decisions while building the
......@@ -421,7 +454,7 @@ Changed
---------------------
Added
~~~~~
-----
- Param functions can now also refer to threads. # ## Changed
- Improved tutorial and docs.
......@@ -445,7 +478,7 @@ Changed
---------------------
Added
~~~~~
-----
- Workflows can now be archived to a tarball with
``snakemake --archive my-workflow.tar.gz``. The archive contains all
......@@ -466,7 +499,7 @@ Added
--------------------
Added
~~~~~
-----
- Jobs can be restarted upon failure (--restart-times). # ## Changed
- The docs have been restructured and improved. Now available under
......@@ -479,7 +512,7 @@ Added
--------------------
Added
~~~~~
-----
- Ability to define isolated conda software environments (YAML) per
rule. Environments will be deployed by Snakemake upon workflow
......@@ -518,7 +551,7 @@ Changed
--------------------
Added
~~~~~
-----
- Wildcards can now be constrained by rule and globally via the new
``wildcard_constraints`` directive (see the
......@@ -537,7 +570,10 @@ Added
quoted (see the
`docs <https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-rules>`__).
This is useful when dealing with filenames that contain whitespace.
# ## Changed
Changed
-------
- Snakemake now deletes output files before job execution. Further, it
touches output files after job execution. This solves various
problems with slow NFS filesystems.
......@@ -559,7 +595,7 @@ Changed
--------------------
Added
~~~~~
-----
- The entries in ``resources`` and the ``threads`` job attribute can
now be callables that must return ``int`` values.
......@@ -591,7 +627,7 @@ Changed
--------------------
Added
~~~~~
-----
- onstart handler, that allows to add code that shall be only executed
before the actual workflow execution (not on dryrun).
......@@ -612,7 +648,7 @@ Added
--------------------
Added
~~~~~
-----
- New experimental wrapper directive, which allows to refer to
re-usable `wrapper
......@@ -645,7 +681,7 @@ Changed
--------------------
Added
~~~~~
-----
- Support for easy integration of external R and Python scripts via the
new `script
......@@ -655,9 +691,12 @@ Added
Google Storage, FTP, SFTP, HTTP and Dropbox.
- Simon Ye has implemented support for sandboxing jobs with `shadow
rules <https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-shadow-rules>`__.
# ## Changed
Changed
-------
- Manuel Holtgrewe has fixed dynamic output files in combination with
mutliple wildcards.
multiple wildcards.
- It is now possible to add suffixes to all shell commands with
shell.suffix("mysuffix").
- Job execution has been refactored to spawn processes only when
......@@ -703,14 +742,18 @@ Changed
------------------
Added
~~~~~
-----
- This release adds support for executing jobs on clusters in
synchronous mode (e.g. qsub -sync). Thanks to David Alexander for
implementing this.
- There is now vim syntax highlighting support (thanks to Jay
Hesselberth).
- Snakemake is now available as Conda package. # ## Changed
- Snakemake is now available as Conda package.
Changed
-------
- Lots of bugs have been fixed. Thanks go to e.g. David Koppstein,
Marcel Martin, John Huddleston and Tao Wen for helping with useful
reports and debugging.
......
snakemake (5.4.0-1) UNRELEASED; urgency=medium
* Team upload.
* New upstream version.
New Build-Dep: python3-git
* Standards-Version: 4.3.0
-- Dylan Aïssi <daissi@debian.org> Thu, 03 Jan 2019 08:13:50 +0100
snakemake (5.3.0-1) unstable; urgency=medium
* Team upload.
......
......@@ -10,6 +10,7 @@ Build-Depends: debhelper (>= 11~),
python3-boto,
python3-configargparse,
python3-datrie,
python3-git,
python3-jsonschema,
python3-networkx,
python3-nose,
......@@ -28,7 +29,7 @@ Build-Depends: debhelper (>= 11~),
python3-wrapt,
python3-yaml,
r-cran-rmarkdown
Standards-Version: 4.2.1
Standards-Version: 4.3.0
Vcs-Browser: https://salsa.debian.org/med-team/snakemake
Vcs-Git: https://salsa.debian.org/med-team/snakemake.git
Homepage: https://bitbucket.org/snakemake/snakemake
......
......@@ -126,6 +126,7 @@ Publications using Snakemake
In the following you find an **incomplete list** of publications making use of Snakemake for their analyses.
Please consider adding your own.
* Doris et al. 2018. `Spt6 is required for the fidelity of promoter selection <https://doi.org/10.1016/j.molcel.2018.09.005>`_. Molecular Cell.
* Karlsson et al. 2018. `Four evolutionary trajectories underlie genetic intratumoral variation in childhood cancer <https://www.nature.com/articles/s41588-018-0131-y>`_. Nature Genetics.
* Planchard et al. 2018. `The translational landscape of Arabidopsis mitochondria <https://academic.oup.com/nar/advance-article/doi/10.1093/nar/gky489/5033161>`_. Nucleic acids research.
* Schult et al. 2018. `Effect of UV irradiation on Sulfolobus acidocaldarius and involvement of the general transcription factor TFB3 in the early UV response <https://academic.oup.com/nar/article/46/14/7179/5047281>`_. Nucleic acids research.
......
......@@ -75,7 +75,7 @@ This entails the pipefail option, which reports errors from within a pipe to out
.. code-block:: bash
set +o pipefile;
set +o pipefail;
to your shell command in the problematic rule.
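
A minimal sketch of such a rule could look as follows (the rule name, file paths, and the ``zcat``/``head`` pipeline are only illustrative):

.. code-block:: python

    rule head_of_reads:
        output:
            "results/first_reads.txt"
        # head closes the pipe early, which makes zcat exit with SIGPIPE;
        # disabling pipefail for this command keeps that from failing the job
        shell:
            "set +o pipefail; "
            "zcat data/reads.fastq.gz | head -n 400 > {output}"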
......@@ -605,3 +605,29 @@ You should have a look if maybe you are missing some library or a certain compil
If everything seems fine, please report to the upstream developers of the failing dependency.
Note that in general it is recommended to install Snakemake via `Conda <https://conda.io>`_ which gives you precompiled packages and the additional benefit of having :ref:`automatic software deployment <integrated_package_management>` integrated into your workflow execution.
How to enable autocompletion for the zsh shell?
-----------------------------------------------
For users of the `Z shell <https://www.zsh.org/>`_ (zsh), run the following in an active zsh session to enable autocompletion for snakemake:
.. code-block:: console
compdef _gnu_generic snakemake
Example:
Say you have forgotten how to use the various options starting with ``--force``: just type the partial match, i.e. ``--force``, and press TAB to get a list of all potential hits along with a description:
.. code-block:: console
$ snakemake --force<TAB>
--force -- Force the execution of the selected target or the
--force-use-threads -- Force threads rather than processes. Helpful if shared
--forceall -- Force the execution of the selected (or the first)
--forcerun -- (TARGET (TARGET ...)), -R (TARGET (TARGET ...))
To activate this autocompletion permanently, put this line in ``~/.zshrc``.
`Here <https://github.com/zsh-users/zsh-completions/blob/master/zsh-completions-howto.org>`_ is some further reading.
......@@ -111,11 +111,16 @@ This allows to create links between otherwise separate data analyses.
.. code-block:: python
subworkflow otherworkflow:
workdir: "../path/to/otherworkflow"
snakefile: "../path/to/otherworkflow/Snakefile"
workdir:
"../path/to/otherworkflow"
snakefile:
"../path/to/otherworkflow/Snakefile"
configfile:
"path/to/custom_configfile.yaml"
rule a:
input: otherworkflow("test.txt")
input:
otherworkflow("test.txt")
output: ...
shell: ...
......@@ -123,6 +128,7 @@ Here, the subworkflow is named "otherworkflow" and it is located in the working
The snakefile is in the same directory and called ``Snakefile``.
If ``snakefile`` is not defined for the subworkflow, it is assumed to be located in the workdir location and called ``Snakefile``; hence, above we could have left out the ``snakefile`` keyword as well.
If ``workdir`` is not specified, it is assumed to be the same as the current one.
The (optional) definition of a ``configfile`` allows parameterizing the subworkflow as needed.
Files that are output from the subworkflow that we depend on are marked with the ``otherworkflow`` function (see the input of rule a).
This function automatically determines the absolute path to the file (here ``../path/to/otherworkflow/test.txt``).
......
......@@ -426,6 +426,8 @@ Apart from Python scripts, this mechanism also allows you to integrate R_ and R
In the R script, an S4 object named ``snakemake`` analogous to the Python case above is available and allows access to input and output files and other parameters. Here the syntax follows that of S4 classes with attributes that are R lists, e.g. we can access the first input file with ``snakemake@input[[1]]`` (note that the first file does not have index ``0`` here, because R starts counting from ``1``). Named input and output files can be accessed in the same way, by just providing the name instead of an index, e.g. ``snakemake@input[["myfile"]]``.
For technical reasons, scripts are executed in ``.snakemake/scripts``. The original script directory is available as ``scriptdir`` in the ``snakemake`` object. A convenience method, ``snakemake@source()``, acts as a wrapper for the normal R ``source()`` function, and can be used to source files relative to the original script directory.
An example external Python script could look like this:
.. code-block:: python
......@@ -530,8 +532,7 @@ Further, an output file marked as ``temp`` is deleted after all rules that use i
Directories as outputs
----------------------
There are situations where it can be convenient to have directories, rather than files, as outputs of a rule. For example, some tools generate different output files based on which settings they are run with. Rather than covering all these cases with conditional statements in the Snakemake rule, you can let the rule output a directory that contains all the output files regardless of settings. Another use case could be when the number of outputs is large or unknown, say one file per identified species in a metagenomics sample or one file per cluster from a clustering algorithm. If all downstream rules rely on the whole sets of outputs, rather than on the individual species/clusters, then having a directory as an output can be a faster and easier solution compared to using the ``dynamic`` keyword.
As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``. This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake. Marking the output as ``directory`` makes the intent clear, and the output can be safely removed. Another reason comes down to how modification time for directories work. The modification time on a directory changes when a file or a subdirectory is added, removed or renamed. This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed. When the ``directory`` flag is used, then a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or if it needs to be rerun.
Sometimes it can be convenient to have directories, rather than files, as outputs of a rule. As of version 5.2.0, directories as outputs have to be explicitly marked with ``directory``. This is primarily for safety reasons; since all outputs are deleted before a job is executed, we don't want to risk deleting important directories if the user makes some mistake. Marking the output as ``directory`` makes the intent clear, and the output can be safely removed. Another reason comes down to how modification time for directories works. The modification time on a directory changes when a file or a subdirectory is added, removed or renamed. This can easily happen in not-quite-intended ways, such as when Apple macOS or MS Windows add ``.DS_Store`` or ``thumbs.db`` files to store parameters for how the directory contents should be displayed. When the ``directory`` flag is used, a hidden file called ``.snakemake_timestamp`` is created in the output directory, and the modification time of that file is used when determining whether the rule output is up to date or needs to be rerun. Always consider whether you can formulate your workflow using normal files before resorting to ``directory()``.
.. code-block:: python
......@@ -1038,3 +1039,174 @@ Naturally, a pipe output may only have a single consumer.
It is possible to combine explicit group definition as above with pipe outputs.
Thereby, pipe jobs can live within, or (automatically) extend existing groups.
However, the two jobs connected by a pipe may not exist in conflicting groups.
.. _snakefiles-checkpoints:
Data-dependent conditional execution
------------------------------------
From Snakemake 5.4 on, conditional re-evaluation of the DAG of jobs based on the content of outputs is possible.
The key idea is that rules can be declared as checkpoints, e.g.,
.. code-block:: python
checkpoint somestep:
input:
"samples/{sample}.txt"
output:
"somestep/{sample}.txt"
shell:
"somecommand {input} > {output}"
Snakemake allows the DAG to be re-evaluated after the successful execution of every job spawned from a checkpoint.
For this, every checkpoint is registered by its name in a globally available ``checkpoints`` object.
The ``checkpoints`` object can be accessed by :ref:`input functions <snakefiles-input_functions>`.
Assuming that the checkpoint is named ``somestep`` as above, the output files for a particular job can be retrieved with
.. code-block:: python
checkpoints.somestep.get(sample="a").output
Thereby, the ``get`` method throws ``snakemake.exceptions.IncompleteCheckpointException`` if the checkpoint has not yet been executed for these particular wildcard value(s).
Inside an input function, the exception is automatically handled by Snakemake and leads to a re-evaluation after the checkpoint has been successfully passed.
To illustrate the possibilities of this mechanism, consider the following complete example:
.. code-block:: python
# a target rule to define the desired final output
rule all:
input:
"aggregated/a.txt",
"aggregated/b.txt"
# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint somestep:
input:
"samples/{sample}.txt"
output:
"somestep/{sample}.txt"
shell:
# simulate some output value
"echo {wildcards.sample} > somestep/{wildcards.sample}.txt"
# intermediate rule
rule intermediate:
input:
"somestep/{sample}.txt"
output:
"post/{sample}.txt"
shell:
"touch {output}"
# alternative intermediate rule
rule alt_intermediate:
input:
"somestep/{sample}.txt"
output:
"alt/{sample}.txt"
shell:
"touch {output}"
# input function for the rule aggregate
def aggregate_input(wildcards):
# decision based on content of output file
with open(checkpoints.somestep.get(sample=wildcards.sample).output[0]) as f:
if f.read().strip() == "a":
return "post/{sample}.txt"
else:
return "alt/{sample}.txt"
rule aggregate:
input:
aggregate_input
output:
"aggregated/{sample}.txt"
shell:
"touch {output}"
As can be seen, the rule aggregate uses an input function.
Inside the function, we first retrieve the output files of the checkpoint ``somestep`` with the wildcards, passing through the value of the wildcard sample.
Upon execution, if the checkpoint is not yet complete, Snakemake will record ``somestep`` as a direct dependency of the rule ``aggregate``.
Once ``somestep`` has finished for a given sample, the input function will automatically be re-evaluated and the ``get`` method will no longer raise an exception.
Instead, the output file will be opened, and depending on its contents either ``"post/{sample}.txt"`` or ``"alt/{sample}.txt"`` will be returned by the input function.
This way, the DAG becomes conditional on some produced data.
It is also possible to use checkpoints for cases where the output files are unknown before execution.
A typical example is a clustering process with an unknown number of clusters, where each cluster shall be saved into a separate file.
Consider the following example:
.. code-block:: python
# a target rule to define the desired final output
rule all:
input:
"aggregated/a.txt",
"aggregated/b.txt"
# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint clustering:
input:
"samples/{sample}.txt"
output:
clusters=directory("clustering/{sample}")
shell:
"mkdir clustering/{wildcards.sample}; "
"for i in 1 2 3; do echo $i > clustering/{wildcards.sample}/$i.txt; done"
# an intermediate rule
rule intermediate:
input:
"clustering/{sample}/{i}.txt"
output:
"post/{sample}/{i}.txt"
shell:
"cp {input} {output}"
def aggregate_input(wildcards):
checkpoint_output = checkpoints.clustering.get(**wildcards).output[0]
return expand("post/{sample}/{i}.txt",
sample=wildcards.sample,
i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i)
# an aggregation over all produced clusters
rule aggregate:
input:
aggregate_input
output:
"aggregated/{sample}.txt"
shell:
"cat {input} > {output}"
Here, our checkpoint simulates a clustering.
We pretend that the number of clusters is unknown beforehand.
Hence, the checkpoint only defines an output ``directory``.
The rule ``aggregate`` again uses the ``checkpoints`` object to retrieve the output of the checkpoint.
This time, instead of explicitly writing
.. code-block:: python
checkpoints.clustering.get(sample=wildcards.sample).output[0]
we use the shorthand
.. code-block:: python
checkpoints.clustering.get(**wildcards).output[0]
which automatically unpacks the wildcards as keyword arguments (this is standard Python argument unpacking).
If the checkpoint has not yet been executed, accessing ``checkpoints.clustering.get(**wildcards)`` ensures that Snakemake records the checkpoint as a direct dependency of the rule ``aggregate``.
Upon completion of the checkpoint, the input function is re-evaluated, and the code beyond its first line is executed.
Here, we retrieve the values of the wildcard ``i`` based on all files named ``{i}.txt`` in the output directory of the checkpoint.
These values are then used to expand the pattern ``"post/{sample}/{i}.txt"``, such that the rule ``intermediate`` is executed for each of the determined clusters.
This mechanism can be used to replace the :ref:`dynamic-flag <snakefiles-dynamic_files>`, which is deprecated and scheduled for removal in Snakemake 6.0.
......@@ -66,7 +66,7 @@ The ``benchmark`` directive takes a string that points to the file where benchma
Similar to output files, the path can contain wildcards (it must be the same wildcards as in the output files).
When a job derived from the rule is executed, Snakemake will measure the wall clock time and memory usage (in MiB) and store it in the file in tab-delimited format.
It is possible to repeat a benchmark multiple times in order to get a sense for the variability of the measurements.
This can be done by annotating the benchmark file, e.g., with ``benchmark("benchmarks/{sample}.bwa.benchmark.txt", 3)`` Snakemake can be told to run the job three times.
This can be done by annotating the benchmark file: e.g., with ``repeat("benchmarks/{sample}.bwa.benchmark.txt", 3)``, Snakemake can be told to run the job three times.
The repeated measurements occur as subsequent lines in the tab-delimited benchmark file.
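
A minimal sketch of a rule with repeated benchmarking could look like this (the rule name, file paths, and the ``bwa``/``samtools`` command are only illustrative):

.. code-block:: python

    rule bwa_map:
        input:
            "genome.fa",
            "reads/{sample}.fastq"
        output:
            "mapped/{sample}.bam"
        # run the job three times; each repetition appends one line to the
        # tab-delimited benchmark file
        benchmark:
            repeat("benchmarks/{sample}.bwa.benchmark.txt", 3)
        shell:
            "bwa mem {input} | samtools view -Sb - > {output}"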
Modularization
......
......@@ -59,7 +59,7 @@ It serves as a blueprint for proteins, which form living cells, carry informatio
and drive chemical reactions. Differences between populations, species, cancer
cells and healthy tissue, as well as syndromes or diseases can be reflected and
sometimes caused by changes in the genome.
This makes the genome an major target of biological and medical research.
This makes the genome a major target of biological and medical research.
Today, it is often analyzed with DNA sequencing, producing gigabytes of data from
a single biological sample (e.g. a biopsy of some tissue).
For technical reasons, DNA sequencing cuts the DNA of a sample into millions
......
......@@ -1105,7 +1105,8 @@ def get_argument_parser(profile=None):
default="https://bitbucket.org/snakemake/snakemake-wrappers/raw/",
help="Prefix for URL created from wrapper directive (default: "
"https://bitbucket.org/snakemake/snakemake-wrappers/raw/). Set this to "
"a different URL to use your fork or a local clone of the repository."
"a different URL to use your fork or a local clone of the repository, "
"e.g., use a git URL like 'git+file://path/to/your/local/clone@'."
)
group_behavior.add_argument("--default-remote-provider",
choices=["S3", "GS", "FTP", "SFTP", "S3Mocked", "gfal", "gridftp", "iRODS"],
......
......@@ -23,9 +23,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
git_refnames = " (tag: v5.3.0)"
git_full = "a890ff6275ef9bfa8051033f055f18f252ffcac9"
git_date = "2018-09-18 16:14:53 +0200"
git_refnames = " (HEAD -> master, tag: v5.4.0)"
git_full = "b2d791223686295c9539b37967978234e2f45897"
git_date = "2018-12-18 12:38:46 +0000"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
......
......@@ -178,6 +178,7 @@ class BenchmarkTimer(ScheduledPeriodicTimer):
rss, vms, uss, pss = 0, 0, 0, 0
# I/O measurements
io_in, io_out = 0, 0
check_io = True
# CPU seconds
cpu_seconds = 0
# Iterate over process and all children
......@@ -190,9 +191,14 @@ class BenchmarkTimer(ScheduledPeriodicTimer):
vms += meminfo.vms
uss += meminfo.uss
pss += meminfo.pss
if check_io:
try:
ioinfo = proc.io_counters()
io_in += ioinfo.read_bytes
io_out += ioinfo.write_bytes
except NotImplementedError as nie:
# OS doesn't track IO
check_io = False
if self.bench_record.prev_time:
cpu_seconds += proc.cpu_percent() / 100 * (
this_time - self.bench_record.prev_time)
......@@ -203,8 +209,12 @@ class BenchmarkTimer(ScheduledPeriodicTimer):
vms /= 1024 * 1024
uss /= 1024 * 1024
pss /= 1024 * 1024
if check_io:
io_in /= 1024 * 1024
io_out /= 1024 * 1024
else:
io_in = None
io_out = None
except psutil.Error as e:
return
# Update benchmark record's RSS and VMS
......
from snakemake.exceptions import IncompleteCheckpointException, WorkflowError
from snakemake.io import checkpoint_target
class Checkpoints:
""" A namespace for checkpoints so that they can be accessed via dot notation. """
def __init__(self):
self.future_output = None
def register(self, rule):
setattr(self, rule.name, Checkpoint(rule, self))
class Checkpoint:
__slots__ = ["rule", "checkpoints"]
def __init__(self, rule, checkpoints):
self.rule = rule
self.checkpoints = checkpoints
def get(self, **wildcards):
missing = self.rule.wildcard_names.difference(wildcards.keys())
if missing:
raise WorkflowError(
"Missing wildcard values for {}".format(", ".join(missing)))
output, _ = self.rule.expand_output(wildcards)
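# If the DAG has not been evaluated yet, or any output file of this
# checkpoint is missing or still scheduled to be produced, signal Snakemake
# (via IncompleteCheckpointException) to defer the calling input function
# until the checkpoint has finished.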
if (self.checkpoints.future_output is None or
any((not f.exists or f in self.checkpoints.future_output)
for f in output)):
raise IncompleteCheckpointException(self.rule, checkpoint_target(output[0]))
return CheckpointJob(self.rule, output)
class CheckpointJob:
__slots__ = ["rule", "output"]
def __init__(self, rule, output):
self.output = output.plainstrings()
self.rule = rule
......@@ -6,6 +6,7 @@ __license__ = "MIT"
from functools import update_wrapper
import inspect
import uuid
import os
from ._version import get_versions
......@@ -15,8 +16,7 @@ del get_versions
MIN_PY_VERSION = (3, 5)
DYNAMIC_FILL = "__snakemake_dynamic__"
SNAKEMAKE_SEARCHPATH = os.path.dirname(os.path.dirname(__file__))
UUID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://snakemake.readthedocs.io")
......@@ -45,6 +45,10 @@ class Mode:
class lazy_property(property):
__slots__ = ["method", "cached", "__doc__"]
@staticmethod
def clean(instance, method):
delattr(instance, method)
def __init__(self, method):
self.method = method
self.cached = "_{}".format(method.__name__)
......
......@@ -112,6 +112,8 @@ class Env:
def create_archive(self):
"""Create self-contained archive of environment."""
from snakemake.shell import shell
try:
import yaml
except ImportError:
......@@ -129,8 +131,8 @@ class Env:
logger.info("Downloading packages for conda environment {}...".format(self.file))
os.makedirs(env_archive, exist_ok=True)
try:
out = subprocess.check_output(["conda", "list", "--explicit",
"--prefix", self.path],
out = shell.check_output(
"conda list --explicit --prefix {}".format(self.path),
stderr=subprocess.STDOUT)
logger.debug(out.decode())
except subprocess.CalledProcessError as e:
......@@ -166,6 +168,8 @@ class Env:
def create(self, dryrun=False):
""" Create the conda enviroment."""
from snakemake.shell import shell
if self._singularity_img:
check_conda(self._singularity_img)
......@@ -183,6 +187,15 @@ class Env:
env_hash = self.hash
env_path = self.path
# Check for broken environment
if os.path.exists(os.path.join(env_path,"env_setup_start")) and not os.path.exists(os.path.join(env_path,"env_setup_done")):
if dryrun:
logger.info("Incomplete Conda environment {} will be recreated.".format(utils.simplify_path(self.file)))
else:
logger.info("Removing incomplete Conda environment {}...".format(utils.simplify_path(self.file)))
shutil.rmtree(env_path, ignore_errors=True)
# Create environment if not already present.
if not os.path.exists(env_path):
if dryrun:
......@@ -193,6 +206,11 @@ class Env:
# Check if env archive exists. Use that if present.
env_archive = self.archive_file
try:
# Touch "start" flag file
os.makedirs(env_path, exist_ok=True)
with open(os.path.join(env_path,"env_setup_start"), "a") as f:
pass
if os.path.exists(env_archive):
logger.info("Using archived local conda packages.")
pkg_list = os.path.join(env_archive, "packages.txt")
......@@ -212,8 +230,7 @@ class Env:
packages)
if self._singularity_img:
cmd = singularity.shellcmd(self._singularity_img.path, cmd)
out = subprocess.check_output(cmd, shell=True,
stderr=subprocess.STDOUT)
out = shell.check_output(cmd, stderr=subprocess.STDOUT)
else:
# Copy env file to env_path (because they can be on
......@@ -229,8 +246,11 @@ class Env:
"--prefix", env_path])
if self._singularity_img:
cmd = singularity.shellcmd(self._singularity_img.path, cmd)
out = subprocess.check_output(cmd, shell=True,
stderr=subprocess.STDOUT)
out = shell.check_output(cmd, stderr=subprocess.STDOUT)
# Touch "done" flag file
with open(os.path.join(env_path,"env_setup_done"), "a") as f:
pass
logger.debug(out.decode())
logger.info("Environment for {} created (location: {})".format(
os.path.relpath(env_file), os.path.relpath(env_path)))
......@@ -262,6 +282,8 @@ def shellcmd(env_path):
def check_conda(singularity_img=None):
from snakemake.shell import shell
def get_cmd(cmd):
if singularity_img:
return singularity.shellcmd(singularity_img.path, cmd)
......@@ -270,21 +292,30 @@ def check_conda(singularity_img=None):
try:
# Use type here since conda now is a function.
# type allows to check for both functions and regular commands.
subprocess.check_output(get_cmd("type conda"),
shell=True,
stderr=subprocess.STDOUT)
shell.check_output(get_cmd("type conda"), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
if singularity_img:
raise CreateCondaEnvironmentException("The 'conda' command is not "
"available inside "
"your singularity container "
"image.")
"image. Make sure that "
"conda is properly installed "
"inside your container "
"image. For example use "
"continuumio/miniconda3 as a "
"base image.")
else:
raise CreateCondaEnvironmentException("The 'conda' command is not "
"available.")
"available in the "
"shell {} that will be "
"used by Snakemake. You have "
"to ensure that it is in your "
"PATH, e.g., first activating "
"the conda base environment "
"with `conda activate base`.".format(
shell.get_executable()))
try:
version = subprocess.check_output(get_cmd("conda --version"),
shell=True,
version = shell.check_output(get_cmd("conda --version"),
stderr=subprocess.STDOUT).decode() \
.split()[1]
if StrictVersion(version) < StrictVersion("4.2"):
......
......@@ -31,6 +31,7 @@ from snakemake.common import DYNAMIC_FILL
from snakemake import conda, singularity
from snakemake import utils
from snakemake.output_index import OutputIndex
from snakemake import workflow
class DAG:
......@@ -150,6 +151,19 @@ class DAG:
for i, job in enumerate(self.jobs):
job.is_valid()
@property
def checkpoint_jobs(self):
for job in self.needrun_jobs:
if job.is_checkpoint:
yield job
def is_checkpoint_output(self, f):
return f in self._checkpoint_outputs
def update_checkpoint_outputs(self):
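# Record which checkpoint outputs still have to be produced in this DAG,
# so that Checkpoint.get() can defer input functions that depend on them.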
workflow.checkpoints.future_output = set(f for job in self.checkpoint_jobs
for f in job.output)
def update_jobids(self):
for job in self.jobs:
if job not in self._jobid:
......@@ -802,18 +816,18 @@ class DAG:
if job.group is None:
continue
stop = lambda j: j.group != job.group
# BFS into depending jobs if in same group
# Note: never go up here (into depending), because it may contain
# jobs that have been sorted out due to e.g. ruleorder.
group = GroupJob(job.group,
chain(self.bfs(self.depending, job, stop=stop),
self.bfs(self.dependencies, job, stop=stop)))
self.bfs(self.dependencies, job, stop=stop))
# merge with previously determined group if present
# merge with previously determined groups if present
for j in group:
if j in groups:
other = groups[j]
other.merge(group)
group = other
break
# update assignment
for j in group:
if j not in groups:
......@@ -869,6 +883,7 @@ class DAG:
self.update_groups()
self.update_ready()
self.close_remote_objects()
self.update_checkpoint_outputs()
def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
......@@ -964,6 +979,18 @@ class DAG:
self._finished.update(jobs)
if update_dynamic:
self.update_checkpoint_outputs()
for job in jobs:
if job.is_checkpoint:
depending = list(self.depending[job])
# re-evaluate depending jobs, replace and update DAG
for j in depending:
logger.info("Updating job {}.".format(self.jobid(j)))
newjob = j.updated()
self.replace_job(j, newjob, recursive=False)
self.postprocess()
# mark depending jobs as ready
# skip jobs that are marked as until jobs
self.update_ready(j for job in jobs for j in self.depending[job]
......@@ -982,8 +1009,6 @@ class DAG:
self.postprocess()
self.handle_protected(newjob)
self.handle_touch(newjob)
# add finished jobs to len as they are not counted after new postprocess
self._len += len(self._finished)
def new_job(self, rule, targetfile=None, format_wildcards=None):
"""Create new job for given rule and (optional) targetfile.
......@@ -1071,7 +1096,7 @@ class DAG:
if job in self._ready_jobs:
self._ready_jobs.remove(job)
def replace_job(self, job, newjob):
def replace_job(self, job, newjob, recursive=True):
"""Replace given job with new job."""
if job in self.targetjobs:
self.targetjobs.remove(job)
......@@ -1080,7 +1105,7 @@ class DAG:
if self.finished(job):
self._finished.add(newjob)
self.delete_job(job)
self.delete_job(job, recursive=recursive)
self.update([newjob])
logger.debug("Replace {} with dynamic branch {}".format(job, newjob))
......@@ -1103,7 +1128,7 @@ class DAG:
# use a set to circumvent multiple jobs for the same file
# if user specified it twice
file2jobs = self.file2jobs
for file in set(job.input):
for file in job.unique_input:
# omit the file if it comes from a subworkflow
if file in job.subworkflow_input:
continue
......
......@@ -377,3 +377,10 @@ class CreateCondaEnvironmentException(WorkflowError):
class SpawnedJobError(Exception):
pass
class IncompleteCheckpointException(Exception):
def __init__(self, rule, targetfile):
self.rule = rule
from snakemake.io import checkpoint_target
self.targetfile = checkpoint_target(targetfile)
......@@ -35,7 +35,7 @@ from snakemake.io import get_wildcard_names, Wildcards
from snakemake.exceptions import print_exception, get_exception_origin
from snakemake.exceptions import format_error, RuleException, log_verbose_traceback
from snakemake.exceptions import ClusterJobException, ProtectedOutputException, WorkflowError, ImproperShadowException, SpawnedJobError
from snakemake.common import Mode, __version__, get_container_image
from snakemake.common import Mode, __version__, get_container_image, get_uuid
def sleep():
......@@ -1110,7 +1110,14 @@ class KubernetesExecutor(ClusterExecutor):
continue
self.kubeapi.create_namespaced_secret(self.namespace, secret)
def unregister_secret(self):
import kubernetes.client
self.kubeapi.delete_namespaced_secret(self.run_namespace,
self.namespace,
kubernetes.client.V1DeleteOptions())
def shutdown(self):
self.unregister_secret()
super().shutdown()
def cancel(self):
......@@ -1132,11 +1139,14 @@ class KubernetesExecutor(ClusterExecutor):
exec_job = self.format_job(
self.exec_job, job, _quote_all=True, rules=job.rules,
use_threads="--force-use-threads" if not job.is_group() else "")
jobid = "snakejob-{}-{}-{}".format(
self.run_namespace, job.jobid, job.attempt)
# Kubernetes silently does not submit a job if the name is too long
# therefore, we ensure that it is not longer than snakejob+uuid.
jobid = "snakejob-{}".format(
get_uuid("{}-{}-{}".format(
self.run_namespace, job.jobid, job.attempt)))
body = kubernetes.client.V1Pod()
body.metadata = kubernetes.client.V1ObjectMeta()
body.metadata = kubernetes.client.V1ObjectMeta(labels={"app": "snakemake"})
body.metadata.name = jobid
# container
......@@ -1196,12 +1206,12 @@ class KubernetesExecutor(ClusterExecutor):
# This way, we should be able to saturate the node without exceeding it
# too much.
container.resources.requests["cpu"] = job.resources["_cores"] - 1
if "mem_mb" in job.resources:
if "mem_mb" in job.resources.keys():
container.resources.requests["memory"] = "{}M".format(
job.resources["mem_mb"])
# capabilities
if job.singularity_img and self.workflow.use_singularity:
if job.needs_singularity and self.workflow.use_singularity:
# TODO this should work, but it doesn't currently because of
# missing loop devices
# singularity inside docker requires SYS_ADMIN capabilities
......@@ -1236,7 +1246,13 @@ class KubernetesExecutor(ClusterExecutor):
# Unauthorized.
# Reload config in order to ensure token is
# refreshed. Then try again.
logger.info("trying to reauthenticate")
kubernetes.config.load_kube_config()
subprocess.run(['kubectl','get','nodes'])
self.kubeapi = kubernetes.client.CoreV1Api()
self.batchapi = kubernetes.client.BatchV1Api()
self.register_secret()
try:
return func()
except kubernetes.client.rest.ApiException as e:
......@@ -1271,8 +1287,15 @@ class KubernetesExecutor(ClusterExecutor):
except WorkflowError as e:
print_exception(e, self.workflow.linemaps)
j.error_callback(j.job)
continue
if res.status.phase == "Failed":
if res is None:
msg = ("Unknown pod {jobid}. "
"Has the pod been deleted "
"manually?").format(jobid=j.jobid)
self.print_job_error(j.job, msg=msg, jobid=j.jobid)
j.error_callback(j.job)
elif res.status.phase == "Failed":
msg = ("For details, please issue:\n"
"kubectl describe pod {jobid}\n"
"kubectl logs {jobid}").format(jobid=j.jobid)
......@@ -1322,7 +1345,7 @@ def run_wrapper(job_rule, input, output, params, wildcards, threads, resources,
with change_working_directory(shadow_dir):
if benchmark:
bench_records = []
for i in range(benchmark_repeats):
for bench_iteration in range(benchmark_repeats):
# Determine whether to benchmark this process or to do no
# benchmarking at all. We benchmark this process unless the
# execution is done through the ``shell:``, ``script:``, or
......@@ -1337,7 +1360,7 @@ def run_wrapper(job_rule, input, output, params, wildcards, threads, resources,
run(input, output, params, wildcards, threads, resources,
log, version, rule, conda_env, singularity_img,
singularity_args, use_singularity, bench_record,
jobid, is_shell)
jobid, is_shell, bench_iteration)
else:
# The benchmarking is started here as we have a run section
# and the generated Python function is executed in this
......@@ -1346,13 +1369,13 @@ def run_wrapper(job_rule, input, output, params, wildcards, threads, resources,
run(input, output, params, wildcards, threads, resources,
log, version, rule, conda_env, singularity_img,
singularity_args, use_singularity,
bench_record, jobid, is_shell)
bench_record, jobid, is_shell, bench_iteration)
# Store benchmark record for this iteration
bench_records.append(bench_record)
else:
run(input, output, params, wildcards, threads, resources,
log, version, rule, conda_env, singularity_img,
singularity_args, use_singularity, None, jobid, is_shell)
singularity_args, use_singularity, None, jobid, is_shell, None)
except (KeyboardInterrupt, SystemExit) as e:
# Re-raise the keyboard interrupt in order to record an error in the
# scheduler but ignore it
......
......@@ -4,6 +4,7 @@ __email__ = "koester@jimmy.harvard.edu"
__license__ = "MIT"
import collections
import git
import os
import shutil
from pathlib import Path
......@@ -710,6 +711,8 @@ def dynamic(value):
A flag for a file that shall be dynamic, i.e. the multiplicity
(and wildcard values) will be expanded after a certain
rule has been run """
logger.warning("Dynamic output is deprecated in favor of checkpoints (see docs). "
"It will be removed in Snakemake 6.0.")
annotated = flag(value, "dynamic", True)
tocheck = [annotated] if not_iterable(annotated) else annotated
for file in tocheck:
......@@ -736,6 +739,10 @@ def repeat(value, n_repeat):
return flag(value, "repeat", n_repeat)
def checkpoint_target(value):
return flag(value, "checkpoint_target")
ReportObject = namedtuple("ReportObject", ["caption", "category"])
......@@ -870,6 +877,7 @@ def update_wildcard_constraints(pattern,
def split_git_path(path):
file_sub = re.sub("^git\+file:/+",'/',path)
(file_path, version) = file_sub.split("@")
file_path = os.path.realpath(file_path)
root_path = get_git_root(file_path)
if file_path.startswith(root_path):
file_path = file_path[len(root_path):].lstrip("/")
......@@ -884,13 +892,35 @@ def get_git_root(path):
path to root folder for git repo
"""
try:
import git
except ImportError as e:
raise WorkflowError("The Python package gitpython has to be installed.", e)
git_repo = git.Repo(path, search_parent_directories=True)
return git_repo.git.rev_parse("--show-toplevel")
except git.exc.NoSuchPathError as e:
tail,head = os.path.split(path)
return get_git_root_parent_directory(tail,path)
def get_git_root_parent_directory(path, input_path):
"""
This function will recursively go through parent directories until a git
repository is found or until no parent directories are left, in which case
an error will be raised. This is needed when providing a path to a
file/folder that is located on a branch/tag not currently checked out.
Args:
path: (str) Path a to a directory that is located inside the repo
input_path: (str) origin path, used when raising WorkflowError
Returns:
path to root folder for git repo
"""
try:
git_repo = git.Repo(path, search_parent_directories=True)
return git_repo.git.rev_parse("--show-toplevel")
except git.exc.NoSuchPathError as e:
tail,head = os.path.split(path)
if tail is None:
raise WorkflowError("Neither provided git path ({}) ".format(input_path) +
"or parent directories contain a valid git repo.")
else:
return get_git_root_parent_directory(tail, input_path)
def git_content(git_file):
"""
......@@ -906,8 +936,6 @@ def git_content(git_file):
"""
if git_file.startswith("git+file:"):
(root_path, file_path, version) = split_git_path(git_file)
# split_git_path does the import check, hence it is safe to import git here
import git
return git.Repo(root_path).git.show('{}:{}'.format(version, file_path))
else:
raise WorkflowError("Provided git path ({}) doesn't meet the "
......
......@@ -12,7 +12,7 @@ import subprocess
import json
from collections import defaultdict
from itertools import chain
from itertools import chain, filterfalse
from functools import partial
from operator import attrgetter
from urllib.request import urlopen
......@@ -33,6 +33,8 @@ def format_files(job, io, dynamicio):
yield "{} (dynamic)".format(f.format_dynamic())
elif is_flagged(f, "pipe"):
yield "{} (pipe)".format(f)
elif is_flagged(f, "checkpoint_target"):
yield "<unknown>"
else:
yield f
......@@ -142,6 +144,11 @@ class Job(AbstractJob):
for wildcard_value in self.wildcards_dict.values():
self._hash ^= wildcard_value.__hash__()
def updated(self):
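# Build a fresh Job for the same rule and wildcards, so that its input
# functions are re-evaluated after a checkpoint it depends on has finished.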
return Job(self.rule, self.dag,
wildcards_dict=self.wildcards_dict,
targetfile=self.targetfile)
def is_valid(self):
"""Check if job is valid"""
# these properties have to work in dry-run as well. Hence we check them here:
......@@ -251,6 +258,10 @@ class Job(AbstractJob):
return self.conda_env.create_archive()
return None
@property
def needs_singularity(self):
return self.singularity_img is not None
@property
def singularity_img_url(self):
return self.rule.singularity_img
......@@ -466,6 +477,14 @@ class Job(AbstractJob):
if not f.is_remote:
yield f
@property
def unique_input(self):
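# Yield input files in their original order while skipping duplicates,
# e.g. when the same file was specified twice.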
seen = set()
for element in filterfalse(seen.__contains__, self.input):
seen.add(element)
yield element
@property
def local_output(self):
for f in self.output:
......@@ -703,7 +722,8 @@ class Job(AbstractJob):
resources=self.resources,
log=self.log,
version=self.rule.version,
rule=self.rule.name, ))
rule=self.rule.name,
bench_iteration=None))
_variables.update(variables)
try:
return format(string, **_variables)
......@@ -774,7 +794,7 @@ class Job(AbstractJob):
def is_group(self):
return False
def log_info(self, skip_dynamic=False, indent=False):
def log_info(self, skip_dynamic=False, indent=False, printshellcmd=True):
# skip dynamic jobs that will be "executed" only in dryrun mode
if skip_dynamic and self.dag.dynamic(self):
return
......@@ -796,7 +816,9 @@ class Job(AbstractJob):
priority="highest"
if priority == Job.HIGHEST_PRIORITY else priority,
threads=self.threads,
indent=indent)
indent=indent,
is_checkpoint=self.rule.is_checkpoint,
printshellcmd=printshellcmd)
logger.shellcmd(self.shellcmd, indent=indent)
if self.dynamic_output:
......@@ -911,6 +933,10 @@ class Job(AbstractJob):
def restart_times(self):
return self.rule.restart_times
@property
def is_checkpoint(self):
return self.rule.is_checkpoint
def __len__(self):
return 1
......@@ -966,6 +992,9 @@ class GroupJob(AbstractJob):
def is_group(self):
return True
def is_checkpoint(self):
return any(job.is_checkpoint for job in self.jobs)
def log_info(self, skip_dynamic=False):
logger.group_info(groupid=self.groupid)
for job in sorted(self.jobs, key=lambda j: j.rule.name):
......@@ -1011,9 +1040,10 @@ class GroupJob(AbstractJob):
if self._resources is None:
self._resources = defaultdict(int)
# take the maximum over all jobs
pipe_group = any([any([is_flagged(o, "pipe") for o in job.output]) for job in self.jobs])
for job in self.jobs:
for res, value in job.resources.items():
if self.dag.workflow.run_local:
if self.dag.workflow.run_local or pipe_group:
# in case of local execution, this must be a
# group of jobs that are connected with pipes
# and have to run simultaneously
......@@ -1021,7 +1051,7 @@ class GroupJob(AbstractJob):
else:
self._resources[res] = max(self._resources.get(res, value),
value)
return self._resources
return Resources(fromdict=self._resources)
@property
def input(self):
......@@ -1058,6 +1088,7 @@ class GroupJob(AbstractJob):
"local": self.is_local,
"input": self.input,
"output": self.output,
"threads": self.threads,
"resources": resources,
"jobid": self.jobid
}
......@@ -1159,6 +1190,10 @@ class GroupJob(AbstractJob):
def is_branched(self):
return any(job.is_branched for job in self.jobs)
@property
def needs_singularity(self):
return any(job.needs_singularity for job in self.jobs)
@property
def rules(self):
return [job.rule.name for job in self.jobs]
......