Commits on Source (4)
......@@ -10,7 +10,7 @@ python:
- "3.5"
- "3.6"
- "3.7"
- "3.8-dev"
- "3.8"
- "nightly"
install:
......
......@@ -2,6 +2,18 @@
Changes
=======
v2.7 (2019-11-22)
-----------------
* :issue:`427`: Multicore is now supported even when using ``--info-file``,
``--rest-file`` or ``--wildcard-file``. The only remaining feature that
still does not work with multicore is now demultiplexing.
* :issue:`290`: When running on a single core, Cutadapt no longer spawns
external ``pigz`` processes for writing gzip-compressed files. This is a first
step towards ensuring that using ``--cores=n`` uses at most *n* CPU
cores.
* This release adds support for Python 3.8.
v2.6 (2019-10-26)
-----------------
......
#!/bin/bash
#
# Build manylinux1 wheels for cutadapt. Based on the example at
# Build manylinux wheels. Based on the example at
# <https://github.com/pypa/python-manylinux-demo>
#
# It is best to run this in a fresh clone of the repository!
#
# Run this within the repository root:
# docker run --rm -v $(pwd):/io quay.io/pypa/manylinux1_x86_64 /io/buildwheels.sh
# ./buildwheels.sh
#
# The wheels will be put into the wheelhouse/ subdirectory.
#
# For interactive tests:
# docker run -it -v $(pwd):/io quay.io/pypa/manylinux1_x86_64 /bin/bash
# docker run -it -v $(pwd):/io quay.io/pypa/manylinux2010_x86_64 /bin/bash
set -xeuo pipefail
MANYLINUX=quay.io/pypa/manylinux2010_x86_64
manylinux=quay.io/pypa/manylinux2010_x86_64
# For convenience, if this script is called from outside of a docker container,
# it starts a container and runs itself inside of it.
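# (`exec` replaces the current shell with the docker client, so nothing after
# this block runs on the host; the script is re-run as /io/$0 because the
# current directory is mounted at /io inside the container.)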
if ! grep -q docker /proc/1/cgroup; then
# We are not inside a container
docker pull ${MANYLINUX}
exec docker run --rm -v $(pwd):/io ${MANYLINUX} /io/$0
docker pull ${manylinux}
exec docker run --rm -v $(pwd):/io ${manylinux} /io/$0
fi
# Strip binaries (copied from multibuild)
......@@ -37,11 +37,12 @@ PYBINS="/opt/python/*/bin"
HAS_CYTHON=0
for PYBIN in ${PYBINS}; do
# ${PYBIN}/pip install -r /io/requirements.txt
${PYBIN}/pip wheel /io/ -w wheelhouse/
${PYBIN}/pip wheel --no-deps /io/ -w wheelhouse/
done
ls wheelhouse/
# Bundle external shared libraries into the wheels
for whl in wheelhouse/cutadapt-*.whl; do
for whl in wheelhouse/*.whl; do
auditwheel repair "$whl" --plat manylinux1_x86_64 -w repaired/
done
......
python-cutadapt (2.7-1) unstable; urgency=medium
* Team upload.
* New upstream version
-- Steffen Moeller <moeller@debian.org> Mon, 25 Nov 2019 14:17:56 +0100
python-cutadapt (2.6-1) unstable; urgency=medium
* Team upload
......
......@@ -14,7 +14,7 @@ Build-Depends: debhelper-compat (= 12),
python3-setuptools-scm,
python3-pytest-timeout,
python3-xopen (>= 0.5.0),
python3-dnaio (>= 0.4),
python3-dnaio,
cython3
Standards-Version: 4.4.1
Vcs-Browser: https://salsa.debian.org/med-team/python-cutadapt
......
......@@ -46,7 +46,7 @@ source_suffix = '.rst'
master_doc = 'index'
# General information about the project.
project = u'cutadapt'
project = u'Cutadapt'
copyright = u'2010-2019, Marcel Martin'
# The version info for the project you're documenting, acts as replacement for
......
......@@ -129,22 +129,9 @@ Also make sure that you have ``pigz`` (parallel gzip) installed if you use
multiple cores and write to a ``.gz`` output file. Otherwise, compression of
the output will be done in a single thread and therefore be a bottleneck.
There are some limitations at the moment:
* The following command-line arguments are not compatible with
multi-core:
- ``--info-file``
- ``--rest-file``
- ``--wildcard-file``
- ``--format``
* Multi-core is not available when you use Cutadapt for demultiplexing.
If you try to use multiple cores with an incompatible commandline option, you
will get an error message.
Some of these limitations will be lifted in the future, as time allows.
Currently, multi-core support is not available when demultiplexing. You
will get an error message if you try to use it. This limitation may be
lifted in the future.
.. versionadded:: 1.15
......@@ -154,6 +141,9 @@ Some of these limitations will be lifted in the future, as time allows.
.. versionadded:: 2.5
Multicore works with ``--untrimmed/too-short/too-long-(paired)-output``
.. versionadded:: 2.7
Multicore works with ``--info-file``, ``--rest-file``, ``--wildcard-file``
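As an illustrative aside (not part of the documentation), the command-line entry point ``main()`` accepts a list of arguments, so the newly supported combination can also be exercised from Python; the file names and adapter sequence below are made up:
from cutadapt.__main__ import main
# Hypothetical invocation: as of v2.7, --info-file can be combined with --cores.
main([
    "--cores", "4",
    "--info-file", "info.txt",
    "-a", "AACCGGTT",
    "-o", "trimmed.fastq.gz",
    "reads.fastq.gz",
])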
Speed-up tricks
---------------
......@@ -1037,7 +1027,7 @@ each read. Steps not requested on the command-line are skipped.
Filtering reads
===============
By default, all processed reads, no matter whether they were trimmed are not,
By default, all processed reads, no matter whether they were trimmed or not,
are written to the output file specified by the ``-o`` option (or to standard
output if ``-o`` was not provided). For paired-end reads, the second read in a
pair is always written to the file specified by the ``-p`` option.
......@@ -1047,23 +1037,22 @@ them entirely or by redirecting them to other files. When redirecting reads,
the basic rule is that *each read is written to at most one file*. You cannot
write reads to more than one output file.
In the following, the term "processed read" refers to a read to which all
modifications have been applied (adapter removal, quality trimming etc.). A
processed read can be identical to the input read if no modifications were done.
Filters are applied to *all* processed reads, no matter whether they have been
modified by adapter- or quality trimming.
``--minimum-length LENGTH`` or ``-m LENGTH``
Discard processed reads that are shorter than LENGTH. Reads that are too
short even before adapter removal are also discarded. Without this option,
reads that have a length of zero (empty reads) are kept in the output.
Discard processed reads that are shorter than LENGTH.
If you do not use this option, reads that have a length of zero (empty
reads) are kept in the output. Some downstream tools may have problems
with zero-length sequences. In that case, specify at least ``-m 1``.
``--too-short-output FILE``
Instead of discarding the reads that are too short according to ``-m``,
write them to *FILE* (in FASTA/FASTQ format).
``--maximum-length LENGTH`` or ``-M LENGTH``
Discard processed reads that are longer than LENGTH. Reads that are too
long even before adapter removal are also discarded.
Discard processed reads that are longer than LENGTH.
``--too-long-output FILE``
Instead of discarding reads that are too long (according to ``-M``),
......@@ -1074,11 +1063,11 @@ processed read can be identical to the input read if no modifications were done.
of writing them to the regular output file.
``--discard-trimmed``
Discard reads in which an adapter was found.
Discard reads in which an adapter was found.
``--discard-untrimmed``
Discard reads in which *no* adapter was found. This has the same effect as
specifying ``--untrimmed-output /dev/null``.
Discard reads in which *no* adapter was found. This has the same effect as
specifying ``--untrimmed-output /dev/null``.
The options ``--too-short-output`` and ``--too-long-output`` are applied first.
This means, for example, that a read that is too long will never end up in the
......@@ -1089,11 +1078,11 @@ The options ``--untrimmed-output``, ``--discard-trimmed`` and ``--discard-untrimmed``
are mutually exclusive.
The following filtering options do not have a corresponding option for redirecting
reads. They always discard reads for which the filtering criterion applies.
reads. They always discard those reads for which the filtering criterion applies.
``--max-n COUNT_or_FRACTION``
Discard reads with more than COUNT ``N`` bases. If ``COUNT_or_FRACTION`` is an
number between 0 and 1, it is interpreted as a fraction of the read length
Discard reads with more than COUNT ``N`` bases. If ``COUNT_or_FRACTION`` is
a number between 0 and 1, it is interpreted as a fraction of the read length
``--discard-casava``
Discard reads that did not pass CASAVA filtering. Illumina’s CASAVA pipeline in
......
......@@ -38,6 +38,24 @@ Then install Cutadapt like this::
If neither ``pip`` nor ``conda`` installation works, keep reading.
Installation on a Debian-based Linux distribution
-------------------------------------------------
Cutadapt is also included in Debian-based Linux distributions, such as Ubuntu.
Simply use your favorite package manager to install Cutadapt. On the
command-line, this should work ::
sudo apt install cutadapt
or possibly ::
sudo apt install python3-cutadapt
Please be aware that this will likely give you an old version of Cutadapt. If
you encounter unexpected behavior, please use one of the other installation
methods to get an up-to-date version before reporting bugs.
.. _dependencies:
Dependencies
......
......@@ -103,11 +103,11 @@ setup(
packages=find_packages('src'),
entry_points={'console_scripts': ['cutadapt = cutadapt.__main__:main']},
install_requires=[
'dnaio~=0.4.0',
'dnaio~=0.4.1',
'xopen~=0.8.4',
],
extras_require={
'dev': ['Cython', 'pytest', 'pytest-timeout', 'sphinx', 'sphinx_issues'],
'dev': ['Cython', 'pytest', 'pytest-timeout', 'pytest-mock', 'sphinx', 'sphinx_issues'],
},
python_requires='>=3.5',
classifiers=[
......
......@@ -60,7 +60,6 @@ import logging
import platform
from argparse import ArgumentParser, SUPPRESS, HelpFormatter
from xopen import xopen
import dnaio
from cutadapt import __version__
......@@ -72,7 +71,7 @@ from cutadapt.modifiers import (LengthTagModifier, SuffixRemover, PrefixSuffixAd
from cutadapt.report import full_report, minimal_report
from cutadapt.pipeline import (SingleEndPipeline, PairedEndPipeline, InputFiles, OutputFiles,
SerialPipelineRunner, ParallelPipelineRunner)
from cutadapt.utils import available_cpu_count, Progress, DummyProgress
from cutadapt.utils import available_cpu_count, Progress, DummyProgress, FileOpener
from cutadapt.log import setup_logging, REPORT
logger = logging.getLogger()
......@@ -185,7 +184,7 @@ def get_argument_parser():
"mask: replace with 'N' characters; "
"lowercase: convert to lowercase; "
"none: leave unchanged (useful with "
"--discard-untrimmed). Default: trim")
"--discard-untrimmed). Default: %(default)s")
group.add_argument("--no-trim", dest='action', action='store_const', const='none',
help=SUPPRESS) # Deprecated, use --action=none
group.add_argument("--mask-adapter", dest='action', action='store_const', const='mask',
......@@ -381,25 +380,24 @@ def parse_lengths(s):
return tuple(values)
def open_output_files(args, default_outfile, interleaved):
def open_output_files(args, default_outfile, interleaved, file_opener):
"""
Return an OutputFiles instance. If demultiplex is True, the untrimmed, untrimmed2, out and out2
attributes are not opened files, but paths (out and out2 with the '{name}' template).
"""
compression_level = args.compression_level
def open1(path):
"""Return opened file (or None if path is None)"""
if path is None:
return None
return xopen(path, "w", compresslevel=compression_level)
return file_opener.xopen(path, "wb")
def open2(path1, path2):
file1 = file2 = None
if path1 is not None:
file1 = xopen(path1, 'wb', compresslevel=compression_level)
file1 = file_opener.xopen(path1, "wb")
if path2 is not None:
file2 = xopen(path2, 'wb', compresslevel=compression_level)
file2 = file_opener.xopen(path2, "wb")
return file1, file2
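# All of these outputs are now opened in binary mode through FileOpener, which
# owns the compression level and thread settings; the text outputs (info, rest,
# wildcard) are wrapped in io.TextIOWrapper later, inside the pipeline.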
rest_file = open1(args.rest_file)
......@@ -599,7 +597,7 @@ def check_arguments(args, paired, is_interleaved_output):
raise CommandLineError("--pair-adapters cannot be used with --times")
def pipeline_from_parsed_args(args, paired, is_interleaved_output):
def pipeline_from_parsed_args(args, paired, is_interleaved_output, file_opener):
"""
Setup a processing pipeline from parsed command-line arguments.
......@@ -632,9 +630,9 @@ def pipeline_from_parsed_args(args, paired, is_interleaved_output):
# Create the processing pipeline
if paired:
pair_filter_mode = 'any' if args.pair_filter is None else args.pair_filter
pipeline = PairedEndPipeline(pair_filter_mode)
pipeline = PairedEndPipeline(pair_filter_mode, file_opener)
else:
pipeline = SingleEndPipeline()
pipeline = SingleEndPipeline(file_opener)
# When adapters are being trimmed only in R1 or R2, override the pair filter mode
# as using the default of 'any' would regard all read pairs as untrimmed.
......@@ -780,30 +778,30 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
# Print the header now because some of the functions below create logging output
log_header(cmdlineargs)
if args.cores < 0:
parser.error('Value for --cores cannot be negative')
cores = available_cpu_count() if args.cores == 0 else args.cores
file_opener = FileOpener(
compression_level=args.compression_level, threads=0 if cores == 1 else None)
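# threads=0 tells xopen to (de)compress within this process instead of
# spawning an external pigz; threads=None leaves the decision to xopen, so
# multi-core runs can still delegate compression to pigz.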
try:
is_interleaved_input, is_interleaved_output = determine_interleaved(args)
input_filename, input_paired_filename = input_files_from_parsed_args(args.inputs,
paired, is_interleaved_input)
pipeline = pipeline_from_parsed_args(args, paired, is_interleaved_output)
outfiles = open_output_files(args, default_outfile, is_interleaved_output)
pipeline = pipeline_from_parsed_args(args, paired, is_interleaved_output, file_opener)
outfiles = open_output_files(args, default_outfile, is_interleaved_output, file_opener)
except CommandLineError as e:
parser.error(str(e))
return # avoid IDE warnings below
if args.cores < 0:
parser.error('Value for --cores cannot be negative')
cores = available_cpu_count() if args.cores == 0 else args.cores
if cores > 1:
if ParallelPipelineRunner.can_output_to(outfiles):
runner_class = ParallelPipelineRunner
runner_kwargs = dict(n_workers=cores, buffer_size=args.buffer_size)
else:
parser.error('Running in parallel is currently not supported for '
'the given combination of command-line parameters.\nThese '
'options are not supported: --info-file, --rest-file, '
'--wildcard-file, --format\n'
'Also, demultiplexing is not supported.\n'
'Omit --cores/-j to continue.')
parser.error("Running in parallel is currently not supported "
"when using --format or when demultiplexing.\n"
"Omit --cores/-j to continue.")
return # avoid IDE warnings below
else:
runner_class = SerialPipelineRunner
......
......@@ -73,6 +73,15 @@ class EndStatistics:
self._remove = adapter.remove
self.adjacent_bases = {'A': 0, 'C': 0, 'G': 0, 'T': 0, '': 0}
def __repr__(self):
errors = {k: dict(v) for k, v in self.errors.items()}
return "EndStatistics(where={}, max_error_rate={}, errors={}, adjacent_bases={})".format(
self.where,
self.max_error_rate,
errors,
self.adjacent_bases,
)
def __iadd__(self, other):
if not isinstance(other, self.__class__):
raise ValueError("Cannot compare")
......@@ -140,6 +149,14 @@ class AdapterStatistics:
else:
self.back = EndStatistics(adapter2)
def __repr__(self):
return "AdapterStatistics(name={}, where={}, front={}, back={})".format(
self.name,
self.where,
self.front,
self.back,
)
def __iadd__(self, other):
if self.where != other.where: # TODO self.name != other.name or
raise ValueError('incompatible objects')
......@@ -297,7 +314,7 @@ class Adapter:
self.name = _generate_adapter_name() if name is None else name
self.sequence = sequence.upper().replace('U', 'T')
if not self.sequence:
raise ValueError('Sequence is empty')
raise ValueError("Adapter sequence is empty")
self.where = where
if remove not in (None, 'prefix', 'suffix', 'auto'):
raise ValueError('remove parameter must be "prefix", "suffix", "auto" or None')
......
......@@ -14,11 +14,6 @@ The read is then assumed to have been "consumed", that is, either written
somewhere or filtered (should be discarded).
"""
from abc import ABC, abstractmethod
import errno
import dnaio
from .utils import raise_open_files_limit
# Constants used when returning from a Filter’s __call__ method to improve
......@@ -230,29 +225,13 @@ class CasavaFilter(SingleEndFilter):
return right[1:4] == ':Y:' # discard if :Y: found
def _open_raise_limit(path, qualities):
"""
Open a FASTA/FASTQ file for writing. If it fails because the number of open files
would be exceeded, try to raise the soft limit and re-try.
"""
try:
f = dnaio.open(path, mode="w", qualities=qualities)
except OSError as e:
if e.errno == errno.EMFILE: # Too many open files
raise_open_files_limit(8)
f = dnaio.open(path, mode="w", qualities=qualities)
else:
raise
return f
class Demultiplexer(SingleEndFilter):
"""
Demultiplex trimmed reads. Reads are written to different output files
depending on which adapter matches. Files are created when the first read
is written to them.
"""
def __init__(self, path_template, untrimmed_path, qualities):
def __init__(self, path_template, untrimmed_path, qualities, file_opener):
"""
path_template must contain the string '{name}', which will be replaced
with the name of the adapter to form the final output path.
......@@ -267,6 +246,7 @@ class Demultiplexer(SingleEndFilter):
self.written = 0
self.written_bp = [0, 0]
self.qualities = qualities
self.file_opener = file_opener
def __call__(self, read, matches):
"""
......@@ -275,14 +255,14 @@ class Demultiplexer(SingleEndFilter):
if matches:
name = matches[-1].adapter.name
if name not in self.writers:
self.writers[name] = _open_raise_limit(
self.writers[name] = self.file_opener.dnaio_open_raise_limit(
self.template.replace('{name}', name), self.qualities)
self.written += 1
self.written_bp[0] += len(read)
self.writers[name].write(read)
else:
if self.untrimmed_writer is None and self.untrimmed_path is not None:
self.untrimmed_writer = _open_raise_limit(
self.untrimmed_writer = self.file_opener.dnaio_open_raise_limit(
self.untrimmed_path, self.qualities)
if self.untrimmed_writer is not None:
self.written += 1
......@@ -303,16 +283,16 @@ class PairedDemultiplexer(PairedEndFilter):
depending on which adapter (in read 1) matches.
"""
def __init__(self, path_template, path_paired_template, untrimmed_path, untrimmed_paired_path,
qualities):
qualities, file_opener):
"""
The path templates must contain the string '{name}', which will be replaced
with the name of the adapter to form the final output path.
Read pairs without an adapter match are written to the files named by
untrimmed_path.
"""
self._demultiplexer1 = Demultiplexer(path_template, untrimmed_path, qualities)
self._demultiplexer1 = Demultiplexer(path_template, untrimmed_path, qualities, file_opener)
self._demultiplexer2 = Demultiplexer(path_paired_template, untrimmed_paired_path,
qualities)
qualities, file_opener)
@property
def written(self):
......@@ -337,7 +317,7 @@ class CombinatorialDemultiplexer(PairedEndFilter):
Demultiplex reads depending on which adapter matches, taking into account both matches
on R1 and R2.
"""
def __init__(self, path_template, path_paired_template, untrimmed_name, qualities):
def __init__(self, path_template, path_paired_template, untrimmed_name, qualities, file_opener):
"""
path_template must contain the string '{name1}' and '{name2}', which will be replaced
with the name of the adapters found on R1 and R2, respectively to form the final output
......@@ -355,6 +335,7 @@ class CombinatorialDemultiplexer(PairedEndFilter):
self.written = 0
self.written_bp = [0, 0]
self.qualities = qualities
self.file_opener = file_opener
@staticmethod
def _make_path(template, name1, name2):
......@@ -379,8 +360,8 @@ class CombinatorialDemultiplexer(PairedEndFilter):
path1 = self._make_path(self.template, name1, name2)
path2 = self._make_path(self.paired_template, name1, name2)
self.writers[key] = (
_open_raise_limit(path1, qualities=self.qualities),
_open_raise_limit(path2, qualities=self.qualities),
self.file_opener.dnaio_open_raise_limit(path1, qualities=self.qualities),
self.file_opener.dnaio_open_raise_limit(path2, qualities=self.qualities),
)
writer1, writer2 = self.writers[key]
self.written += 1
......
......@@ -3,6 +3,7 @@ Parse adapter specifications
"""
import re
import logging
from xopen import xopen
from dnaio.readers import FastaReader
from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, BackOrFrontAdapter, LinkedAdapter
......@@ -397,7 +398,8 @@ class AdapterParser:
"""
if spec.startswith('file:'):
# read adapter sequences from a file
with FastaReader(spec[5:]) as fasta:
with xopen(spec[5:], mode="rb", threads=0) as f:
fasta = FastaReader(f)
for record in fasta:
name = record.name.split(None, 1)
name = name[0] if name else None
......
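The change above hands an already-opened binary file to FastaReader, with xopen(threads=0) keeping any decompression inside the Python process. A minimal standalone sketch of the same pattern (the adapter file name is hypothetical):
from xopen import xopen
from dnaio.readers import FastaReader
# Read adapter sequences from a (possibly gzip-compressed) FASTA file without
# spawning an external decompression process.
with xopen("adapters.fasta.gz", mode="rb", threads=0) as f:
    for record in FastaReader(f):
        print(record.name, record.sequence)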
......@@ -6,13 +6,14 @@ import logging
import functools
from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Queue
from pathlib import Path
import multiprocessing.connection
import traceback
from xopen import xopen
import dnaio
from .utils import Progress
from .utils import Progress, FileOpener
from .modifiers import PairedModifier
from .report import Statistics
from .filters import (Redirector, PairedRedirector, NoFilter, PairedNoFilter, InfoFileWriter,
......@@ -93,13 +94,14 @@ class Pipeline(ABC):
"""
n_adapters = 0
def __init__(self):
def __init__(self, file_opener: FileOpener):
self._close_files = []
self._reader = None
self._filters = None
self._modifiers = []
self._outfiles = None
self._demultiplexer = None
self._textiowrappers = []
# Filter settings
self._minimum_length = None
......@@ -108,6 +110,7 @@ class Pipeline(ABC):
self.discard_casava = False
self.discard_trimmed = False
self.discard_untrimmed = False
self.file_opener = file_opener
def connect_io(self, infiles: InputFiles, outfiles: OutputFiles):
self._reader = dnaio.open(infiles.file1, file2=infiles.file2,
......@@ -119,6 +122,10 @@ class Pipeline(ABC):
# for all outputs
if force_fasta:
kwargs['fileformat'] = 'fasta'
# file and file2 must already be file-like objects because we don’t want to
# take care of threads and compression levels here.
for f in (file, file2):
assert not isinstance(f, (str, bytes, Path))
return dnaio.open(file, file2=file2, mode='w', qualities=self.uses_qualities,
**kwargs)
......@@ -133,7 +140,9 @@ class Pipeline(ABC):
(WildcardFileWriter, outfiles.wildcard),
):
if outfile:
self._filters.append(filter_wrapper(None, filter_class(outfile), None))
textiowrapper = io.TextIOWrapper(outfile)
self._textiowrappers.append(textiowrapper)
self._filters.append(filter_wrapper(None, filter_class(textiowrapper), None))
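# The info/rest/wildcard writers produce text, but the output files are opened
# in binary mode (and possibly compressed), so each one is wrapped in a
# TextIOWrapper here; flush() and close() below handle the wrappers first.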
# minimum length and maximum length
for lengths, file1, file2, filter_class in (
......@@ -184,7 +193,16 @@ class Pipeline(ABC):
untrimmed_filter_wrapper(untrimmed_writer, DiscardUntrimmedFilter(), DiscardUntrimmedFilter()))
self._filters.append(self._final_filter(outfiles))
def flush(self):
for f in self._textiowrappers:
f.flush()
for f in self._outfiles:
if f is not None:
f.flush()
def close(self):
for f in self._textiowrappers:
f.close() # This also closes the underlying files; a second close occurs below
for f in self._outfiles:
# TODO do not use hasattr
if f is not None and f is not sys.stdin and f is not sys.stdout and hasattr(f, 'close'):
......@@ -223,8 +241,8 @@ class SingleEndPipeline(Pipeline):
"""
paired = False
def __init__(self):
super().__init__()
def __init__(self, file_opener: FileOpener):
super().__init__(file_opener)
self._modifiers = []
def add(self, modifier):
......@@ -261,7 +279,8 @@ class SingleEndPipeline(Pipeline):
return NoFilter(writer)
def _create_demultiplexer(self, outfiles):
return Demultiplexer(outfiles.out, outfiles.untrimmed, qualities=self.uses_qualities)
return Demultiplexer(outfiles.out, outfiles.untrimmed, qualities=self.uses_qualities,
file_opener=self.file_opener)
@property
def minimum_length(self):
......@@ -288,8 +307,8 @@ class PairedEndPipeline(Pipeline):
"""
paired = True
def __init__(self, pair_filter_mode):
super().__init__()
def __init__(self, pair_filter_mode, file_opener: FileOpener):
super().__init__(file_opener)
self._pair_filter_mode = pair_filter_mode
self._reader = None
# Whether to ignore pair_filter mode for discard-untrimmed filter
......@@ -359,10 +378,11 @@ class PairedEndPipeline(Pipeline):
def _create_demultiplexer(self, outfiles):
if '{name1}' in outfiles.out and '{name2}' in outfiles.out:
return CombinatorialDemultiplexer(outfiles.out, outfiles.out2,
outfiles.untrimmed, qualities=self.uses_qualities)
outfiles.untrimmed, qualities=self.uses_qualities, file_opener=self.file_opener)
else:
return PairedDemultiplexer(outfiles.out, outfiles.out2,
outfiles.untrimmed, outfiles.untrimmed2, qualities=self.uses_qualities)
outfiles.untrimmed, outfiles.untrimmed2, qualities=self.uses_qualities,
file_opener=self.file_opener)
@property
def minimum_length(self):
......@@ -472,6 +492,7 @@ class WorkerProcess(Process):
outfiles = self._make_output_files()
self._pipeline.connect_io(infiles, outfiles)
(n, bp1, bp2) = self._pipeline.process_reads()
self._pipeline.flush()
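# flush() pushes buffered text from the TextIOWrappers (info/rest/wildcard)
# into the in-memory output files before they are sent to the parent process.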
cur_stats = Statistics().collect(n, bp1, bp2, [], self._pipeline._filters)
stats += cur_stats
self._send_outfiles(outfiles, chunk_index, n)
......@@ -502,7 +523,6 @@ class WorkerProcess(Process):
that has BytesIO instances for each non-None output file
"""
output_files = copy.copy(self._orig_outfiles)
# TODO info, rest, wildcard need to be StringIO()
for attr in (
"out", "out2", "untrimmed", "untrimmed2", "too_short", "too_short2", "too_long",
"too_long2", "info", "rest", "wildcard"
......@@ -632,13 +652,7 @@ class ParallelPipelineRunner(PipelineRunner):
@staticmethod
def can_output_to(outfiles):
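# --info-file, --rest-file and --wildcard-file no longer rule out parallel
# execution, because workers write them into in-memory buffers (see
# _make_output_files above). Demultiplexing (templated output paths) remains
# the one unsupported case.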
return (
outfiles.out is not None
and outfiles.rest is None
and outfiles.info is None
and outfiles.wildcard is None
and not outfiles.demultiplex
)
return outfiles.out is not None and not outfiles.demultiplex
def _assign_output(self, outfiles):
if not self.can_output_to(outfiles):
......@@ -682,8 +696,7 @@ class ParallelPipelineRunner(PipelineRunner):
# this happens only when there is an exception sending
# the statistics)
e, tb_str = connection.recv()
# TODO traceback should only be printed in development
logger.debug('%s', tb_str)
logger.error('%s', tb_str)
raise e
if stats is None:
stats = cur_stats
......@@ -695,17 +708,16 @@ class ParallelPipelineRunner(PipelineRunner):
# An exception has occurred in the worker
e, tb_str = connection.recv()
# TODO traceback should only be printed in development
# We should use the worker's actual traceback object
# here, but traceback objects are not picklable.
logger.debug('%s', tb_str)
logger.error('%s', tb_str)
raise e
# No. of reads processed in this chunk
chunk_n = connection.recv()
if chunk_n == -2:
e, tb_str = connection.recv()
logger.debug('%s', tb_str)
logger.error('%s', tb_str)
raise e
n += chunk_n
self._progress.update(n)
......
import re
import sys
import time
import errno
import multiprocessing
from xopen import xopen
import dnaio
......@@ -142,3 +144,31 @@ def reverse_complemented_sequence(sequence: dnaio.Sequence):
else:
qualities = sequence.qualities[::-1]
return dnaio.Sequence(sequence.name, reverse_complement(sequence.sequence), qualities)
class FileOpener:
def __init__(self, compression_level: int = 6, threads: int = None):
self.compression_level = compression_level
self.threads = threads
def xopen(self, path, mode):
return xopen(path, mode, compresslevel=self.compression_level, threads=self.threads)
def dnaio_open(self, *args, **kwargs):
kwargs["opener"] = self.xopen
return dnaio.open(*args, **kwargs)
def dnaio_open_raise_limit(self, path, qualities):
"""
Open a FASTA/FASTQ file for writing. If it fails because the number of open files
would be exceeded, try to raise the soft limit and re-try.
"""
try:
f = self.dnaio_open(path, mode="w", qualities=qualities)
except OSError as e:
if e.errno == errno.EMFILE: # Too many open files
raise_open_files_limit(8)
f = self.dnaio_open(path, mode="w", qualities=qualities)
else:
raise
return f
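A brief usage sketch of the new FileOpener (illustrative only; file names and the read are made up). With threads=0, compressed output is written without an external pigz process, and dnaio_open_raise_limit retries after raising the soft open-file limit when EMFILE is hit:
import dnaio
from cutadapt.utils import FileOpener
opener = FileOpener(compression_level=6, threads=0)
# Plain (optionally gzip-compressed) output through xopen:
with opener.xopen("rest.txt.gz", "wb") as f:
    f.write(b"leftover-sequence\n")
# FASTA/FASTQ writer through dnaio:
writer = opener.dnaio_open_raise_limit("out.fastq.gz", qualities=True)
writer.write(dnaio.Sequence("read1", "ACGT", "IIII"))
writer.close()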
......@@ -46,10 +46,10 @@ def test_lowercase(run):
run('-a ttagacatatctccgtcg', 'lowercase.fastq', 'small.fastq')
def test_rest(run, tmpdir):
def test_rest(run, tmpdir, cores):
"""-r/--rest-file"""
rest = str(tmpdir.join("rest.tmp"))
run(['-b', 'ADAPTER', '-N', '-r', rest], "rest.fa", "rest.fa")
run(['--cores', str(cores), '-b', 'ADAPTER', '-N', '-r', rest], "rest.fa", "rest.fa")
assert_files_equal(datapath('rest.txt'), rest)
......@@ -228,11 +228,14 @@ def test_read_wildcard(run):
("-a", "wildcard_adapter.fa"),
("-b", "wildcard_adapter_anywhere.fa"),
])
def test_adapter_wildcard(adapter_type, expected, run, tmpdir):
def test_adapter_wildcard(adapter_type, expected, run, tmpdir, cores):
"""wildcards in adapter"""
wildcard_path = str(tmpdir.join("wildcards.txt"))
run("--wildcard-file {} {} ACGTNNNACGT".format(wildcard_path, adapter_type),
expected, "wildcard_adapter.fa")
run([
"--cores", str(cores),
"--wildcard-file", wildcard_path,
adapter_type, "ACGTNNNACGT"
], expected, "wildcard_adapter.fa")
with open(wildcard_path) as wct:
lines = wct.readlines()
lines = [line.strip() for line in lines]
......@@ -309,26 +312,26 @@ def test_strip_suffix(run):
run("--strip-suffix _sequence -a XXXXXXX", "stripped.fasta", "simple.fasta")
def test_info_file(run, tmpdir):
def test_info_file(run, tmpdir, cores):
# The true adapter sequence in the illumina.fastq.gz data set is
# GCCTAACTTCTTAGACTGCCTTAAGGACGT (fourth base is different from the sequence shown here)
info_path = str(tmpdir.join("info.txt"))
run(["--info-file", info_path, "-a", "adapt=GCCGAACTTCTTAGACTGCCTTAAGGACGT"],
run(["--cores", str(cores), "--info-file", info_path, "-a", "adapt=GCCGAACTTCTTAGACTGCCTTAAGGACGT"],
"illumina.fastq", "illumina.fastq.gz")
assert_files_equal(cutpath("illumina.info.txt"), info_path)
def test_info_file_times(run, tmpdir):
def test_info_file_times(run, tmpdir, cores):
info_path = str(tmpdir.join("info.txt"))
run(["--info-file", info_path, "--times", "2", "-a", "adapt=GCCGAACTTCTTA",
run(["--cores", str(cores), "--info-file", info_path, "--times", "2", "-a", "adapt=GCCGAACTTCTTA",
"-a", "adapt2=GACTGCCTTAAGGACGT"], "illumina5.fastq", "illumina5.fastq")
assert_files_equal(cutpath('illumina5.info.txt'), info_path)
def test_info_file_fasta(run, tmpdir):
def test_info_file_fasta(run, tmpdir, cores):
info_path = str(tmpdir.join("info.txt"))
# Just make sure that it runs
run(["--info-file", info_path, "-a", "TTAGACATAT", "-g", "GAGATTGCCA", "--no-indels"],
run(["--cores", str(cores), "--info-file", info_path, "-a", "TTAGACATAT", "-g", "GAGATTGCCA", "--no-indels"],
"no_indels.fasta", "no_indels.fasta")
......
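The updated tests request a ``cores`` fixture that is not part of this diff. A minimal sketch of what such a fixture could look like (an assumption, e.g. in tests/conftest.py), parametrizing each test over one and two cores:
import pytest
@pytest.fixture(params=[1, 2])
def cores(request):
    # Every test that requests "cores" runs once per value, covering both the
    # serial and the parallel code path.
    return request.param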