Commits on Source (8)
......@@ -9,6 +9,12 @@ test_script:
# Build the compiled extension and run the project tests
- "python.exe scratch/test/test_pyflow.py"
branches:
only:
- master
- travis
- /v\d\.\d\.\d/
notifications:
- provider: Email
to:
......
language: python
# Note PYVER drives a hack to use python 2.4; this is
# actually pretty ugly on travis -- the process is:
# 1) install # python2.4 from deadsnakes ppa
# 2) shove 2.4 in /usr/bin/python
# 3) set PATH back to /usr/bin
#
# This removes the system python link which is probably not
# smart, but the test works so leaving it for now.
#
matrix:
include:
- os: linux
......@@ -16,19 +7,14 @@ matrix:
python: "2.7"
- os: linux
sudo: required
python: "2.7"
env: PYVER="2.4"
python: "2.6"
before_install:
- date -u
- uname -a
- lsb_release -a
- if [ "$PYVER" == "2.4" ]; then sudo add-apt-repository -y ppa:fkrull/deadsnakes && sudo apt-get update -qq; fi
install:
- if [ "$PYVER" == "2.4" ]; then sudo apt-get install python2.4 -y && python2.4 -V; fi
- if [ "$PYVER" == "2.4" ]; then sudo rm -f /usr/bin/python && sudo ln -s /usr/bin/python2.4 /usr/bin/python; fi
- if [ "$PYVER" == "2.4" ]; then export PATH=/usr/bin:$PATH; fi
- python -V
script:
......@@ -37,6 +23,8 @@ script:
branches:
only:
- master
- travis
- /v\d\.\d\.\d/
notifications:
email:
......
......@@ -24,8 +24,7 @@ For more information, please see the [pyFlow website] [site].
License
-------
pyFlow source code is provided under the [BSD 2-Clause License]
(pyflow/COPYRIGHT.txt).
pyFlow source code is provided under the [BSD 2-Clause License](pyflow/COPYRIGHT.txt).
Releases
......
python-pyflow (1.1.20-1) unstable; urgency=medium
* New upstream version
* Point Vcs fields to salsa.debian.org
* Standards-Version: 4.1.4
* debhelper 11
* Testsuite: autopkgtest-pkg-python
-- Andreas Tille <tille@debian.org> Mon, 16 Apr 2018 13:48:38 +0200
python-pyflow (1.1.14-1) unstable; urgency=medium
* Initial release (Closes: #854920)
......
......@@ -2,13 +2,14 @@ Source: python-pyflow
Maintainer: Debian Med Packaging Team <debian-med-packaging@lists.alioth.debian.org>
Uploaders: Andreas Tille <tille@debian.org>
Section: python
Testsuite: autopkgtest-pkg-python
Priority: optional
Build-Depends: debhelper (>= 9),
Build-Depends: debhelper (>= 11~),
dh-python,
python-all
Standards-Version: 3.9.8
Vcs-Browser: https://anonscm.debian.org/cgit/debian-med/python-pyflow.git
Vcs-Git: https://anonscm.debian.org/git/debian-med/python-pyflow.git
Standards-Version: 4.1.4
Vcs-Browser: https://salsa.debian.org/med-team/python-pyflow
Vcs-Git: https://salsa.debian.org/med-team/python-pyflow.git
Homepage: https://illumina.github.io/pyflow/
Package: python-pyflow
......
......@@ -24,9 +24,9 @@ FEATURES:
required for each task.
- Recursive workflow specification: take any existing pyFlow object and
use it as a task in another pyFlow.
- Dynamic workflow specification: define a wait on task specification rather
than just tasks, so that tasks can be defined based on the results of
upstream tasks (note: recursive workflows are an even better way to do this)
- Dynamic workflow specification:
- Define a wait on task specification rather than just tasks, so that tasks can be defined based on the results of upstream tasks (note: recursive workflows are an even better way to do this)
- Tasks/Task-trees can be canceled during the workflow based on the results of other tasks
- Detects and reports all failed tasks with consistent workflow-level logging.
- Task-level logging: All task stderr is logged and decorated,
eg. [time][host][workflow_run][taskid]
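The "dynamic workflow specification" feature above (defining downstream tasks only after upstream results are known) can be sketched generically. This uses Python's stdlib `concurrent.futures` rather than pyflow's own API, so the `upstream` function and chunk counts here are purely illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def upstream():
    # Stand-in for an upstream task whose result shapes the downstream
    # graph, e.g. discovering how many input chunks exist.
    return 3

with ThreadPoolExecutor(max_workers=4) as pool:
    # Wait on the upstream task's result before defining downstream work...
    nChunks = pool.submit(upstream).result()
    # ...then define one downstream task per discovered chunk.
    results = list(pool.map(lambda i: i * i, range(nChunks)))

print(results)  # [0, 1, 4]
```

In pyflow itself the same shape is achieved by waiting on a task specification inside `workflow()` before adding further tasks, or (as the note suggests) by nesting a recursive sub-workflow.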
......@@ -53,7 +53,7 @@ The pyflow module can be installed using standard python distutils
installation. To do so, unpack the tarball and use the setup script
as follows:
```
```bash
tar -xzf pyflow-X.Y.Z.tar.gz
cd pyflow-X.Y.Z
python setup.py build install
......@@ -63,7 +63,7 @@ If installation is not convenient, you can simply add the pyflow
src/ directory to the system search path. For instance:
usepyflow.py:
```
```python
import sys
sys.path.append("/path/to/pyflow/src")
......@@ -117,7 +117,7 @@ prefixed with "[time] [hostname] [workflow_run] [component] ". Where:
is composed of (1) the run() PID and (2) the number of times run() has been called on
the workflow by the same process. These two values are joined by an underscore
- 'component' - the name of the pyflow thread, the primary threads are
'WorkflowManager' which runs the worklow() method, and 'TaskManager' which
'WorkflowManager' which runs the workflow() method, and 'TaskManager' which
polls the task graph and launches jobs.
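As a sketch of the log decoration just described, a prefixed line can be picked apart with a regular expression. The timestamp, hostname, and message below are invented for illustration; the field layout follows the description above:

```python
import re

# One hypothetical decorated log line in the documented
# "[time] [hostname] [workflow_run] [component]" format:
line = "[2017-04-16T13:48:38.000000] [node01] [1234_1] [WorkflowManager] starting workflow"

# workflow_run is the run() PID and the run() invocation count, joined by '_'
pattern = (r"\[(?P<time>[^\]]*)\] \[(?P<host>[^\]]*)\] "
           r"\[(?P<pid>\d+)_(?P<runCount>\d+)\] \[(?P<component>[^\]]*)\] (?P<msg>.*)")
fields = re.match(pattern, line).groupdict()
```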
In the task logs, only the stderr stream is decorated. The prefix in
......@@ -142,8 +142,8 @@ from make and must be kept in mind when restarting interrupted
workflows.
The runstate of each task is in
pyflow.data/state/pyflow_tasks_runstate.txt, the description of each
task is in pyflow.data/state/pyflow_tasks_info.txt. At the beginning of
`pyflow.data/state/pyflow_tasks_runstate.txt`, the description of each
task is in `pyflow.data/state/pyflow_tasks_info.txt`. At the beginning of
each run any existing task files are backed up in
pyflow.data/state/backup.
......@@ -174,11 +174,11 @@ directory here:
pyflow.data/state/make_pyflow_task_graph.py
This script can be run without arguments to produce the current task graph in
dot format based on the data files in the pyflow.data/state/ directory.
dot format based on the data files in the `pyflow.data/state/` directory.
#### Site configuration:
The file ${pyflowDir}/src/pyflowConfig.py contains any pyflow variables or
The file `${pyflowDir}/src/pyflowConfig.py` contains any pyflow variables or
functions which would be likely to need configuration at a new site. This
currently includes:
......
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -29,6 +29,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
"""
......@@ -520,7 +521,7 @@ def flowcellBamListMergeFlow(self, taskPrefix="", dependencies=set()) :
# TODO: what if there's an NFS delay updating all the bams while
# we're reading them out here? make this process more robust -- we
# should know how many BAM's we're expecting, in a way that's
# robust to interuption/restart
# robust to interruption/restart
#
bams = {}
bamDigger = FileDigger(".bam", ["Flowcell_", "bam", "Project_", "Sample_"])
......@@ -563,7 +564,7 @@ def flowcellBamListMergeFlow(self, taskPrefix="", dependencies=set()) :
#
# mergedBams contains all bams from the current run, we also add any from a
# previous interupted run:
# previous interrupted run:
mergedBamDigger = FileDigger(mergedBamExt, ["Project_", "Sample_"])
for (project, sample, bamFile) in mergedBamDigger.getNextFile(mergedBamDir) :
key = (project, sample)
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
"""
This demonstrates a run of a prototype BCL to BWA BAM workflow
......@@ -65,7 +66,7 @@ def get_run_options() :
from optparse import OptionParser
import textwrap
epilog=\"""Note this script can be re-run to continue the workflow run in case of interuption.
epilog=\"""Note this script can be re-run to continue the workflow run in case of interruption.
Also note that the dryRun option has limited utility when task definition depends on upstream task results
(in which case the dry run will not cover the full 'live' run task set).\"""
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
#
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
#
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
#
......
# launchTasksUntil Demo
This demo shows how a pyflow script can be configured to launch tasks until a certain amount of work is accomplished,
where the quantity of work accomplished by each task is unknown at the time the tasks are launched. In this case
the total work accomplished by all completed tasks is tallied. Once the required total has been reached, task
launch stops and any remaining queued or running tasks are canceled.
Note this is an advanced pyflow demo; it should not be used as a tutorial to learn pyflow's basic operation, and it
reflects a use case that was not considered during the original development of pyflow's API. On the latter point,
note in particular that the re-entrant lock used to synchronize the counting of total work accomplished in this demo
must be placed in a special `DeepCopyProtector` object to prevent an attempted deep-copy of the lock. Pyflow workflow
tasks are deep-copied by default, so this workaround is required for any object to be shared between a workflow task
and the code which launches it (except via the filesystem -- which is the communication pathway encouraged by the
pyflow design).
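The `DeepCopyProtector` workaround described above can be demonstrated in isolation using only the stdlib; this sketch is independent of pyflow itself and shows both why a bare lock cannot survive a deep copy and how the protector keeps it aliased:

```python
import copy
import threading

class DeepCopyProtector(object):
    # Any attribute attached to this object stays aliased through a
    # deepcopy; __copy__ is overloaded so shallow copies behave the same.
    def __copy__(self):
        return self
    def __deepcopy__(self, memo):
        return self

# Deep-copying a lock fails outright, which is why a lock shared between a
# workflow task and its launcher must be shielded:
try:
    copy.deepcopy(threading.RLock())
    lockCopyFailed = False
except TypeError:
    lockCopyFailed = True

# Shielded, the lock survives a deepcopy of its owner and remains shared:
nocopy = DeepCopyProtector()
nocopy.lock = threading.RLock()
owner = {"nocopy": nocopy}
ownerCopy = copy.deepcopy(owner)
# ownerCopy["nocopy"] is the very same object, not a copy
```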
#!/usr/bin/env python
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
import os.path
import sys
# add module path by hand
#
scriptDir=os.path.abspath(os.path.dirname(__file__))
sys.path.append(scriptDir+"/../../src")
from pyflow import WorkflowRunner
#
# very simple task scripts called by the demo:
#
testJobDir=os.path.join(scriptDir)
workerJob=os.path.join(testJobDir,"testWorker.py")
class DeepCopyProtector(object) :
"""
Any data attached to this object will remain aliased through a deepcopy operation
Overloading __copy__ is provided here just to ensure that deep/shallow copy semantics are identical.
"""
def __copy__(self) :
return self
def __deepcopy__(self, dict) :
return self
def lockMethod(f):
"""
method decorator acquires/releases object's lock
"""
def wrapped(self, *args, **kw):
import threading
if not hasattr(self,"lock") :
self.lock = threading.RLock()
self.lock.acquire()
try:
return f(self, *args, **kw)
finally:
self.lock.release()
return wrapped
class SyncronizedAccumulator(object) :
def __init__(self) :
self._values = []
@lockMethod
def addOrderedValue(self, index, value):
while index+1 > len(self._values) :
self._values.append(None)
self._values[index] = value
@lockMethod
def totalValue(self):
count = 0
sum = 0
for v in self._values :
if v is None : continue
count += 1
sum += v
return (count, sum)
@lockMethod
def totalContinuousValue(self):
count = 0
sum = 0
for v in self._values :
if v is None : break
count += 1
sum += v
return (count, sum)
@lockMethod
def continuousTasksRequiredToReachTarget(self, targetVal):
count = 0
sum = 0
if sum >= targetVal : return count
for v in self._values :
if v is None : break
count += 1
sum += v
if sum >= targetVal : return count
return None
class SumWorkflow(WorkflowRunner) :
def __init__(self, taskIndex, inputFile, totalWorkCompleted) :
self.taskIndex=taskIndex
self.inputFile=inputFile
self.nocopy = DeepCopyProtector()
self.nocopy.totalWorkCompleted = totalWorkCompleted
def workflow(self) :
import os
infp = open(self.inputFile, "rb")
value = int(infp.read().strip())
infp.close()
self.flowLog("File: %s Value: %i" % (self.inputFile, value))
os.remove(self.inputFile)
self.nocopy.totalWorkCompleted.addOrderedValue(self.taskIndex, value)
class LaunchUntilWorkflow(WorkflowRunner) :
def __init__(self):
self.totalContinuousWorkTarget = 100
def workflow(self):
taskByIndex = []
allTasks = set()
completedTasks = set()
totalWorkCompleted = SyncronizedAccumulator()
def launchNextTask() :
taskIndex = len(allTasks)
workerTaskLabel = "workerTask_%05i" % (taskIndex)
workerTaskFile = "outputFile_%05i" % (taskIndex)
workerTaskCmd=[sys.executable, workerJob, workerTaskFile]
self.addTask(workerTaskLabel, workerTaskCmd)
allTasks.add(workerTaskLabel)
taskByIndex.append(workerTaskLabel)
sumTaskLabel="sumTask_%05i" % (taskIndex)
self.addWorkflowTask(sumTaskLabel, SumWorkflow(taskIndex, workerTaskFile, totalWorkCompleted), dependencies=workerTaskLabel)
def updateCompletedTasks() :
for task in allTasks :
if task in completedTasks : continue
(isDone, isError) = self.isTaskDone(task)
if not isDone : continue
if isError :
raise Exception("Task %s failed." % (task))
completedTasks.add(task)
def stopRunningExtraTasks(nTargetTasks) :
for task in taskByIndex[nTargetTasks:] :
self.cancelTaskTree(task)
maxTaskCount = self.getNCores()
assert(maxTaskCount > 0)
import time
while True :
count, completedWork = totalWorkCompleted.totalValue()
self.flowLog("TotalWorkCompleted: %i (%i)" % (completedWork, count))
count, completedContinuousWork = totalWorkCompleted.totalContinuousValue()
self.flowLog("TotalContinuousWorkCompleted: %i (%i)" % (completedContinuousWork, count))
nTargetTasks = totalWorkCompleted.continuousTasksRequiredToReachTarget(self.totalContinuousWorkTarget)
if nTargetTasks is not None :
self.flowLog("Work target completed in first %i tasks" % (nTargetTasks))
stopRunningExtraTasks(nTargetTasks)
break
updateCompletedTasks()
runningTaskCount = len(allTasks)-len(completedTasks)
self.flowLog("Completed/Running tasks: %i %i" % (len(completedTasks), runningTaskCount))
assert(runningTaskCount >= 0)
# launch new tasks until it is clear the total threshold will be met
if completedWork < self.totalContinuousWorkTarget :
numberOfTasksToLaunch = max(maxTaskCount-runningTaskCount,0)
for _ in range(numberOfTasksToLaunch) : launchNextTask()
time.sleep(5)
wflow = LaunchUntilWorkflow()
# Run the workflow:
#
retval=wflow.run(mode="local",nCores=8)
sys.exit(retval)
#!/usr/bin/env python
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
"""
A dummy worker used to illustrate a pyflow 'launch-tasks-until-condition' workflow.
This worker waits for a random length of time, then writes a random number to the file
specified in arg1.
"""
import sys
if len(sys.argv) != 2 :
print "usage: $0 outputFile"
sys.exit(2)
outputFile=sys.argv[1]
import time
from random import randint
sleepTime=randint(1,60)
time.sleep(sleepTime)
reportVal=randint(1,20)
ofp=open(outputFile,"wb")
ofp.write("%i\n" % (reportVal))
ofp.close()
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
import os.path
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
#
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
import os.path
......
......@@ -2,7 +2,7 @@
#
# pyFlow - a lightweight parallel task engine
#
# Copyright (c) 2012-2015 Illumina, Inc.
# Copyright (c) 2012-2017 Illumina, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -30,6 +30,7 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#
#
......