Commits on Source (4)
......@@ -27,7 +27,8 @@ MODULE=cwltool
# `[[` conditional expressions.
PYSOURCES=$(wildcard ${MODULE}/**.py tests/*.py) setup.py
DEVPKGS=pep8 diff_cover autopep8 pylint coverage pydocstyle flake8 pytest isort mock
DEBDEVPKGS=pep8 python-autopep8 pylint python-coverage pydocstyle sloccount python-flake8 python-mock
DEBDEVPKGS=pep8 python-autopep8 pylint python-coverage pydocstyle sloccount \
python-flake8 python-mock shellcheck
VERSION=1.0.$(shell date +%Y%m%d%H%M%S --date=`git log --first-parent \
--max-count=1 --format=format:%cI`)
mkfile_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
......@@ -56,7 +57,7 @@ install: FORCE
dist: dist/${MODULE}-$(VERSION).tar.gz
dist/${MODULE}-$(VERSION).tar.gz: $(SOURCES)
./setup.py sdist
./setup.py sdist bdist_wheel
## clean : clean up all temporary / machine-generated files
clean: FORCE
......
Metadata-Version: 1.1
Name: cwltool
Version: 1.0.20180211183944
Version: 1.0.20180225105849
Summary: Common workflow language reference implementation
Home-page: https://github.com/common-workflow-language/cwltool
Author: Common workflow language working group
......@@ -149,6 +149,16 @@ Description: ==================================================================
cwltool --user-space-docker-cmd=dx-docker https://raw.githubusercontent.com/common-workflow-language/common-workflow-language/master/v1.0/v1.0/test-cwl-out2.cwl https://github.com/common-workflow-language/common-workflow-language/blob/master/v1.0/v1.0/empty.json
``cwltool`` can use `Singularity <http://singularity.lbl.gov/>`_ as a Docker container runtime; this is an experimental feature.
Singularity will run software containers specified in ``DockerRequirement``, and therefore works with Docker images only;
native Singularity images are not supported.
To use Singularity as the Docker container runtime, provide the ``--singularity`` command line option to ``cwltool``.
.. code:: bash
cwltool --singularity https://raw.githubusercontent.com/common-workflow-language/common-workflow-language/master/v1.0/v1.0/cat3-tool-mediumcut.cwl https://github.com/common-workflow-language/common-workflow-language/blob/master/v1.0/v1.0/cat-job.json
Tool or workflow loading from remote or local locations
-------------------------------------------------------
......
......@@ -139,6 +139,16 @@ or
cwltool --user-space-docker-cmd=dx-docker https://raw.githubusercontent.com/common-workflow-language/common-workflow-language/master/v1.0/v1.0/test-cwl-out2.cwl https://github.com/common-workflow-language/common-workflow-language/blob/master/v1.0/v1.0/empty.json
``cwltool`` can use `Singularity <http://singularity.lbl.gov/>`_ as a Docker container runtime; this is an experimental feature.
Singularity will run software containers specified in ``DockerRequirement``, and therefore works with Docker images only;
native Singularity images are not supported.
To use Singularity as the Docker container runtime, provide the ``--singularity`` command line option to ``cwltool``.
.. code:: bash
cwltool --singularity https://raw.githubusercontent.com/common-workflow-language/common-workflow-language/master/v1.0/v1.0/cat3-tool-mediumcut.cwl https://github.com/common-workflow-language/common-workflow-language/blob/master/v1.0/v1.0/cat-job.json
Tool or workflow loading from remote or local locations
-------------------------------------------------------
......
......@@ -9,6 +9,7 @@ cwltool/__init__.py
cwltool/__main__.py
cwltool/argparser.py
cwltool/builder.py
cwltool/command_line_tool.py
cwltool/cwlNodeEngine.js
cwltool/cwlNodeEngineJSConsole.js
cwltool/cwlrdf.py
......@@ -16,12 +17,14 @@ cwltool/docker.py
cwltool/docker_id.py
cwltool/draft2tool.py
cwltool/errors.py
cwltool/executors.py
cwltool/expression.py
cwltool/extensions.yml
cwltool/factory.py
cwltool/flatten.py
cwltool/job.py
cwltool/load_tool.py
cwltool/loghandler.py
cwltool/main.py
cwltool/mutation.py
cwltool/pack.py
......@@ -29,6 +32,7 @@ cwltool/pathmapper.py
cwltool/process.py
cwltool/resolver.py
cwltool/sandboxjs.py
cwltool/singularity.py
cwltool/software_requirements.py
cwltool/stdfsaccess.py
cwltool/update.py
......@@ -173,9 +177,11 @@ tests/test_examples.py
tests/test_ext.py
tests/test_fetch.py
tests/test_http_input.py
tests/test_iwdr.py
tests/test_js_sandbox.py
tests/test_override.py
tests/test_pack.py
tests/test_parallel.py
tests/test_pathmapper.py
tests/test_rdfprint.py
tests/test_relax_path_checks.py
......@@ -193,6 +199,7 @@ tests/tmp1/tmp2/tmp3/.gitkeep
tests/wf/badout1.cwl
tests/wf/badout2.cwl
tests/wf/badout3.cwl
tests/wf/cat-tool.cwl
tests/wf/cat.cwl
tests/wf/count-lines1-wf.cwl
tests/wf/default_path.cwl
......@@ -204,12 +211,14 @@ tests/wf/formattest.cwl
tests/wf/hello-workflow.cwl
tests/wf/hello.txt
tests/wf/hello_single_tool.cwl
tests/wf/iwdr-entry.cwl
tests/wf/js_output.cwl
tests/wf/js_output_workflow.cwl
tests/wf/listing_deep.cwl
tests/wf/listing_none.cwl
tests/wf/listing_shallow.cwl
tests/wf/listing_v1_0.cwl
tests/wf/malformed_outputs.cwl
tests/wf/missing_cwlVersion.cwl
tests/wf/mut.cwl
tests/wf/mut2.cwl
......@@ -218,13 +227,17 @@ tests/wf/parseInt-tool.cwl
tests/wf/revsort-job.json
tests/wf/revsort.cwl
tests/wf/revtool.cwl
tests/wf/scatter-job2.json
tests/wf/scatter-wf4.cwl
tests/wf/scatterfail.cwl
tests/wf/separate_without_prefix.cwl
tests/wf/sorttool.cwl
tests/wf/updatedir.cwl
tests/wf/updatedir_inplace.cwl
tests/wf/updateval.cwl
tests/wf/updateval.py
tests/wf/updateval_inplace.cwl
tests/wf/vf-concat.cwl
tests/wf/wc-job.json
tests/wf/wc-tool.cwl
tests/wf/wffail.cwl
......
......@@ -7,6 +7,7 @@ import os
from typing import (Any, AnyStr, Dict, List, Sequence, Text, Union, cast)
from . import loghandler
from schema_salad.ref_resolver import file_uri
from .process import (Process, shortname)
from .resolver import ga4gh_tool_registries
......@@ -14,30 +15,29 @@ from .software_requirements import (SOFTWARE_REQUIREMENTS_ENABLED)
_logger = logging.getLogger("cwltool")
defaultStreamHandler = logging.StreamHandler()
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)
DEFAULT_TMP_PREFIX = "tmp"
def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Reference executor for Common Workflow Language')
parser = argparse.ArgumentParser(
description='Reference executor for Common Workflow Language standards.')
parser.add_argument("--basedir", type=Text)
parser.add_argument("--outdir", type=Text, default=os.path.abspath('.'),
help="Output directory, default current directory")
parser.add_argument("--no-container", action="store_false", default=True,
help="Do not execute jobs in a Docker container, even when specified by the CommandLineTool",
dest="use_container")
parser.add_argument("--preserve-environment", type=Text, action="append",
help="Preserve specific environment variable when running CommandLineTools. May be provided multiple times.",
metavar="ENVVAR",
default=["PATH"],
parser.add_argument("--parallel", action="store_true", default=False,
help="[experimental] Run jobs in parallel. "
"Does not currently keep track of ResourceRequirements like the number of cores"
"or memory and can overload this system")
envgroup = parser.add_mutually_exclusive_group()
envgroup.add_argument("--preserve-environment", type=Text, action="append",
help="Preserve specific environment variable when "
"running CommandLineTools. May be provided multiple "
"times.", metavar="ENVVAR", default=["PATH"],
dest="preserve_environment")
parser.add_argument("--preserve-entire-environment", action="store_true",
help="Preserve entire parent environment when running CommandLineTools.",
default=False,
envgroup.add_argument("--preserve-entire-environment", action="store_true",
help="Preserve all environment variable when running "
"CommandLineTools.", default=False,
dest="preserve_entire_environment")
exgroup = parser.add_mutually_exclusive_group()
......@@ -72,12 +72,12 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
parser.add_argument("--tmpdir-prefix", type=Text,
help="Path prefix for temporary directories",
default="tmp")
default=DEFAULT_TMP_PREFIX)
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--tmp-outdir-prefix", type=Text,
help="Path prefix for intermediate output directories",
default="tmp")
default=DEFAULT_TMP_PREFIX)
exgroup.add_argument("--cachedir", type=Text, default="",
help="Directory to cache intermediate workflow outputs to avoid recomputing steps.")
......@@ -141,7 +141,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=True, dest="strict")
parser.add_argument("--skip-schemas", action="store_true",
help="Skip loading of schemas", default=True, dest="skip_schemas")
help="Skip loading of schemas", default=False, dest="skip_schemas")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--verbose", action="store_true", help="Default logging")
......@@ -152,10 +152,22 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
"timestamps to the errors, warnings, and "
"notifications.")
parser.add_argument("--js-console", action="store_true", help="Enable javascript console output")
parser.add_argument("--user-space-docker-cmd",
dockergroup = parser.add_mutually_exclusive_group()
dockergroup.add_argument("--user-space-docker-cmd", metavar="CMD",
help="(Linux/OS X only) Specify a user space docker "
"command (like udocker or dx-docker) that will be "
"used to call 'pull' and 'run'")
dockergroup.add_argument("--singularity", action="store_true",
default=False, help="[experimental] Use "
"Singularity runtime for running containers. "
"Requires Singularity v2.3.2+ and Linux with kernel "
"version v3.18+ or with overlayfs support "
"backported.")
dockergroup.add_argument("--no-container", action="store_false",
default=True, help="Do not execute jobs in a "
"Docker container, even when `DockerRequirement` "
"is specified under `hints`.",
dest="use_container")
dependency_resolvers_configuration_help = argparse.SUPPRESS
dependencies_directory_help = argparse.SUPPRESS
......@@ -190,7 +202,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
parser.add_argument("--default-container",
help="Specify a default docker container that will be used if the workflow fails to specify one.")
parser.add_argument("--no-match-user", action="store_true",
help="Disable passing the current uid to 'docker run --user`")
help="Disable passing the current uid to `docker run --user`")
parser.add_argument("--disable-net", action="store_true",
help="Use docker's default networking for containers;"
" the default is to enable networking.")
......@@ -235,8 +247,15 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
parser.add_argument("--overrides", type=str,
default=None, help="Read process requirement overrides from file.")
parser.add_argument("workflow", type=Text, nargs="?", default=None)
parser.add_argument("job_order", nargs=argparse.REMAINDER)
parser.add_argument("workflow", type=Text, nargs="?", default=None,
metavar='cwl_document', help="path or URL to a CWL Workflow, "
"CommandLineTool, or ExpressionTool. If the `inputs_object` has a "
"`cwl:tool` field indicating the path or URL to the cwl_document, "
" then the `workflow` argument is optional.")
parser.add_argument("job_order", nargs=argparse.REMAINDER,
metavar='inputs_object', help="path or URL to a YAML or JSON "
"formatted description of the required input values for the given "
"`cwl_document`.")
return parser
......
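The reworked parser folds the container options into a mutually exclusive group and adds ``--parallel``. A minimal sketch of how the new flags parse, assuming this patched ``cwltool`` is importable (``wf.cwl`` and ``job.yml`` are placeholder paths):

.. code:: python

    from cwltool.argparser import arg_parser

    parser = arg_parser()
    args = parser.parse_args(["--parallel", "--singularity", "wf.cwl", "job.yml"])
    assert args.parallel and args.singularity
    assert args.workflow == "wf.cwl" and args.job_order == ["job.yml"]

    # Members of the same exclusive group cannot be combined; this would
    # exit with an argparse error:
    # parser.parse_args(["--singularity", "--user-space-docker-cmd", "udocker"])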
......@@ -78,6 +78,7 @@ class Builder(object):
lead_pos = []
bindings = [] # type: List[Dict[Text,Text]]
binding = None # type: Dict[Text,Any]
value_from_expression = False
if "inputBinding" in schema and isinstance(schema["inputBinding"], dict):
binding = copy.copy(schema["inputBinding"])
......@@ -87,9 +88,12 @@ class Builder(object):
binding["position"] = aslist(lead_pos) + [0] + aslist(tail_pos)
binding["datum"] = datum
if "valueFrom" in binding:
value_from_expression = True
# Handle union types
if isinstance(schema["type"], list):
if not value_from_expression:
for t in schema["type"]:
if isinstance(t, (str, Text)) and self.names.has_name(t, ""):
avsc = self.names.get_name(t, "")
......@@ -103,6 +107,7 @@ class Builder(object):
return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos)
raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
elif isinstance(schema["type"], dict):
if not value_from_expression:
st = copy.deepcopy(schema["type"])
if binding and "inputBinding" not in st and st["type"] == "array" and "itemSeparator" not in binding:
st["inputBinding"] = {}
......@@ -212,15 +217,18 @@ class Builder(object):
prefix = binding.get("prefix")
sep = binding.get("separate", True)
if prefix is None and not sep:
with SourceLine(binding, "separate", WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
raise WorkflowException("'separate' option can not be specified without prefix")
l = [] # type: List[Dict[Text,Text]]
if isinstance(value, list):
if binding.get("itemSeparator"):
if binding.get("itemSeparator") and value:
l = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
elif binding.get("valueFrom"):
value = [self.tostr(v) for v in value]
return ([prefix] if prefix else []) + value
elif prefix:
elif prefix and value:
return [prefix]
else:
return []
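The two guards above change what ``generate_arg`` emits for array inputs; an illustrative binding (the values are not from the diff):

.. code:: python

    binding = {"prefix": "-I", "itemSeparator": ","}
    # value == ["a", "b"]  ->  ["-I", "a,b"]
    # value == []          ->  []            (previously ["-I"])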
......@@ -244,8 +252,8 @@ class Builder(object):
return [a for a in args if a is not None]
def do_eval(self, ex, context=None, pull_image=True, recursive=False):
# type: (Union[Dict[Text, Text], Text], Any, bool, bool) -> Any
def do_eval(self, ex, context=None, pull_image=True, recursive=False, strip_whitespace=True):
# type: (Union[Dict[Text, Text], Text], Any, bool, bool, bool) -> Any
if recursive:
if isinstance(ex, dict):
return {k: self.do_eval(v, context, pull_image, recursive) for k, v in iteritems(ex)}
......@@ -260,4 +268,5 @@ class Builder(object):
timeout=self.timeout,
debug=self.debug,
js_console=self.js_console,
force_docker_pull=self.force_docker_pull)
force_docker_pull=self.force_docker_pull,
strip_whitespace=strip_whitespace)
from __future__ import absolute_import
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
from io import open
from typing import Dict, List, Text
import datetime
import requests
from typing import (Dict, List, Text, Any, MutableMapping)
from .docker_id import docker_vm_id
from .errors import WorkflowException
from .job import ContainerCommandLineJob
from .pathmapper import PathMapper, ensure_writable
from .utils import docker_windows_path_adjust, onWindows
_logger = logging.getLogger("cwltool")
class DockerCommandLineJob(ContainerCommandLineJob):
@staticmethod
def get_image(dockerRequirement, pull_image, dry_run=False):
# type: (Dict[Text, Text], bool, bool) -> bool
found = False
......@@ -97,8 +107,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False):
return found
def get_from_requirements(r, req, pull_image, dry_run=False):
def get_from_requirements(self, r, req, pull_image, dry_run=False):
# type: (Dict[Text, Text], bool, bool, bool) -> Text
if r:
errmsg = None
......@@ -115,10 +124,141 @@ def get_from_requirements(r, req, pull_image, dry_run=False):
else:
return None
if get_image(r, pull_image, dry_run):
if self.get_image(r, pull_image, dry_run):
return r["dockerImageId"]
else:
if req:
raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])
return None
def add_volumes(self, pathmapper, runtime):
# type: (PathMapper, List[Text]) -> None
host_outdir = self.outdir
container_outdir = self.builder.outdir
for src, vol in pathmapper.items():
if not vol.staged:
continue
if vol.target.startswith(container_outdir+"/"):
host_outdir_tgt = os.path.join(
host_outdir, vol.target[len(container_outdir)+1:])
else:
host_outdir_tgt = None
if vol.type in ("File", "Directory"):
if not vol.resolved.startswith("_:"):
runtime.append(u"--volume=%s:%s:ro" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
elif vol.type == "WritableFile":
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
else:
shutil.copy(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "WritableDirectory":
if vol.resolved.startswith("_:"):
os.makedirs(host_outdir_tgt, 0o0755)
else:
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
else:
shutil.copytree(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "CreateFile":
if host_outdir_tgt:
with open(host_outdir_tgt, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
else:
fd, createtmp = tempfile.mkstemp(dir=self.tmpdir)
with os.fdopen(fd, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(createtmp),
docker_windows_path_adjust(vol.target)))
def create_runtime(self, env, rm_container=True, record_container_id=False, cidfile_dir="",
cidfile_prefix="", **kwargs):
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List
user_space_docker_cmd = kwargs.get("user_space_docker_cmd")
if user_space_docker_cmd:
runtime = [user_space_docker_cmd, u"run"]
else:
runtime = [u"docker", u"run", u"-i"]
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(os.path.realpath(self.outdir)),
self.builder.outdir))
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp"))
self.add_volumes(self.pathmapper, runtime)
if self.generatemapper:
self.add_volumes(self.generatemapper, runtime)
if user_space_docker_cmd:
runtime = [x.replace(":ro", "") for x in runtime]
runtime = [x.replace(":rw", "") for x in runtime]
runtime.append(u"--workdir=%s" % (
docker_windows_path_adjust(self.builder.outdir)))
if not user_space_docker_cmd:
if not kwargs.get("no_read_only"):
runtime.append(u"--read-only=true")
if kwargs.get("custom_net", None) is not None:
runtime.append(u"--net={0}".format(kwargs.get("custom_net")))
elif kwargs.get("disable_net", None):
runtime.append(u"--net=none")
if self.stdout:
runtime.append("--log-driver=none")
euid, egid = docker_vm_id()
if not onWindows():
# MS Windows does not have getuid() or geteuid() functions
euid, egid = euid or os.geteuid(), egid or os.getgid()
if kwargs.get("no_match_user", None) is False \
and (euid, egid) != (None, None):
runtime.append(u"--user=%d:%d" % (euid, egid))
if rm_container:
runtime.append(u"--rm")
runtime.append(u"--env=TMPDIR=/tmp")
# spec currently says "HOME must be set to the designated output
# directory." but spec might change to designated temp directory.
# runtime.append("--env=HOME=/tmp")
runtime.append(u"--env=HOME=%s" % self.builder.outdir)
# add parameters to docker to write a container ID file
if record_container_id:
if cidfile_dir != "":
if not os.path.isdir(cidfile_dir):
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
cidfile_dir + " is not a directory or "
"directory doesn't exist, please check it first")
exit(2)
if not os.path.exists(cidfile_dir):
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
"directory doesn't exist, please create it first")
exit(2)
else:
cidfile_dir = os.getcwd()
cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
if cidfile_prefix != "":
cidfile_name = str(cidfile_prefix + "-" + cidfile_name)
cidfile_path = os.path.join(cidfile_dir, cidfile_name)
runtime.append(u"--cidfile=%s" % cidfile_path)
for t, v in self.environment.items():
runtime.append(u"--env=%s=%s" % (t, v))
return runtime
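For orientation, the runtime list assembled by ``create_runtime`` has roughly the following shape; every path, the uid/gid pair, and the cid file name below are invented for the example:

.. code:: python

    runtime = [
        "docker", "run", "-i",
        "--volume=/tmp/out:/var/spool/cwl:rw",   # job output directory
        "--volume=/tmp/job_tmp:/tmp:rw",         # job temporary directory
        "--workdir=/var/spool/cwl",
        "--read-only=true",
        "--user=1000:1000",
        "--rm",
        "--env=TMPDIR=/tmp",
        "--env=HOME=/var/spool/cwl",
        "--cidfile=/home/user/20180225105849-000000.cid",
    ]
    # cwltool then appends the image id and the tool's command line.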
import logging
import tempfile
import threading
import os
from abc import ABCMeta, abstractmethod
from typing import Dict, Text, Any, Tuple, Set, List
from .builder import Builder
from .errors import WorkflowException
from .mutation import MutationManager
from .job import JobBase
from .process import relocateOutputs, cleanIntermediate, Process
from . import loghandler
_logger = logging.getLogger("cwltool")
class JobExecutor(object):
__metaclass__ = ABCMeta
def __init__(self):
# type: (...) -> None
self.final_output = [] # type: List
self.final_status = [] # type: List
self.output_dirs = set() # type: Set
def __call__(self, *args, **kwargs):
return self.execute(*args, **kwargs)
def output_callback(self, out, processStatus):
self.final_status.append(processStatus)
self.final_output.append(out)
@abstractmethod
def run_jobs(self,
t, # type: Process
job_order_object, # type: Dict[Text, Any]
logger,
**kwargs # type: Any
):
pass
def execute(self, t, # type: Process
job_order_object, # type: Dict[Text, Any]
logger=_logger,
**kwargs # type: Any
):
# type: (...) -> Tuple[Dict[Text, Any], Text]
if "basedir" not in kwargs:
raise WorkflowException("Must provide 'basedir' in kwargs")
finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
"tmp_outdir_prefix") else tempfile.mkdtemp()
self.output_dirs.add(kwargs["outdir"])
kwargs["mutation_manager"] = MutationManager()
jobReqs = None
if "cwl:requirements" in job_order_object:
jobReqs = job_order_object["cwl:requirements"]
elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
if jobReqs:
for req in jobReqs:
t.requirements.append(req)
self.run_jobs(t, job_order_object, logger, **kwargs)
if self.final_output and self.final_output[0] and finaloutdir:
self.final_output[0] = relocateOutputs(self.final_output[0], finaloutdir,
self.output_dirs, kwargs.get("move_outputs"),
kwargs["make_fs_access"](""),
kwargs["compute_checksum"])
if kwargs.get("rm_tmpdir"):
cleanIntermediate(self.output_dirs)
if self.final_output and self.final_status:
return (self.final_output[0], self.final_status[0])
else:
return (None, "permanentFail")
class SingleJobExecutor(JobExecutor):
def run_jobs(self,
t, # type: Process
job_order_object, # type: Dict[Text, Any]
logger,
**kwargs # type: Any
):
jobiter = t.job(job_order_object,
self.output_callback,
**kwargs)
try:
for r in jobiter:
if r:
builder = kwargs.get("builder", None) # type: Builder
if builder is not None:
r.builder = builder
if r.outdir:
self.output_dirs.add(r.outdir)
r.run(**kwargs)
else:
logger.error("Workflow cannot make any more progress.")
break
except WorkflowException:
raise
except Exception as e:
logger.exception("Got workflow error")
raise WorkflowException(Text(e))
class MultithreadedJobExecutor(JobExecutor):
def __init__(self):
super(MultithreadedJobExecutor, self).__init__()
self.threads = set()
self.exceptions = []
def run_job(self,
job, # type: JobBase
**kwargs # type: Any
):
# type: (...) -> None
def runner():
try:
job.run(**kwargs)
except WorkflowException as e:
self.exceptions.append(e)
except Exception as e:
self.exceptions.append(WorkflowException(Text(e)))
self.threads.remove(thread)
thread = threading.Thread(target=runner)
thread.daemon = True
self.threads.add(thread)
thread.start()
def wait_for_next_completion(self): # type: () -> None
if self.exceptions:
raise self.exceptions[0]
def run_jobs(self,
t, # type: Process
job_order_object, # type: Dict[Text, Any]
logger,
**kwargs # type: Any
):
jobiter = t.job(job_order_object, self.output_callback, **kwargs)
for r in jobiter:
if r:
builder = kwargs.get("builder", None) # type: Builder
if builder is not None:
r.builder = builder
if r.outdir:
self.output_dirs.add(r.outdir)
self.run_job(r, **kwargs)
else:
if len(self.threads):
self.wait_for_next_completion()
else:
logger.error("Workflow cannot make any more progress.")
break
while len(self.threads) > 0:
self.wait_for_next_completion()
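Selecting between the two executors mirrors what ``main()`` now does for ``--parallel`` (see the ``main.py`` hunk further down); a sketch:

.. code:: python

    from cwltool.executors import MultithreadedJobExecutor, SingleJobExecutor

    def choose_executor(parallel):  # illustrative helper, not part of the diff
        return MultithreadedJobExecutor() if parallel else SingleJobExecutor()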
......@@ -173,8 +173,9 @@ def evaluator(ex, jslib, obj, fullJS=False, timeout=None, force_docker_pull=Fals
def interpolate(scan, rootvars,
timeout=None, fullJS=None, jslib="", force_docker_pull=False,
debug=False, js_console=False):
# type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool, bool, bool) -> JSON
debug=False, js_console=False, strip_whitespace=True):
# type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool, bool, bool, bool) -> JSON
if strip_whitespace:
scan = scan.strip()
parts = []
w = scanner(scan)
......@@ -185,7 +186,7 @@ def interpolate(scan, rootvars,
e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, fullJS=fullJS,
timeout=timeout, force_docker_pull=force_docker_pull,
debug=debug, js_console=js_console)
if w[0] == 0 and w[1] == len(scan):
if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
return e
leaf = json.dumps(e, sort_keys=True)
if leaf[0] == '"':
......@@ -202,8 +203,9 @@ def interpolate(scan, rootvars,
def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
context=None, pull_image=True, timeout=None, force_docker_pull=False, debug=False, js_console=False):
# type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool, bool, bool) -> Any
context=None, pull_image=True, timeout=None, force_docker_pull=False,
debug=False, js_console=False, strip_whitespace=True):
# type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool, bool, bool, bool) -> Any
runtime = copy.copy(resources)
runtime["tmpdir"] = docker_windows_path_adjust(tmpdir)
......@@ -231,7 +233,8 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
jslib=jslib,
force_docker_pull=force_docker_pull,
debug=debug,
js_console=js_console)
js_console=js_console,
strip_whitespace=strip_whitespace)
except Exception as e:
raise WorkflowException("Expression evaluation error:\n%s" % e)
......
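A sketch of the new ``strip_whitespace`` flag on ``interpolate``; the parameter reference below is resolved without a JavaScript engine, and ``rootvars`` is illustrative:

.. code:: python

    from cwltool.expression import interpolate

    rootvars = {"inputs": {"name": "world"}}
    interpolate("  hello $(inputs.name)  ", rootvars)
    # -> 'hello world'    (whitespace stripped, the old behaviour)
    interpolate("  hello $(inputs.name)  ", rootvars, strip_whitespace=False)
    # -> '  hello world  '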
......@@ -3,8 +3,9 @@ import os
from typing import Callable as tCallable
from typing import Any, Dict, Text, Tuple, Union
from . import load_tool, main, workflow
from . import load_tool, workflow
from .argparser import get_default_args
from .executors import SingleJobExecutor
from .process import Process
......@@ -36,11 +37,13 @@ class Factory(object):
def __init__(self,
makeTool=workflow.defaultMakeTool, # type: tCallable[[Any], Process]
# should be tCallable[[Dict[Text, Any], Any], Process] ?
executor=main.single_job_executor, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
executor=None, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
**execkwargs # type: Any
):
# type: (...) -> None
self.makeTool = makeTool
if executor is None:
executor = SingleJobExecutor()
self.executor = executor
kwargs = get_default_args()
......
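``Factory`` now defaults to a ``SingleJobExecutor`` instance but accepts any executor. A sketch of the Python API, assuming a local ``echo.cwl`` tool exists:

.. code:: python

    import cwltool.factory
    from cwltool.executors import MultithreadedJobExecutor

    fac = cwltool.factory.Factory(executor=MultithreadedJobExecutor())
    echo = fac.make("echo.cwl")   # load and validate the tool
    result = echo(inp="hello")    # run it and collect the output object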
from __future__ import absolute_import
import codecs
import functools
import io
......@@ -11,27 +12,28 @@ import stat
import subprocess
import sys
import tempfile
import datetime
from abc import ABCMeta, abstractmethod
from io import open
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text,
Tuple, Union, cast)
from threading import Lock
import shellescape
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text,
Union, cast)
from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows
from . import docker
from .builder import Builder
from .docker_id import docker_vm_id
from .errors import WorkflowException
from .pathmapper import PathMapper, ensure_writable
from .process import (UnsupportedRequirement, empty_subtree, get_feature,
from .pathmapper import PathMapper
from .process import (UnsupportedRequirement, get_feature,
stageFiles)
from .utils import bytes2str_in_dicts
from .utils import copytree_with_merge, onWindows
_logger = logging.getLogger("cwltool")
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
job_output_lock = Lock()
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
......@@ -267,6 +269,7 @@ class JobBase(object):
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
with job_output_lock:
self.output_callback(outputs, processStatus)
if self.stagedir and os.path.exists(self.stagedir):
......@@ -312,56 +315,19 @@ class CommandLineJob(JobBase):
self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs)
class DockerCommandLineJob(JobBase):
class ContainerCommandLineJob(JobBase):
__metaclass__ = ABCMeta
def add_volumes(self, pathmapper, runtime):
# type: (PathMapper, List[Text]) -> None
@abstractmethod
def get_from_requirements(self, r, req, pull_image, dry_run=False):
# type: (Dict[Text, Text], bool, bool, bool) -> Text
pass
host_outdir = self.outdir
container_outdir = self.builder.outdir
for src, vol in pathmapper.items():
if not vol.staged:
continue
if vol.target.startswith(container_outdir+"/"):
host_outdir_tgt = os.path.join(
host_outdir, vol.target[len(container_outdir)+1:])
else:
host_outdir_tgt = None
if vol.type in ("File", "Directory"):
if not vol.resolved.startswith("_:"):
runtime.append(u"--volume=%s:%s:ro" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
elif vol.type == "WritableFile":
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
else:
shutil.copy(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "WritableDirectory":
if vol.resolved.startswith("_:"):
os.makedirs(host_outdir_tgt, 0o0755)
else:
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved),
docker_windows_path_adjust(vol.target)))
else:
shutil.copytree(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "CreateFile":
if host_outdir_tgt:
with open(host_outdir_tgt, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
else:
fd, createtmp = tempfile.mkstemp(dir=self.tmpdir)
with os.fdopen(fd, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(createtmp),
docker_windows_path_adjust(vol.target)))
@abstractmethod
def create_runtime(self, env, rm_container, record_container_id, cidfile_dir,
cidfile_prefix, **kwargs):
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List
pass
def run(self, pull_image=True, rm_container=True,
record_container_id=False, cidfile_dir="",
......@@ -389,8 +355,7 @@ class DockerCommandLineJob(JobBase):
try:
env = cast(MutableMapping[Text, Text], os.environ)
if docker_req and kwargs.get("use_container"):
img_id = str(docker.get_from_requirements(
docker_req, True, pull_image))
img_id = str(self.get_from_requirements(docker_req, True, pull_image))
if img_id is None:
if self.builder.find_default_container:
default_container = self.builder.find_default_container()
......@@ -401,99 +366,23 @@ class DockerCommandLineJob(JobBase):
if docker_req and img_id is None and kwargs.get("use_container"):
raise Exception("Docker image not available")
except Exception as e:
_logger.debug("Docker error", exc_info=True)
container = "Singularity" if kwargs.get("singularity") else "Docker"
_logger.debug("%s error" % container, exc_info=True)
if docker_is_req:
raise UnsupportedRequirement(
"Docker is required to run this tool: %s" % e)
"%s is required to run this tool: %s" % (container, e))
else:
raise WorkflowException(
"Docker is not available for this tool, try "
"--no-container to disable Docker, or install "
"{0} is not available for this tool, try "
"--no-container to disable {0}, or install "
"a user space Docker replacement like uDocker with "
"--user-space-docker-cmd.: %s" % e)
"--user-space-docker-cmd.: {1}".format(container, e))
self._setup(kwargs)
if user_space_docker_cmd:
runtime = [user_space_docker_cmd, u"run"]
else:
runtime = [u"docker", u"run", u"-i"]
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(os.path.realpath(self.outdir)),
self.builder.outdir))
runtime.append(u"--volume=%s:%s:rw" % (
docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp"))
self.add_volumes(self.pathmapper, runtime)
if self.generatemapper:
self.add_volumes(self.generatemapper, runtime)
if user_space_docker_cmd:
runtime = [x.replace(":ro", "") for x in runtime]
runtime = [x.replace(":rw", "") for x in runtime]
runtime.append(u"--workdir=%s" % (
docker_windows_path_adjust(self.builder.outdir)))
if not user_space_docker_cmd:
if not kwargs.get("no_read_only"):
runtime.append(u"--read-only=true")
if kwargs.get("custom_net", None) is not None:
runtime.append(u"--net={0}".format(kwargs.get("custom_net")))
elif kwargs.get("disable_net", None):
runtime.append(u"--net=none")
if self.stdout:
runtime.append("--log-driver=none")
euid, egid = docker_vm_id()
if not onWindows():
# MS Windows does not have getuid() or geteuid() functions
euid, egid = euid or os.geteuid(), egid or os.getgid()
if kwargs.get("no_match_user", None) is False \
and (euid, egid) != (None, None):
runtime.append(u"--user=%d:%d" % (euid, egid))
if rm_container:
runtime.append(u"--rm")
runtime.append(u"--env=TMPDIR=/tmp")
# spec currently says "HOME must be set to the designated output
# directory." but spec might change to designated temp directory.
# runtime.append("--env=HOME=/tmp")
runtime.append(u"--env=HOME=%s" % self.builder.outdir)
# add parameters to docker to write a container ID file
if record_container_id:
if cidfile_dir != "":
if not os.path.isdir(cidfile_dir):
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
cidfile_dir + " is not a directory or "
"directory doesn't exist, please check it first")
exit(2)
if not os.path.exists(cidfile_dir):
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
"directory doesn't exist, please create it first")
exit(2)
else:
cidfile_dir = os.getcwd()
cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f")+".cid"
if cidfile_prefix != "":
cidfile_name = str(cidfile_prefix + "-" + cidfile_name)
cidfile_path = os.path.join(cidfile_dir, cidfile_name)
runtime.append(u"--cidfile=%s" % cidfile_path)
for t, v in self.environment.items():
runtime.append(u"--env=%s=%s" % (t, v))
runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs)
runtime.append(img_id)
self._execute(
runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs)
self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs)
def _job_popen(
......
......@@ -20,7 +20,7 @@ import schema_salad.schema as schema
from avro.schema import Names
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.ref_resolver import ContextType, Fetcher, Loader, file_uri
from schema_salad.sourceline import cmap
from schema_salad.sourceline import cmap, SourceLine
from schema_salad.validate import ValidationException
from . import process, update
......@@ -124,6 +124,9 @@ def _convert_stdstreams_to_files(workflowobj):
if isinstance(workflowobj, dict):
if workflowobj.get('class') == 'CommandLineTool':
for out in workflowobj.get('outputs', []):
if type(out) is not CommentedMap:
with SourceLine(workflowobj, "outputs", ValidationException, _logger.isEnabledFor(logging.DEBUG)):
raise ValidationException("Output '%s' is not a valid OutputParameter." % out)
for streamtype in ['stdout', 'stderr']:
if out.get('type') == streamtype:
if 'outputBinding' in out:
......@@ -215,9 +218,10 @@ def validate_document(document_loader, # type: Loader
if metadata and 'cwlVersion' in metadata:
workflowobj['cwlVersion'] = metadata['cwlVersion']
else:
raise ValidationException("No cwlVersion found."
"Use the following syntax in your CWL document to declare "
"the version: cwlVersion: <version>")
raise ValidationException(
"No cwlVersion found. "
"Use the following syntax in your CWL document to declare the version: cwlVersion: <version>.\n"
"Note: if this is a CWL draft-2 (pre v1.0) document then it will need to be upgraded first.")
if not isinstance(workflowobj["cwlVersion"], (str, Text)):
raise Exception("'cwlVersion' must be a string, got %s" % type(workflowobj["cwlVersion"]))
......@@ -227,16 +231,14 @@ def validate_document(document_loader, # type: Loader
workflowobj["cwlVersion"])
if workflowobj["cwlVersion"] not in list(ALLUPDATES):
# print out all the Supported Versions of cwlVersion
versions = list(ALLUPDATES) # ALLUPDATES is a dict
versions = []
for version in list(ALLUPDATES):
if "dev" in version:
version += " (with --enable-dev flag only)"
versions.append(version)
versions.sort()
raise ValidationException("'cwlVersion' not valid. Supported CWL versions are: \n{}".format("\n".join(versions)))
if workflowobj["cwlVersion"] == "draft-2":
workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1(
workflowobj, document_loader, uri, update_steps=False)))
if "@graph" in workflowobj:
workflowobj["$graph"] = workflowobj["@graph"]
del workflowobj["@graph"]
raise ValidationException("The CWL reference runner no longer supports pre CWL v1.0 documents. "
"Supported versions are: \n{}".format("\n".join(versions)))
(sch_document_loader, avsc_names) = \
process.get_schema(workflowobj["cwlVersion"])[:2]
......@@ -257,8 +259,6 @@ def validate_document(document_loader, # type: Loader
raise ValidationException("Workflow must be a dict or list.")
if not new_metadata:
if not isinstance(processobj, dict):
raise ValidationException("Draft-2 workflows must be a dict.")
new_metadata = cast(CommentedMap, cmap(
{"$namespaces": processobj.get("$namespaces", {}),
"$schemas": processobj.get("$schemas", []),
......
import logging
_logger = logging.getLogger("cwltool")
defaultStreamHandler = logging.StreamHandler()
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)
......@@ -9,30 +9,32 @@ import json
import logging
import os
import sys
import tempfile
import warnings
from typing import (IO, Any, Callable, Dict, List, Text, Tuple,
Union, cast, Mapping, MutableMapping, Iterable)
import pkg_resources # part of setuptools
import ruamel.yaml as yaml
import schema_salad.validate as validate
import six
from typing import (IO, Any, Callable, Dict, List, Text, Tuple,
Union, cast, Mapping, MutableMapping, Iterable)
import schema_salad.validate as validate
from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
from schema_salad.sourceline import strip_dup_lineno
from . import draft2tool, workflow
from .argparser import arg_parser, generate_parser
from .builder import Builder
from . import command_line_tool, workflow
from .argparser import arg_parser, generate_parser, DEFAULT_TMP_PREFIX
from .cwlrdf import printdot, printrdf
from .errors import UnsupportedRequirement, WorkflowException
from .executors import SingleJobExecutor, MultithreadedJobExecutor
from .load_tool import (FetcherConstructorType, resolve_tool_uri,
fetch_document, make_tool, validate_document, jobloaderctx,
resolve_overrides, load_overrides)
from .loghandler import defaultStreamHandler
from .mutation import MutationManager
from .pack import pack
from .pathmapper import (adjustDirObjs, trim_listing, visit_class)
from .process import (Process, cleanIntermediate, normalizeFilesDirs,
relocateOutputs, scandeps, shortname, use_custom_schema,
from .process import (Process, normalizeFilesDirs,
scandeps, shortname, use_custom_schema,
use_standard_schema)
from .resolver import ga4gh_tool_registries, tool_resolver
from .software_requirements import (DependenciesConfiguration,
......@@ -43,76 +45,16 @@ from .utils import onWindows, windows_default_container_id
_logger = logging.getLogger("cwltool")
defaultStreamHandler = logging.StreamHandler()
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)
def single_job_executor(t, # type: Process
job_order_object, # type: Dict[Text, Any]
**kwargs # type: Any
):
# type: (...) -> Tuple[Dict[Text, Any], Text]
final_output = []
final_status = []
def output_callback(out, processStatus):
final_status.append(processStatus)
final_output.append(out)
if "basedir" not in kwargs:
raise WorkflowException("Must provide 'basedir' in kwargs")
output_dirs = set()
finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
"tmp_outdir_prefix") else tempfile.mkdtemp()
output_dirs.add(kwargs["outdir"])
kwargs["mutation_manager"] = MutationManager()
jobReqs = None
if "cwl:requirements" in job_order_object:
jobReqs = job_order_object["cwl:requirements"]
elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
if jobReqs:
for req in jobReqs:
t.requirements.append(req)
jobiter = t.job(job_order_object,
output_callback,
**kwargs)
try:
for r in jobiter:
if r:
builder = kwargs.get("builder", None) # type: Builder
if builder is not None:
r.builder = builder
if r.outdir:
output_dirs.add(r.outdir)
r.run(**kwargs)
else:
_logger.error("Workflow cannot make any more progress.")
break
except WorkflowException:
raise
except Exception as e:
_logger.exception("Got workflow error")
raise WorkflowException(Text(e))
if final_output and final_output[0] and finaloutdir:
final_output[0] = relocateOutputs(final_output[0], finaloutdir,
output_dirs, kwargs.get("move_outputs"),
kwargs["make_fs_access"](""))
if kwargs.get("rm_tmpdir"):
cleanIntermediate(output_dirs)
if final_output and final_status:
return (final_output[0], final_status[0])
else:
return (None, "permanentFail")
warnings.warn("Use of single_job_executor function is deprecated. "
"Use cwltool.executors.SingleJobExecutor class instead", DeprecationWarning)
executor = SingleJobExecutor()
return executor(t, job_order_object, **kwargs)
def generate_example_input(inptype):
......@@ -377,7 +319,7 @@ def supportedCWLversions(enable_dev):
def main(argsl=None, # type: List[str]
args=None, # type: argparse.Namespace
executor=single_job_executor, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
executor=None, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
makeTool=workflow.defaultMakeTool, # type: Callable[..., Process]
selectResources=None, # type: Callable[[Dict[Text, int]], Dict[Text, int]]
stdin=sys.stdin, # type: IO[Any]
......@@ -477,7 +419,7 @@ def main(argsl=None, # type: List[str]
arg_parser().print_help()
return 1
if args.relax_path_checks:
draft2tool.ACCEPTLIST_RE = draft2tool.ACCEPTLIST_EN_RELAXED_RE
command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE
if args.ga4gh_tool_registries:
ga4gh_tool_registries[:] = args.ga4gh_tool_registries
......@@ -589,8 +531,16 @@ def main(argsl=None, # type: List[str]
if isinstance(tool, int):
return tool
# If on MacOS platform, TMPDIR must be set to be under one of the shared volumes in Docker for Mac
# More info: https://dockstore.org/docs/faq
if sys.platform == "darwin":
tmp_prefix = "tmp_outdir_prefix"
default_mac_path = "/private/tmp/docker_tmp"
if getattr(args, tmp_prefix) and getattr(args, tmp_prefix) == DEFAULT_TMP_PREFIX:
setattr(args, tmp_prefix, default_mac_path)
for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
if getattr(args, dirprefix) and getattr(args, dirprefix) != 'tmp':
if getattr(args, dirprefix) and getattr(args, dirprefix) != DEFAULT_TMP_PREFIX:
sl = "/" if getattr(args, dirprefix).endswith("/") or dirprefix == "cachedir" else ""
setattr(args, dirprefix,
os.path.abspath(getattr(args, dirprefix)) + sl)
......@@ -617,6 +567,12 @@ def main(argsl=None, # type: List[str]
except SystemExit as e:
return e.code
if not executor:
if args.parallel:
executor = MultithreadedJobExecutor()
else:
executor = SingleJobExecutor()
if isinstance(job_order_object, int):
return job_order_object
......@@ -625,6 +581,7 @@ def main(argsl=None, # type: List[str]
del args.workflow
del args.job_order
(out, status) = executor(tool, job_order_object,
logger=_logger,
makeTool=makeTool,
select_resources=selectResources,
make_fs_access=make_fs_access,
......
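The same plumbing can be driven programmatically; a sketch in which the document and inputs-object paths are placeholders:

.. code:: python

    from cwltool.main import main

    rc = main(argsl=["--parallel", "echo.cwl", "job.yml"])  # 0 on success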
......@@ -256,8 +256,8 @@ def collectFilesAndDirs(obj, out):
collectFilesAndDirs(l, out)
def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access):
# type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Set[Text], Text, StdFsAccess) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access, compute_checksum):
# type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Set[Text], Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
if action not in ("move", "copy"):
......@@ -299,7 +299,7 @@ def relocateOutputs(outputObj, outdir, output_dirs, action, fs_access):
return f
visit_class(outputObj, ("File", "Directory"), _check_adjust)
if compute_checksum:
visit_class(outputObj, ("File",), functools.partial(compute_checksums, fs_access))
# If there are symlinks to intermediate output directories, we want to move
......@@ -401,7 +401,7 @@ def fillInDefaults(inputs, job):
elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]):
job[fieldname] = None
else:
raise WorkflowException("Missing required input parameter `%s`" % shortname(inp["id"]))
raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
def avroize_type(field_type, name_prefix=""):
......@@ -506,7 +506,8 @@ class Process(six.with_metaclass(abc.ABCMeta, object)):
del c["id"]
if "type" not in c:
raise validate.ValidationException(u"Missing `type` in parameter `%s`" % c["name"])
raise validate.ValidationException(u"Missing 'type' in "
"parameter '%s'" % c["name"])
if "default" in c and "null" not in aslist(c["type"]):
c["type"] = ["null"] + aslist(c["type"])
......@@ -522,7 +523,8 @@ class Process(six.with_metaclass(abc.ABCMeta, object)):
self.inputs_record_schema = cast(Dict[six.text_type, Any], schema_salad.schema.make_valid_avro(self.inputs_record_schema, {}, set()))
AvroSchemaFromJSONData(self.inputs_record_schema, self.names)
except avro.schema.SchemaParseException as e:
raise validate.ValidationException(u"Got error `%s` while processing inputs of %s:\n%s" %
raise validate.ValidationException(u"Got error '%s' while "
"processing inputs of %s:\n%s" %
(Text(e), self.tool["id"],
json.dumps(self.inputs_record_schema, indent=4)))
......@@ -530,7 +532,8 @@ class Process(six.with_metaclass(abc.ABCMeta, object)):
self.outputs_record_schema = cast(Dict[six.text_type, Any], schema_salad.schema.make_valid_avro(self.outputs_record_schema, {}, set()))
AvroSchemaFromJSONData(self.outputs_record_schema, self.names)
except avro.schema.SchemaParseException as e:
raise validate.ValidationException(u"Got error `%s` while processing outputs of %s:\n%s" %
raise validate.ValidationException(u"Got error '%s' while "
"processing outputs of %s:\n%s" %
(Text(e), self.tool["id"],
json.dumps(self.outputs_record_schema, indent=4)))
......
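The widened ``relocateOutputs`` signature in use; everything except the new ``compute_checksum`` flag is a placeholder:

.. code:: python

    import tempfile

    from cwltool.process import relocateOutputs
    from cwltool.stdfsaccess import StdFsAccess

    output_obj = {}                      # placeholder output object tree
    intermediate = {tempfile.mkdtemp()}  # intermediate output directories
    relocateOutputs(output_obj, tempfile.mkdtemp(), intermediate,
                    "move", StdFsAccess(""),
                    compute_checksum=False)  # new flag: skip checksumming Files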
from __future__ import absolute_import
import logging
import os
import re
import shutil
import subprocess
import sys
from io import open
from typing import (Dict, List, Text, MutableMapping, Any)
from .errors import WorkflowException
from .job import ContainerCommandLineJob
from .pathmapper import PathMapper, ensure_writable
from .process import (UnsupportedRequirement)
from .utils import docker_windows_path_adjust
_logger = logging.getLogger("cwltool")
class SingularityCommandLineJob(ContainerCommandLineJob):
@staticmethod
def get_image(dockerRequirement, pull_image, dry_run=False):
# type: (Dict[Text, Text], bool, bool) -> bool
found = False
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
match = re.search(pattern=r'([a-z]*://)', string=dockerRequirement["dockerPull"])
if match:
dockerRequirement["dockerImageId"] = re.sub(pattern=r'([a-z]*://)', repl=r'',
string=dockerRequirement["dockerPull"])
dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-',
string=dockerRequirement["dockerImageId"]) + ".img"
else:
dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-',
string=dockerRequirement["dockerPull"]) + ".img"
dockerRequirement["dockerPull"] = "docker://" + dockerRequirement["dockerPull"]
# check to see if the Singularity container is already downloaded
if os.path.isfile(dockerRequirement["dockerImageId"]):
_logger.info("Using local copy of Singularity image")
found = True
# if the .img file is not already present, pull the image
elif pull_image:
cmd = [] # type: List[Text]
if "dockerPull" in dockerRequirement:
cmd = ["singularity", "pull", "--name", str(dockerRequirement["dockerImageId"]),
str(dockerRequirement["dockerPull"])]
_logger.info(Text(cmd))
if not dry_run:
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
return found
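The name mangling above turns a schemeless ``dockerPull`` into a local ``.img`` file name plus a ``docker://`` source; sketched for an illustrative image:

.. code:: python

    req = {"dockerPull": "debian:8"}
    # After get_image(req, pull_image=True):
    #   req["dockerImageId"] == "debian-8.img"
    #   req["dockerPull"]    == "docker://debian:8"
    # and, when the .img file is absent, the pull is performed with:
    #   singularity pull --name debian-8.img docker://debian:8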
def get_from_requirements(self, r, req, pull_image, dry_run=False):
# type: (Dict[Text, Text], bool, bool, bool) -> Text
# returns the filename of the Singularity image (e.g. hello-world-latest.img)
if r:
errmsg = None
try:
subprocess.check_output(["singularity", "--version"])
except subprocess.CalledProcessError as e:
errmsg = "Cannot execute 'singularity --version' " + Text(e)
except OSError as e:
errmsg = "'singularity' executable not found: " + Text(e)
if errmsg:
if req:
raise WorkflowException(errmsg)
else:
return None
if self.get_image(r, pull_image, dry_run):
return os.path.abspath(r["dockerImageId"])
else:
if req:
raise WorkflowException(u"Container image %s not found" % r["dockerImageId"])
return None
def add_volumes(self, pathmapper, runtime, stage_output):
# type: (PathMapper, List[Text], bool) -> None
host_outdir = self.outdir
container_outdir = self.builder.outdir
for src, vol in pathmapper.items():
if not vol.staged:
continue
if stage_output:
containertgt = container_outdir + vol.target[len(host_outdir):]
else:
containertgt = vol.target
if vol.target.startswith(container_outdir + "/"):
host_outdir_tgt = os.path.join(
host_outdir, vol.target[len(container_outdir) + 1:])
else:
host_outdir_tgt = None
if vol.type in ("File", "Directory"):
if not vol.resolved.startswith("_:"):
runtime.append(u"--bind")
runtime.append("%s:%s:ro" % (
docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
elif vol.type == "WritableFile":
if self.inplace_update:
runtime.append(u"--bind")
runtime.append("%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
else:
shutil.copy(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "WritableDirectory":
if vol.resolved.startswith("_:"):
os.makedirs(host_outdir_tgt, 0o0755)
else:
if self.inplace_update:
runtime.append(u"--bind")
runtime.append("%s:%s:rw" % (
docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
else:
shutil.copytree(vol.resolved, vol.target)
elif vol.type == "CreateFile":
createtmp = os.path.join(host_outdir, os.path.basename(vol.target))
with open(createtmp, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
runtime.append(u"--bind")
runtime.append(
"%s:%s:ro" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target)))
def create_runtime(self, env, rm_container=True, record_container_id=False, cidfile_dir="",
cidfile_prefix="", **kwargs):
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List
runtime = [u"singularity", u"--quiet", u"exec"]
runtime.append(u"--bind")
runtime.append(
u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir))
runtime.append(u"--bind")
runtime.append(u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp"))
self.add_volumes(self.pathmapper, runtime, stage_output=False)
if self.generatemapper:
self.add_volumes(self.generatemapper, runtime, stage_output=True)
runtime.append(u"--pwd")
runtime.append("%s" % (docker_windows_path_adjust(self.builder.outdir)))
if kwargs.get("custom_net", None) is not None:
raise UnsupportedRequirement(
"Singularity implementation does not support networking")
env["SINGULARITYENV_TMPDIR"] = "/tmp"
env["SINGULARITYENV_HOME"] = self.builder.outdir
for t, v in self.environment.items():
env["SINGULARITYENV_" + t] = v
return runtime
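Unlike the Docker runtime, tool environment variables reach the container through ``SINGULARITYENV_``-prefixed variables in the calling environment rather than ``--env`` flags; a sketch:

.. code:: python

    env = {}
    tool_environment = {"FOO": "bar"}   # illustrative tool environment
    for name, value in tool_environment.items():
        env["SINGULARITYENV_" + name] = value
    assert env == {"SINGULARITYENV_FOO": "bar"}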
......@@ -45,69 +45,6 @@ def fixType(doc): # type: (Any) -> Any
return "#" + doc
return doc
def _draft2toDraft3dev1(doc, loader, baseuri, update_steps=True):
# type: (Any, Loader, Text, bool) -> Any
try:
if isinstance(doc, dict):
if "import" in doc:
imp = urllib.parse.urljoin(baseuri, doc["import"])
impLoaded = loader.fetch(imp)
r = None # type: Dict[Text, Any]
if isinstance(impLoaded, list):
r = {"@graph": impLoaded}
elif isinstance(impLoaded, dict):
r = impLoaded
else:
raise Exception("Unexpected code path.")
r["id"] = imp
_, frag = urllib.parse.urldefrag(imp)
if frag:
frag = "#" + frag
r = findId(r, frag)
return _draft2toDraft3dev1(r, loader, imp)
if "include" in doc:
return loader.fetch_text(urllib.parse.urljoin(baseuri, doc["include"]))
for typename in ("type", "items"):
if typename in doc:
doc[typename] = fixType(doc[typename])
if "steps" in doc and update_steps:
if not isinstance(doc["steps"], list):
raise Exception("Value of 'steps' must be a list")
for i, s in enumerate(doc["steps"]):
if "id" not in s:
s["id"] = "step%i" % i
for inp in s.get("inputs", []):
if isinstance(inp.get("source"), list):
if "requirements" not in doc:
doc["requirements"] = []
doc["requirements"].append({"class": "MultipleInputFeatureRequirement"})
for a in doc:
doc[a] = _draft2toDraft3dev1(doc[a], loader, baseuri)
if isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draft2toDraft3dev1(a, loader, baseuri)
return doc
except Exception as e:
err = json.dumps(doc, indent=4)
if "id" in doc:
err = doc["id"]
elif "name" in doc:
err = doc["name"]
raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draft2toDraft3dev1(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (_draft2toDraft3dev1(doc, loader, baseuri), "draft-3.dev1")
digits = re.compile("\d+")
......@@ -144,110 +81,6 @@ def _updateDev2Script(ent): # type: (Any) -> Any
else:
return ent
def _draftDraft3dev1toDev2(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
doc = _updateDev2Script(doc)
if isinstance(doc, six.string_types):
return doc
# Convert expressions
if isinstance(doc, dict):
if "@import" in doc:
resolved_doc = loader.resolve_ref(
doc["@import"], base_url=baseuri)[0]
if isinstance(resolved_doc, dict):
return _draftDraft3dev1toDev2(
resolved_doc, loader, resolved_doc["id"])
else:
raise Exception("Unexpected codepath")
for a in doc:
doc[a] = _draftDraft3dev1toDev2(doc[a], loader, baseuri)
if "class" in doc and (doc["class"] in ("CommandLineTool", "Workflow", "ExpressionTool")):
added = False
if "requirements" in doc:
for r in doc["requirements"]:
if r["class"] == "ExpressionEngineRequirement":
if "engineConfig" in r:
doc["requirements"].append({
"class": "InlineJavascriptRequirement",
"expressionLib": [updateScript(sc) for sc in aslist(r["engineConfig"])]
})
added = True
for i, rq in enumerate(doc["requirements"]):
if rq["class"] == "ExpressionEngineRequirement":
del doc["requirements"][i]
break
break
else:
doc["requirements"] = []
if not added:
doc["requirements"].append({"class": "InlineJavascriptRequirement"})
elif isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draftDraft3dev1toDev2(a, loader, baseuri)
return doc
def draftDraft3dev1toDev2(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (_draftDraft3dev1toDev2(doc, loader, baseuri), "draft-3.dev2")
def _draftDraft3dev2toDev3(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
try:
if isinstance(doc, dict):
if "@import" in doc:
if doc["@import"][0] == "#":
return doc["@import"]
else:
imp = urllib.parse.urljoin(baseuri, doc["@import"])
impLoaded = loader.fetch(imp)
r = {} # type: Dict[Text, Any]
if isinstance(impLoaded, list):
r = {"@graph": impLoaded}
elif isinstance(impLoaded, dict):
r = impLoaded
else:
raise Exception("Unexpected code path.")
r["id"] = imp
frag = urllib.parse.urldefrag(imp)[1]
if frag:
frag = "#" + frag
r = findId(r, frag)
return _draftDraft3dev2toDev3(r, loader, imp)
if "@include" in doc:
return loader.fetch_text(urllib.parse.urljoin(baseuri, doc["@include"]))
for a in doc:
doc[a] = _draftDraft3dev2toDev3(doc[a], loader, baseuri)
if isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draftDraft3dev2toDev3(a, loader, baseuri)
return doc
except Exception as e:
err = json.dumps(doc, indent=4)
if "id" in doc:
err = doc["id"]
elif "name" in doc:
err = doc["name"]
import traceback
raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev2toDev3(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (_draftDraft3dev2toDev3(doc, loader, baseuri), "draft-3.dev3")
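# Illustrative example (not part of this change): an {"@import": "lib.cwl"}
# node is fetched relative to baseuri and replaced by the (recursively
# updated) imported document, while fragment-only references such as
# {"@import": "#step1"} collapse to the plain string "#step1"; an
# {"@include": ...} node is replaced by the referenced file's text.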
def traverseImport(doc, loader, baseuri, func):
# type: (Any, Loader, Text, Callable[[Any, Loader, Text], Any]) -> Any
if "$import" in doc:
......@@ -270,204 +103,6 @@ def traverseImport(doc, loader, baseuri, func):
r = findId(r, frag)
return func(r, loader, imp)
def _draftDraft3dev3toDev4(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
try:
if isinstance(doc, dict):
r = traverseImport(doc, loader, baseuri, _draftDraft3dev3toDev4)
if r is not None:
return r
if "@graph" in doc:
doc["$graph"] = doc["@graph"]
del doc["@graph"]
for a in doc:
doc[a] = _draftDraft3dev3toDev4(doc[a], loader, baseuri)
if isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draftDraft3dev3toDev4(a, loader, baseuri)
return doc
except Exception as e:
err = json.dumps(doc, indent=4)
if "id" in doc:
err = doc["id"]
elif "name" in doc:
err = doc["name"]
import traceback
raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev3toDev4(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (_draftDraft3dev3toDev4(doc, loader, baseuri), "draft-3.dev4")
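# Illustrative example (not part of this change): this step only renames the
# top-level graph key, {"@graph": [...]} -> {"$graph": [...]}.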
def _draftDraft3dev4toDev5(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
try:
if isinstance(doc, dict):
r = traverseImport(doc, loader, baseuri, _draftDraft3dev4toDev5)
if r is not None:
return r
for b in ("inputBinding", "outputBinding"):
if b in doc and "secondaryFiles" in doc[b]:
doc["secondaryFiles"] = doc[b]["secondaryFiles"]
del doc[b]["secondaryFiles"]
for a in doc:
doc[a] = _draftDraft3dev4toDev5(doc[a], loader, baseuri)
if isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draftDraft3dev4toDev5(a, loader, baseuri)
return doc
except Exception as e:
err = json.dumps(doc, indent=4)
if "id" in doc:
err = doc["id"]
elif "name" in doc:
err = doc["name"]
raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev4toDev5(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (_draftDraft3dev4toDev5(doc, loader, baseuri), "draft-3.dev5")
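# Illustrative example (not part of this change): a parameter written as
#   {"inputBinding": {"secondaryFiles": [".bai"], "position": 1}}
# leaves this updater as
#   {"secondaryFiles": [".bai"], "inputBinding": {"position": 1}}
# i.e. secondaryFiles moves from the binding onto the parameter itself.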
def draftDraft3dev5toFinal(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
return (doc, "draft-3")
def _draft3toDraft4dev1(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
if isinstance(doc, dict):
if "class" in doc and doc["class"] == "Workflow":
            def fixup(f):  # type: (Text) -> Text
                # Rewrite a dotted fragment ("#step.port") to slash form ("#step/port").
                base, frg = urllib.parse.urldefrag(f)
                frg = '/'.join(frg.rsplit('.', 1))
                return base + "#" + frg
for step in doc["steps"]:
step["in"] = step["inputs"]
step["out"] = step["outputs"]
del step["inputs"]
del step["outputs"]
for io in ("in", "out"):
for i in step[io]:
i["id"] = fixup(i["id"])
if "source" in i:
i["source"] = [fixup(s) for s in aslist(i["source"])]
if len(i["source"]) == 1:
i["source"] = i["source"][0]
if "scatter" in step:
step["scatter"] = [fixup(s) for s in aslist(step["scatter"])]
for out in doc["outputs"]:
out["source"] = fixup(out["source"])
for key, value in doc.items():
if key == 'run':
value = deepcopy(value)
doc[key] = _draft3toDraft4dev1(value, loader, baseuri)
elif isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draft3toDraft4dev1(a, loader, baseuri)
return doc
def draft3toDraft4dev1(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
"""Public updater for draft-3 to draft-4.dev1."""
return (_draft3toDraft4dev1(doc, loader, baseuri), "draft-4.dev1")
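# Illustrative example (not part of this change): step-level "inputs" and
# "outputs" become "in" and "out", and dotted references are rewritten to
# slash form by fixup, e.g.
#   "tool.cwl#step1.result" -> "tool.cwl#step1/result"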
def _draft4Dev1toDev2(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
if isinstance(doc, dict):
if "class" in doc and doc["class"] == "Workflow":
for out in doc["outputs"]:
out["outputSource"] = out["source"]
del out["source"]
for key, value in doc.items():
if key == 'run':
value = deepcopy(value)
doc[key] = _draft4Dev1toDev2(value, loader, baseuri)
elif isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draft4Dev1toDev2(a, loader, baseuri)
return doc
def draft4Dev1toDev2(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
"""Public updater for draft-4.dev1 to draft-4.dev2."""
return (_draft4Dev1toDev2(doc, loader, baseuri), "draft-4.dev2")
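# Illustrative example (not part of this change): each workflow output's
# "source" key is renamed to "outputSource"; the referenced value itself is
# unchanged.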
def _draft4Dev2toDev3(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
if isinstance(doc, dict):
if "class" in doc and doc["class"] == "File":
doc["location"] = doc["path"]
del doc["path"]
if "secondaryFiles" in doc:
for i, sf in enumerate(doc["secondaryFiles"]):
if "$(" in sf or "${" in sf:
doc["secondaryFiles"][i] = sf.replace('"path"', '"location"').replace(".path", ".location")
if "class" in doc and doc["class"] == "CreateFileRequirement":
doc["class"] = "InitialWorkDirRequirement"
doc["listing"] = []
for f in doc["fileDef"]:
doc["listing"].append({
"entryname": f["filename"],
"entry": f["fileContent"]
})
del doc["fileDef"]
for key, value in doc.items():
doc[key] = _draft4Dev2toDev3(value, loader, baseuri)
elif isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draft4Dev2toDev3(a, loader, baseuri)
return doc
def draft4Dev2toDev3(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
"""Public updater for draft-4.dev2 to draft-4.dev3."""
return (_draft4Dev2toDev3(doc, loader, baseuri), "draft-4.dev3")
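# Illustrative example (not part of this change): File objects trade "path"
# for "location", and
#   {"class": "CreateFileRequirement",
#    "fileDef": [{"filename": "f.txt", "fileContent": "hello"}]}
# becomes
#   {"class": "InitialWorkDirRequirement",
#    "listing": [{"entryname": "f.txt", "entry": "hello"}]}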
def _draft4Dev3to1_0dev4(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Any
if isinstance(doc, dict):
if "description" in doc:
doc["doc"] = doc["description"]
del doc["description"]
for key, value in doc.items():
doc[key] = _draft4Dev3to1_0dev4(value, loader, baseuri)
elif isinstance(doc, list):
for i, a in enumerate(doc):
doc[i] = _draft4Dev3to1_0dev4(a, loader, baseuri)
return doc
def draft4Dev3to1_0dev4(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
"""Public updater for draft-4.dev3 to v1.0.dev4."""
return (_draft4Dev3to1_0dev4(doc, loader, baseuri), "v1.0.dev4")
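# Illustrative example (not part of this change): every "description" key is
# renamed to "doc"; nothing else changes in this step.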
def v1_0dev4to1_0(doc, loader, baseuri):
# type: (Any, Loader, Text) -> Tuple[Any, Text]
"""Public updater for v1.0.dev4 to v1.0."""
......@@ -481,21 +116,10 @@ def v1_0to1_1_0dev1(doc, loader, baseuri):
# Updaters for stable releases; a None value marks a version that needs no
# further stable update.
UPDATES = {
    "draft-2": draft2toDraft3dev1,
    "draft-3": draft3toDraft4dev1,
    "v1.0": None
}  # type: Dict[Text, Optional[Callable[[Any, Loader, Text], Tuple[Any, Text]]]]
# Updaters for development (pre-release) versions.
DEVUPDATES = {
    "draft-3.dev1": draftDraft3dev1toDev2,
    "draft-3.dev2": draftDraft3dev2toDev3,
    "draft-3.dev3": draftDraft3dev3toDev4,
    "draft-3.dev4": draftDraft3dev4toDev5,
    "draft-3.dev5": draftDraft3dev5toFinal,
    "draft-4.dev1": draft4Dev1toDev2,
    "draft-4.dev2": draft4Dev2toDev3,
    "draft-4.dev3": draft4Dev3to1_0dev4,
    "v1.0.dev4": v1_0dev4to1_0,
    "v1.0": v1_0to1_1_0dev1,
    "v1.1.0-dev1": None
}  # type: Dict[Text, Optional[Callable[[Any, Loader, Text], Tuple[Any, Text]]]]
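# A minimal sketch (illustrative only, not part of this change) of how the
# tables above can drive a full update pass: look up the updater registered
# for the document's current version and apply it until a terminal entry
# (None) is reached.  This sketch walks all the way to the newest known
# version, dev releases included; the real driver additionally validates the
# declared version and gates dev targets behind a command line flag.
#
# def _walk_updates(doc, loader, baseuri, version):
#     # type: (Any, Loader, Text, Text) -> Tuple[Any, Text]
#     table = dict(UPDATES)
#     table.update(DEVUPDATES)
#     step = table.get(version)
#     while step is not None:
#         doc, version = step(doc, loader, baseuri)
#         step = table.get(version)
#     return doc, version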
......