Commits on Source (4)
......@@ -2,6 +2,22 @@
Changes
=======
v2.5 (2019-09-04)
-----------------
* :issue:`391`: Multicore is now supported even when using
``--untrimmed-output``, ``--too-short-output``, ``--too-long-output``
or the corresponding ``...-paired-output`` options.
* :issue:`393`: Using ``--info-file`` no longer crashes when processing
paired-end data. However, the info file itself will only contain results
for R1.
* :issue:`394`: Options ``-e``/``--no-indels``/``-O`` were ignored for
linked adapters.
* :issue:`320`: When a “Too many open files” error occurs during
demultiplexing, Cutadapt can now automatically raise the limit and
re-try if the limit is a “soft” limit.
v2.4 (2019-07-09)
-----------------
......
Metadata-Version: 2.1
Name: cutadapt
Version: 2.4
Version: 2.5
Summary: trim adapters from high-throughput sequencing reads
Home-page: https://cutadapt.readthedocs.io/
Author: Marcel Martin
......@@ -8,9 +8,15 @@ Author-email: marcel.martin@scilifelab.se
License: MIT
Description: .. image:: https://travis-ci.org/marcelm/cutadapt.svg?branch=master
:target: https://travis-ci.org/marcelm/cutadapt
:alt:
.. image:: https://img.shields.io/pypi/v/cutadapt.svg?branch=master
:target: https://pypi.python.org/pypi/cutadapt
:alt:
.. image:: https://codecov.io/gh/marcelm/cutadapt/branch/master/graph/badge.svg
:target: https://codecov.io/gh/marcelm/cutadapt
:alt:
========
Cutadapt
......
.. image:: https://travis-ci.org/marcelm/cutadapt.svg?branch=master
:target: https://travis-ci.org/marcelm/cutadapt
:alt:
.. image:: https://img.shields.io/pypi/v/cutadapt.svg?branch=master
:target: https://pypi.python.org/pypi/cutadapt
:alt:
.. image:: https://codecov.io/gh/marcelm/cutadapt/branch/master/graph/badge.svg
:target: https://codecov.io/gh/marcelm/cutadapt
:alt:
========
Cutadapt
......
python-cutadapt (2.5-1) unstable; urgency=medium
* Team upload.
* New upstream version
-- Steffen Moeller <moeller@debian.org> Sat, 26 Oct 2019 17:52:13 +0200
python-cutadapt (2.4-2) unstable; urgency=medium
* Reupload source package
......
......@@ -16,7 +16,7 @@ Build-Depends: debhelper-compat (= 12),
python3-xopen (>= 0.5.0),
python3-dnaio,
cython3
Standards-Version: 4.4.0
Standards-Version: 4.4.1
Vcs-Browser: https://salsa.debian.org/med-team/python-cutadapt
Vcs-Git: https://salsa.debian.org/med-team/python-cutadapt.git
Homepage: https://pypi.python.org/pypi/cutadapt
......
......@@ -73,7 +73,7 @@ overlaps that are actually allowed by the adapter type are actually considered.
.. _quality-trimming-algorithm:
Quality trimming algorithm
--------------------------
==========================
The trimming algorithm implemented in Cutadapt is the same as the one used by
BWA, but applied to both
......
......@@ -149,9 +149,6 @@ There are some limitations at the moment:
- ``--info-file``
- ``--rest-file``
- ``--wildcard-file``
- ``--untrimmed-output``, ``--untrimmed-paired-output``
- ``--too-short-output``, ``--too-short-paired-output``
- ``--too-long-output``, ``--too-long-paired-output``
* Multi-core is also not compatible with ``--format``
......@@ -167,6 +164,10 @@ Some of these limitations will be lifted in the future, as time allows.
.. versionadded:: 1.18
``--cores=0`` for autodetection
.. versionadded:: 2.5
Multicore works with ``--untrimmed/too-short/too-long-(paired)-output``
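For example, an invocation like the following (the adapter sequence and file
names are placeholders) can now run on multiple cores::

    cutadapt -j 4 -m 20 -a AACCGGTT --too-short-output=too_short.fastq -o output.fastq input.fastq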
Speed-up tricks
---------------
......@@ -927,23 +928,29 @@ Quality trimming
----------------
The ``-q`` (or ``--quality-cutoff``) parameter can be used to trim
low-quality ends from reads before adapter removal. For this to work
correctly, the quality values must be encoded as ascii(phred quality +
33). If they are encoded as ascii(phred quality + 64), you need to add
``--quality-base=64`` to the command line.
Quality trimming can be done without adapter trimming, so this will work::
low-quality ends from reads. If you specify a single cutoff value, the
3' end of each read is trimmed::
cutadapt -q 10 -o output.fastq input.fastq
By default, only the 3' end of each read is quality-trimmed. If you want to
trim the 5' end as well, use the ``-q`` option with two comma-separated cutoffs::
For Illumina reads, this is sufficient as their quality is high at the beginning,
but degrades towards the 3' end.
It is also possible to trim from the 5' end by specifying two
comma-separated cutoffs as *5' cutoff,3' cutoff*. For example, ::
cutadapt -q 15,10 -o output.fastq input.fastq
The 5' end will then be trimmed with a cutoff of 15, and the 3' end will be
trimmed with a cutoff of 10. If you only want to trim the 5' end, then use a
cutoff of 0 for the 3' end, as in ``-q 10,0``.
will quality-trim the 5' end with a cutoff of 15 and the 3' end with a cutoff
of 10. To only trim the 5' end, use a cutoff of 0 for the 3' end, as in
``-q 15,0``.
Quality trimming is done before any adapter trimming.
By default, quality values are assumed to be encoded as
ascii(phred quality + 33). Nowadays, this should always be the case.
Some old Illumina FASTQ files encode qualities as ascii(phred quality + 64).
For those, you must add ``--quality-base=64`` to the command line.
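For example, to quality-trim such a file with a cutoff of 10::

    cutadapt -q 10 --quality-base=64 -o output.fastq input.fastq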
A :ref:`description of the quality-trimming algorithm is also
available <quality-trimming-algorithm>`. The algorithm is the same as used by BWA.
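For the 3' end, the algorithm can be sketched in a few lines of Python. This is an
illustrative re-implementation, not the code Cutadapt actually runs (the real
implementation is compiled); function and variable names are made up here::

    def three_prime_quality_trim_index(quals, cutoff):
        # Walk in from the 3' end, tracking the running sum of
        # (cutoff - quality); cut where that sum is maximal.
        s = 0
        max_s = 0
        cut = len(quals)  # by default, keep the whole read
        for i in reversed(range(len(quals))):
            s += cutoff - quals[i]
            if s < 0:  # back in a high-quality region: stop searching
                break
            if s > max_s:
                max_s = s
                cut = i
        return cut  # keep quals[:cut]

    # For example: three_prime_quality_trim_index([30, 30, 30, 5, 4, 3], 10) == 3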
......@@ -955,11 +962,11 @@ Quality trimming of reads using two-color chemistry (NextSeq)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Some Illumina instruments use a two-color chemistry to encode the four bases.
This includes the NextSeq and the (at the time of this writing) recently
announced NovaSeq. In those instruments, a 'dark cycle' (with no detected color)
This includes the NextSeq and the NovaSeq. In those instruments, a
'dark cycle' (with no detected color)
encodes a ``G``. However, dark cycles also occur when sequencing "falls
off" the end of the fragment. The read then `contains a run of high-quality, but
incorrect ``G`` calls <https://sequencing.qcfail.com/articles/illumina-2-colour-chemistry-can-overcall-high-confidence-g-bases/>`_
incorrect “G” calls <https://sequencing.qcfail.com/articles/illumina-2-colour-chemistry-can-overcall-high-confidence-g-bases/>`_
at its 3' end.
Since the regular quality-trimming algorithm cannot deal with this situation,
......
......@@ -79,15 +79,19 @@ Model somehow all the flags that exist for semiglobal alignment. For start of th
Not degraded and no bases before allowed = anchored.
Degraded and bases before allowed = regular 5'
API
---
* https://github.com/marcelm/cutadapt/labels/API
Paired-end trimming
-------------------
* Could also use a paired-end read merger, then remove adapters with -a and -g
Available/used letters for command-line options
Available letters for command-line options
-----------------------------------------------
* Remaining characters: All uppercase letters except A, B, G, M, N, O, U
* Lowercase letters: i, j, k, s, w
* Lowercase letters: i, k, s, w
* Planned/reserved: Q (paired-end quality trimming)
......@@ -158,5 +158,5 @@ To run Cutadapt and see the version number, type ::
~/cutadapt-venv/bin/cutadapt --version
The reported version number will be something like ``2.2.dev5+gf564208``. This
means that you are now running the version of Cutadapt that will become 2.2, and that is contains
means that you are now running the version of Cutadapt that will become 2.2, and that it contains
5 changes (*commits*) since the previous release (2.1 in this case).
......@@ -102,7 +102,7 @@ setup(
package_dir={'': 'src'},
packages=find_packages('src'),
entry_points={'console_scripts': ['cutadapt = cutadapt.__main__:main']},
install_requires=['dnaio>=0.3', 'xopen>=0.7.3'],
install_requires=['dnaio~=0.3.0', 'xopen~=0.8.1'],
extras_require={
'dev': ['Cython', 'pytest', 'pytest-timeout', 'sphinx', 'sphinx_issues'],
},
......
......@@ -64,7 +64,7 @@ from xopen import xopen
import dnaio
from cutadapt import __version__
from cutadapt.adapters import AdapterParser
from cutadapt.parser import AdapterParser
from cutadapt.modifiers import (LengthTagModifier, SuffixRemover, PrefixSuffixAdder,
ZeroCapper, QualityTrimmer, UnconditionalCutter, NEndTrimmer, AdapterCutter,
PairedAdapterCutterError, PairedAdapterCutter, NextseqQualityTrimmer, Shortener)
......@@ -72,6 +72,7 @@ from cutadapt.report import full_report, minimal_report
from cutadapt.pipeline import (SingleEndPipeline, PairedEndPipeline, InputFiles, OutputFiles,
SerialPipelineRunner, ParallelPipelineRunner)
from cutadapt.utils import available_cpu_count
from cutadapt.log import setup_logging, REPORT
logger = logging.getLogger()
......@@ -107,50 +108,6 @@ class CommandLineError(Exception):
pass
# Custom log level, see setup_logging
REPORT = 25
class NiceFormatter(logging.Formatter):
"""
Do not prefix "INFO:" to info-level log messages (but do it for all other
levels).
Based on http://stackoverflow.com/a/9218261/715090 .
"""
def format(self, record):
if record.levelno not in (logging.INFO, REPORT):
record.msg = '{}: {}'.format(record.levelname, record.msg)
return super().format(record)
def setup_logging(stdout=False, minimal=False, quiet=False, debug=False):
"""
Attach handler to the global logger object
"""
# For --report=minimal, we need this custom log level because we want to
# print nothing except the minimal report and therefore cannot use the
# INFO level (and the ERROR level would give us an 'ERROR:' prefix).
logging.addLevelName(REPORT, 'REPORT')
# Due to backwards compatibility, logging output is sent to standard output
# instead of standard error if the -o option is used.
stream_handler = logging.StreamHandler(sys.stdout if stdout else sys.stderr)
stream_handler.setFormatter(NiceFormatter())
# debug overrides quiet overrides minimal
if debug:
level = logging.DEBUG
elif quiet:
level = logging.ERROR
elif minimal:
level = REPORT
else:
level = logging.INFO
stream_handler.setLevel(level)
logger.setLevel(level)
logger.addHandler(stream_handler)
def get_argument_parser():
parser = CutadaptArgumentParser(usage=__doc__, add_help=False)
group = parser.add_argument_group("Options")
......@@ -171,7 +128,7 @@ def get_argument_parser():
group.add_argument("--buffer-size", type=int, default=4000000,
help=SUPPRESS)
# Compression level for gzipped output files. Not exposed since we have -Z
group.add_argument("--compression-level", default=6,
group.add_argument("--compression-level", type=int, default=6,
help=SUPPRESS)
# Deprecated: The input format is always auto-detected
group.add_argument("-f", "--format", help=SUPPRESS)
......@@ -588,15 +545,7 @@ def input_files_from_parsed_args(inputs, paired, interleaved):
return input_filename, input_paired_filename
def pipeline_from_parsed_args(args, paired, is_interleaved_output):
"""
Setup a processing pipeline from parsed command-line arguments.
If there are any problems parsing the arguments, a CommandLineError is thrown.
Return an instance of Pipeline (SingleEndPipeline or PairedEndPipeline)
"""
def check_arguments(args, paired, is_interleaved_output):
if not paired:
if args.untrimmed_paired_output:
raise CommandLineError("Option --untrimmed-paired-output can only be used when "
......@@ -631,6 +580,17 @@ def pipeline_from_parsed_args(args, paired, is_interleaved_output):
raise CommandLineError("The overlap must be at least 1.")
if not (0 <= args.gc_content <= 100):
raise CommandLineError("GC content must be given as percentage between 0 and 100")
def pipeline_from_parsed_args(args, paired, is_interleaved_output):
"""
Setup a processing pipeline from parsed command-line arguments.
If there are any problems parsing the arguments, a CommandLineError is raised.
Return an instance of Pipeline (SingleEndPipeline or PairedEndPipeline)
"""
check_arguments(args, paired, is_interleaved_output)
if args.action == 'none':
args.action = None
......@@ -780,7 +740,7 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
# Setup logging only if there are not already any handlers (can happen when
# this function is being called externally such as from unit tests)
if not logging.root.handlers:
setup_logging(stdout=log_to_stdout,
setup_logging(logger, stdout=log_to_stdout,
quiet=args.quiet, minimal=args.report == 'minimal', debug=args.debug)
if args.profile:
import cProfile
......@@ -820,13 +780,10 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
runner_class = ParallelPipelineRunner
runner_kwargs = dict(n_workers=cores, buffer_size=args.buffer_size)
else:
logger.error('Running in parallel is currently not supported for '
parser.error('Running in parallel is currently not supported for '
'the given combination of command-line parameters.\nThese '
'options are not supported: --info-file, --rest-file, '
'--wildcard-file, --untrimmed-output, '
'--untrimmed-paired-output, --too-short-output, '
'--too-short-paired-output, --too-long-output, '
'--too-long-paired-output, --format\n'
'--wildcard-file, --format\n'
'Also, demultiplexing is not supported.\n'
'Omit --cores/-j to continue.')
return # avoid IDE warnings below
......
/* Generated by Cython 0.29.12 */
/* Generated by Cython 0.29.13 */
 
/* BEGIN: Cython Metadata
{
......@@ -20,8 +20,8 @@ END: Cython Metadata */
#elif PY_VERSION_HEX < 0x02060000 || (0x03000000 <= PY_VERSION_HEX && PY_VERSION_HEX < 0x03030000)
#error Cython requires Python 2.6+ or Python 3.3+.
#else
#define CYTHON_ABI "0_29_12"
#define CYTHON_HEX_VERSION 0x001D0CF0
#define CYTHON_ABI "0_29_13"
#define CYTHON_HEX_VERSION 0x001D0DF0
#define CYTHON_FUTURE_DIVISION 1
#include <stddef.h>
#ifndef offsetof
......
# coding: utf-8
# file generated by setuptools_scm
# don't change, don't track in version control
version = '2.4'
version = '2.5'
......@@ -4,14 +4,11 @@ Adapter finding and trimming classes
The ...Adapter classes are responsible for finding adapters.
The ...Match classes trim the reads.
"""
import re
import logging
from enum import Enum
from collections import defaultdict
from dnaio.readers import FastaReader
from cutadapt import align
from .align import hamming_environment
from . import align
logger = logging.getLogger()
......@@ -43,408 +40,6 @@ WHERE_TO_REMOVE_MAP = {
}
class AdapterSpecification:
"""
Description of a single adapter. This represents the same information that would be
given on the commandline through an option such as -a, but does not handle linked
adapters or the ``file:`` syntax.
These are the attributes:
- name (None or str)
- restriction (None, 'anchored', or 'noninternal')
- sequence (nucleotide sequence as string)
- parameters (dict with extra parameters such as 'max_error_rate', 'min_overlap')
- cmdline_type ('front' for -a, 'back' for -g and 'anywhere' for -b)
>>> AdapterSpecification.parse('a_name=ACGT;anywhere', 'back')
AdapterSpecification(name='a_name', restriction=None, sequence='ACGT', parameters={'anywhere': True}, cmdline_type='back')
"""
def __init__(self, name, restriction, sequence, parameters, cmdline_type):
self.name = name
self.restriction = restriction
self.sequence = sequence
self.parameters = parameters
self.cmdline_type = cmdline_type
@classmethod
def parse(cls, spec: str, cmdline_type: str):
"""Factory for creating an instance from a string specification"""
name, restriction, sequence, parameters = cls._parse(spec, cmdline_type)
return cls(name, restriction, sequence, parameters, cmdline_type)
def __repr__(self):
return '{}(name={!r}, restriction={!r}, sequence={!r}, parameters={!r}, cmdline_type={!r})'.format(
self.__class__.__name__, self.name, self.restriction, self.sequence, self.parameters, self.cmdline_type)
def __eq__(self, other):
return (
self.name == other.name
and self.restriction == other.restriction
and self.sequence == other.sequence
and self.parameters == other.parameters
and self.cmdline_type == other.cmdline_type
)
@staticmethod
def expand_braces(sequence):
"""
Replace all occurrences of ``x{n}`` (where x is any character) with n
occurrences of x. Raise ValueError if the expression cannot be parsed.
>>> AdapterSpecification.expand_braces('TGA{5}CT')
'TGAAAAACT'
"""
# Simple DFA with four states, encoded in prev
result = ''
prev = None
for s in re.split('([{}])', sequence):
if s == '':
continue
if prev is None:
if s == '{':
raise ValueError('"{" must be used after a character')
if s == '}':
raise ValueError('"}" cannot be used here')
prev = s
result += s
elif prev == '{':
prev = int(s)
if not 0 <= prev <= 10000:
raise ValueError('Value {} invalid'.format(prev))
elif isinstance(prev, int):
if s != '}':
raise ValueError('"}" expected')
result = result[:-1] + result[-1] * prev
prev = None
else:
if s != '{':
raise ValueError('Expected "{"')
prev = '{'
# Check if we are in a non-terminating state
if isinstance(prev, int) or prev == '{':
raise ValueError("Unterminated expression")
return result
@staticmethod
def _extract_name(spec):
"""
Parse an adapter specification given as 'name=adapt' into 'name' and 'adapt'.
"""
fields = spec.split('=', 1)
if len(fields) > 1:
name, spec = fields
name = name.strip()
else:
name = None
spec = spec.strip()
return name, spec
allowed_parameters = {
# abbreviations
'e': 'max_error_rate',
'error_rate': 'max_error_rate',
'o': 'min_overlap',
# allowed parameters
'max_error_rate': None,
'min_overlap': None,
'anywhere': None,
'required': None,
'optional': None, # If this is specified, 'required' will be set to False
}
@classmethod
def _parse_parameters(cls, spec):
"""Parse key=value;key=value;key=value into a dict"""
fields = spec.split(';')
result = dict()
for field in fields:
field = field.strip()
if not field:
continue
key, equals, value = field.partition('=')
if equals == '=' and value == '':
raise ValueError('No value given')
key = key.strip()
if key not in cls.allowed_parameters:
raise KeyError('Unknown parameter {}'.format(key))
# unabbreviate
while cls.allowed_parameters[key] is not None:
key = cls.allowed_parameters[key]
value = value.strip()
if value == '':
value = True
else:
try:
value = int(value)
except ValueError:
value = float(value)
if key in result:
raise KeyError('Key {} specified twice'.format(key))
result[key] = value
if 'optional' in result and 'required' in result:
raise ValueError("'optional' and 'required' cannot be specified at the same time")
if 'optional' in result:
result['required'] = False
del result['optional']
return result
@classmethod
def _parse(cls, spec, cmdline_type):
"""
Parse an adapter specification for a non-linked adapter (without '...')
Allow:
'back' and ADAPTER
'back' and ADAPTERX
'back' and ADAPTER$
'front' and ADAPTER
'front' and XADAPTER
'front' and ^ADAPTER
'anywhere' and ADAPTER
"""
if cmdline_type not in ("front", "back", "anywhere"):
raise ValueError("cmdline_type must be front, back or anywhere")
error = ValueError(
"You cannot use multiple placement restrictions for an adapter at the same time. "
"Choose one of ^ADAPTER, ADAPTER$, XADAPTER or ADAPTERX")
spec, middle, parameters_spec = spec.partition(';')
name, spec = cls._extract_name(spec)
spec = spec.strip()
parameters = cls._parse_parameters(parameters_spec)
spec = AdapterSpecification.expand_braces(spec)
# Special case for adapters consisting of only X characters:
# This needs to be supported for backwards-compatibility
if len(spec.strip('X')) == 0:
return name, None, spec, {}
front_restriction = None
if spec.startswith('^'):
front_restriction = 'anchored'
spec = spec[1:]
if spec.upper().startswith('X'):
if front_restriction is not None:
raise error
front_restriction = 'noninternal'
spec = spec.lstrip('xX')
back_restriction = None
if spec.endswith('$'):
back_restriction = 'anchored'
spec = spec[:-1]
if spec.upper().endswith('X'):
if back_restriction is not None:
raise error
back_restriction = 'noninternal'
spec = spec.rstrip('xX')
n_placement_restrictions = int(bool(front_restriction)) + int(bool(back_restriction))
if n_placement_restrictions > 1:
raise error
if cmdline_type == 'front' and back_restriction:
raise ValueError(
"Allowed placement restrictions for a 5' adapter are XADAPTER and ^ADAPTER")
if cmdline_type == 'back' and front_restriction:
raise ValueError(
"Allowed placement restrictions for a 3' adapter are ADAPTERX and ADAPTER$")
assert front_restriction is None or back_restriction is None
if front_restriction is not None:
restriction = front_restriction
else:
restriction = back_restriction
if cmdline_type == 'anywhere' and restriction is not None:
raise ValueError(
"Placement restrictions (with X, ^, $) not supported for 'anywhere' (-b) adapters")
return name, restriction, spec, parameters
@staticmethod
def _restriction_to_where(cmdline_type, restriction):
if cmdline_type == 'front':
if restriction is None:
return Where.FRONT
elif restriction == 'anchored':
return Where.PREFIX
elif restriction == 'noninternal':
return Where.FRONT_NOT_INTERNAL
else:
raise ValueError(
'Value {} for a front restriction not allowed'.format(restriction))
elif cmdline_type == 'back':
if restriction is None:
return Where.BACK
elif restriction == 'anchored':
return Where.SUFFIX
elif restriction == 'noninternal':
return Where.BACK_NOT_INTERNAL
else:
raise ValueError(
'Value {} for a back restriction not allowed'.format(restriction))
else:
assert cmdline_type == 'anywhere'
if restriction is None:
return Where.ANYWHERE
else:
raise ValueError('No placement may be specified for "anywhere" adapters')
def where(self):
return self._restriction_to_where(self.cmdline_type, self.restriction)
class AdapterParser:
"""
Factory for Adapter classes that all use the same default parameters (error rate,
indels etc.). The given **kwargs will be passed to the Adapter constructors.
"""
def __init__(self, **kwargs):
# kwargs: max_error_rate, min_overlap, read_wildcards, adapter_wildcards, indels
self.default_parameters = kwargs
def _parse(self, spec: str, cmdline_type='back', name=None):
"""
Parse an adapter specification not using ``file:`` notation and return
an object of an appropriate Adapter class.
name -- Adapter name if not included as part of the spec. (If spec is
'name=ADAPTER', name will be 'name'.)
cmdline_type -- describes which commandline parameter was used (``-a``
is 'back', ``-b`` is 'anywhere', and ``-g`` is 'front').
"""
if cmdline_type not in ('front', 'back', 'anywhere'):
raise ValueError('cmdline_type cannot be {!r}'.format(cmdline_type))
spec1, middle, spec2 = spec.partition('...')
del spec
# Handle linked adapter
if middle == '...' and spec1 and spec2:
return self._parse_linked(spec1, spec2, name, cmdline_type)
elif middle == '...':
if not spec1:
if cmdline_type == 'back': # -a ...ADAPTER
spec = spec2
else: # -g ...ADAPTER
raise ValueError('Invalid adapter specification')
elif not spec2:
if cmdline_type == 'back': # -a ADAPTER...
cmdline_type = 'front'
spec = spec1
else: # -g ADAPTER...
spec = spec1
else:
assert False, 'This should not happen'
else:
spec = spec1
spec = AdapterSpecification.parse(spec, cmdline_type)
where = spec.where()
if not name:
name = spec.name
if spec.parameters.pop('anywhere', False):
spec.parameters['remove'] = WHERE_TO_REMOVE_MAP[where]
where = Where.ANYWHERE
parameters = self.default_parameters.copy()
parameters.update(spec.parameters)
if where in (Where.FRONT, Where.BACK):
adapter_class = BackOrFrontAdapter
else:
adapter_class = Adapter
return adapter_class(sequence=spec.sequence, where=where, name=name, **parameters)
def _parse_linked(self, spec1: str, spec2: str, name, cmdline_type):
"""Return a linked adapter from two specification strings"""
if cmdline_type == 'anywhere':
raise ValueError("'anywhere' (-b) adapters may not be linked")
front_spec = AdapterSpecification.parse(spec1, 'front')
back_spec = AdapterSpecification.parse(spec2, 'back')
if not name:
name = front_spec.name
if cmdline_type == 'back' and front_spec.restriction is None:
import textwrap
logger.warning('\n'.join(textwrap.wrap(
"You specified a linked adapter as '-a ADAPTER1...ADAPTER2'. "
"The interpretation of what this means has changed in Cutadapt 2.0. "
"(The 5' adapter is now no longer anchored by default.) "
"To get results consist with the old behavior, you need to anchor "
"the 5' adapter explicitly as in '-a ^ADAPTER1...ADAPTER2'."
)))
front_anchored = front_spec.restriction is not None
back_anchored = back_spec.restriction is not None
front_parameters = self.default_parameters.copy()
front_parameters.update(front_spec.parameters)
back_parameters = self.default_parameters.copy()
back_parameters.update(back_spec.parameters)
if cmdline_type == 'front':
# -g requires both adapters to be present
front_required = True
back_required = True
else:
# -a requires only the anchored adapters to be present
front_required = front_anchored
back_required = back_anchored
# Handle parameters overriding whether an adapter is required
front_required = front_spec.parameters.pop('required', front_required)
back_required = back_spec.parameters.pop('required', back_required)
front_adapter = Adapter(front_spec.sequence, where=front_spec.where(), name=None,
**front_spec.parameters)
back_adapter = Adapter(back_spec.sequence, where=back_spec.where(), name=None,
**back_spec.parameters)
return LinkedAdapter(
front_adapter=front_adapter,
back_adapter=back_adapter,
front_required=front_required,
back_required=back_required,
name=name,
)
def parse(self, spec, cmdline_type='back'):
"""
Parse an adapter specification and yield appropriate Adapter classes.
This works like the _parse_no_file() function above, but also supports the
``file:`` notation for reading adapters from an external FASTA
file. Since a file can contain multiple adapters, this
function is a generator.
"""
if spec.startswith('file:'):
# read adapter sequences from a file
with FastaReader(spec[5:]) as fasta:
for record in fasta:
name = record.name.split(None, 1)
name = name[0] if name else None
yield self._parse(record.sequence, cmdline_type, name=name)
else:
yield self._parse(spec, cmdline_type, name=None)
def parse_multi(self, back, anywhere, front):
"""
Parse all three types of commandline options that can be used to
specify adapters. back, anywhere and front are lists of strings,
corresponding to the respective commandline types (-a, -b, -g).
Return a list of appropriate Adapter classes.
"""
adapters = []
for specs, cmdline_type in (back, 'back'), (anywhere, 'anywhere'), (front, 'front'):
for spec in specs:
adapters.extend(self.parse(spec, cmdline_type))
return adapters
def returns_defaultdict_int():
# We need this function to make EndStatistics picklable.
# Even a @staticmethod of EndStatistics is not sufficient
......@@ -1016,7 +611,7 @@ class MultiAdapter:
for adapter in self._adapters:
sequence = adapter.sequence
k = int(adapter.max_error_rate * len(sequence))
for s, errors, matches in hamming_environment(sequence, k):
for s, errors, matches in align.hamming_environment(sequence, k):
if s in index:
other_adapter, other_errors, other_matches = index[s]
if matches < other_matches:
......
......@@ -13,15 +13,33 @@ filters is created and each redirector is called in turn until one returns True.
The read is then assumed to have been "consumed", that is, either written
somewhere or filtered (should be discarded).
"""
from abc import ABC, abstractmethod
import errno
import dnaio
from .utils import raise_open_files_limit
# Constants used when returning from a Filter’s __call__ method to improve
# readability (it is unintuitive that "return True" means "discard the read").
DISCARD = True
KEEP = False
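# Illustration (not part of this module): the pipeline applies its filters in
# sequence; a filter that returns DISCARD has "consumed" the read (written it
# somewhere or discarded it), so the remaining filters are skipped.
# `filters`, `read` and `matches` are hypothetical names here.
#
# for f in filters:
#     if f(read, matches) == DISCARD:
#         break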
class NoFilter:
class SingleEndFilter(ABC):
@abstractmethod
def __call__(self, read, matches):
pass
class PairedEndFilter(ABC):
@abstractmethod
def __call__(self, read1, matches1, read2, matches2):
pass
class NoFilter(SingleEndFilter):
"""
No filtering, just send each read to the given writer.
"""
......@@ -41,7 +59,7 @@ class NoFilter:
return DISCARD
class PairedNoFilter:
class PairedNoFilter(PairedEndFilter):
"""
No filtering, just send each paired-end read to the given writer.
"""
......@@ -62,11 +80,11 @@ class PairedNoFilter:
return DISCARD
class Redirector:
class Redirector(SingleEndFilter):
"""
Redirect discarded reads to the given writer. This is for single-end reads.
"""
def __init__(self, writer, filter, filter2=None):
def __init__(self, writer, filter: SingleEndFilter, filter2=None):
# TODO filter2 should really not be here
self.filtered = 0
self.writer = writer
......@@ -85,7 +103,7 @@ class Redirector:
return KEEP
class PairedRedirector:
class PairedRedirector(PairedEndFilter):
"""
Redirect paired-end reads matching a filtering criterion to a writer.
Different filtering styles are supported, differing by which of the
......@@ -141,7 +159,7 @@ class PairedRedirector:
return KEEP
class TooShortReadFilter:
class TooShortReadFilter(SingleEndFilter):
def __init__(self, minimum_length):
self.minimum_length = minimum_length
......@@ -149,7 +167,7 @@ class TooShortReadFilter:
return len(read) < self.minimum_length
class TooLongReadFilter:
class TooLongReadFilter(SingleEndFilter):
def __init__(self, maximum_length):
self.maximum_length = maximum_length
......@@ -157,7 +175,7 @@ class TooLongReadFilter:
return len(read) > self.maximum_length
class NContentFilter:
class NContentFilter(SingleEndFilter):
"""
Discards a read that has a number of 'N's over a given threshold. It handles both raw counts
of Ns as well as proportions. Note that for raw counts, it is a 'greater than' comparison,
......@@ -183,7 +201,7 @@ class NContentFilter:
return n_count > self.cutoff
class DiscardUntrimmedFilter:
class DiscardUntrimmedFilter(SingleEndFilter):
"""
Return True if read is untrimmed.
"""
......@@ -191,7 +209,7 @@ class DiscardUntrimmedFilter:
return not matches
class DiscardTrimmedFilter:
class DiscardTrimmedFilter(SingleEndFilter):
"""
Return True if read is trimmed.
"""
......@@ -199,7 +217,7 @@ class DiscardTrimmedFilter:
return bool(matches)
class CasavaFilter:
class CasavaFilter(SingleEndFilter):
"""
Remove reads that fail the CASAVA filter. These have header lines that
look like ``xxxx x:Y:x:x`` (with a ``Y``). Reads that pass the filter
......@@ -212,7 +230,23 @@ class CasavaFilter:
return right[1:4] == ':Y:' # discard if :Y: found
class Demultiplexer:
def _open_raise_limit(path, qualities):
"""
Open a FASTA/FASTQ file for writing. If it fails because the number of open files
would be exceeded, try to raise the soft limit and re-try.
"""
try:
f = dnaio.open(path, mode="w", qualities=qualities)
except OSError as e:
if e.errno == errno.EMFILE: # Too many open files
raise_open_files_limit(8)
f = dnaio.open(path, mode="w", qualities=qualities)
else:
raise
return f
class Demultiplexer(SingleEndFilter):
"""
Demultiplex trimmed reads. Reads are written to different output files
depending on which adapter matches. Files are created when the first read
......@@ -241,15 +275,15 @@ class Demultiplexer:
if matches:
name = matches[-1].adapter.name
if name not in self.writers:
self.writers[name] = dnaio.open(self.template.replace('{name}', name),
mode='w', qualities=self.qualities)
self.writers[name] = _open_raise_limit(
self.template.replace('{name}', name), self.qualities)
self.written += 1
self.written_bp[0] += len(read)
self.writers[name].write(read)
else:
if self.untrimmed_writer is None and self.untrimmed_path is not None:
self.untrimmed_writer = dnaio.open(self.untrimmed_path,
mode='w', qualities=self.qualities)
self.untrimmed_writer = _open_raise_limit(
self.untrimmed_path, self.qualities)
if self.untrimmed_writer is not None:
self.written += 1
self.written_bp[0] += len(read)
......@@ -263,7 +297,7 @@ class Demultiplexer:
self.untrimmed_writer.close()
class PairedDemultiplexer:
class PairedDemultiplexer(PairedEndFilter):
"""
Demultiplex trimmed paired-end reads. Reads are written to different output files
depending on which adapter (in read 1) matches.
......@@ -298,7 +332,7 @@ class PairedDemultiplexer:
self._demultiplexer2.close()
class CombinatorialDemultiplexer:
class CombinatorialDemultiplexer(PairedEndFilter):
"""
Demultiplex reads depending on which adapter matches, taking into account both matches
on R1 and R2.
......@@ -331,7 +365,6 @@ class CombinatorialDemultiplexer:
Write the read to the proper output file according to the most recent matches both on
R1 and R2
"""
# import ipdb; ipdb.set_trace()
assert read2 is not None
name1 = matches1[-1].adapter.name if matches1 else None
name2 = matches2[-1].adapter.name if matches2 else None
......@@ -346,8 +379,8 @@ class CombinatorialDemultiplexer:
path1 = self._make_path(self.template, name1, name2)
path2 = self._make_path(self.paired_template, name1, name2)
self.writers[key] = (
dnaio.open(path1, mode='w', qualities=self.qualities),
dnaio.open(path2, mode='w', qualities=self.qualities),
_open_raise_limit(path1, qualities=self.qualities),
_open_raise_limit(path2, qualities=self.qualities),
)
writer1, writer2 = self.writers[key]
self.written += 1
......@@ -363,7 +396,7 @@ class CombinatorialDemultiplexer:
w2.close()
class RestFileWriter:
class RestFileWriter(SingleEndFilter):
def __init__(self, file):
self.file = file
......@@ -375,7 +408,7 @@ class RestFileWriter:
return KEEP
class WildcardFileWriter:
class WildcardFileWriter(SingleEndFilter):
def __init__(self, file):
self.file = file
......@@ -385,7 +418,7 @@ class WildcardFileWriter:
return KEEP
class InfoFileWriter:
class InfoFileWriter(SingleEndFilter):
def __init__(self, file):
self.file = file
......
import sys
import logging
# Custom log level
REPORT = 25
class NiceFormatter(logging.Formatter):
"""
Do not prefix "INFO:" to info-level log messages (but do it for all other
levels).
Based on http://stackoverflow.com/a/9218261/715090 .
"""
def format(self, record):
if record.levelno not in (logging.INFO, REPORT):
record.msg = '{}: {}'.format(record.levelname, record.msg)
return super().format(record)
def setup_logging(logger, stdout=False, minimal=False, quiet=False, debug=False):
"""
Attach handler to the global logger object
"""
# For --report=minimal, we need this custom log level because we want to
# print nothing except the minimal report and therefore cannot use the
# INFO level (and the ERROR level would give us an 'ERROR:' prefix).
logging.addLevelName(REPORT, 'REPORT')
# Due to backwards compatibility, logging output is sent to standard output
# instead of standard error if the -o option is used.
stream_handler = logging.StreamHandler(sys.stdout if stdout else sys.stderr)
stream_handler.setFormatter(NiceFormatter())
# debug overrides quiet overrides minimal
if debug:
level = logging.DEBUG
elif quiet:
level = logging.ERROR
elif minimal:
level = REPORT
else:
level = logging.INFO
stream_handler.setLevel(level)
logger.setLevel(level)
logger.addHandler(stream_handler)
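# Example (illustrative): attach a handler to the root logger so that only
# the minimal report is printed, much as cutadapt.__main__.main() does:
#
# import logging
# setup_logging(logging.getLogger(), minimal=True)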
"""
Parse adapter specifications
"""
import re
import logging
from dnaio.readers import FastaReader
from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, BackOrFrontAdapter, LinkedAdapter
logger = logging.getLogger(__name__)
class AdapterSpecification:
"""
Description of a single non-linked adapter.
These are the attributes:
- name (None or str)
- restriction (None, 'anchored', or 'noninternal')
- sequence (nucleotide sequence as string)
- parameters (dict with extra parameters such as 'max_error_rate', 'min_overlap')
- cmdline_type ('front' for -a, 'back' for -g and 'anywhere' for -b)
>>> AdapterSpecification.parse('a_name=ACGT;anywhere', 'back')
AdapterSpecification(name='a_name', restriction=None, sequence='ACGT', parameters={'anywhere': True}, cmdline_type='back')
"""
def __init__(self, name, restriction, sequence, parameters, cmdline_type):
assert restriction in (None, "anchored", "noninternal")
assert cmdline_type in ("front", "back", "anywhere")
self.name = name
self.restriction = restriction
self.sequence = sequence
self.parameters = parameters
self.cmdline_type = cmdline_type
@classmethod
def parse(cls, spec: str, cmdline_type: str):
"""Factory for creating an instance from a string specification"""
name, restriction, sequence, parameters = cls._parse(spec, cmdline_type)
return cls(name, restriction, sequence, parameters, cmdline_type)
def __repr__(self):
return '{}(name={!r}, restriction={!r}, sequence={!r}, parameters={!r}, cmdline_type={!r})'.format(
self.__class__.__name__, self.name, self.restriction, self.sequence, self.parameters, self.cmdline_type)
def __eq__(self, other):
return (
self.name == other.name
and self.restriction == other.restriction
and self.sequence == other.sequence
and self.parameters == other.parameters
and self.cmdline_type == other.cmdline_type
)
@staticmethod
def expand_braces(sequence):
"""
Replace all occurrences of ``x{n}`` (where x is any character) with n
occurrences of x. Raise ValueError if the expression cannot be parsed.
>>> AdapterSpecification.expand_braces('TGA{5}CT')
'TGAAAAACT'
"""
# Simple DFA with four states, encoded in prev
result = ''
prev = None
for s in re.split('([{}])', sequence):
if s == '':
continue
if prev is None:
if s == '{':
raise ValueError('"{" must be used after a character')
if s == '}':
raise ValueError('"}" cannot be used here')
prev = s
result += s
elif prev == '{':
prev = int(s)
if not 0 <= prev <= 10000:
raise ValueError('Value {} invalid'.format(prev))
elif isinstance(prev, int):
if s != '}':
raise ValueError('"}" expected')
result = result[:-1] + result[-1] * prev
prev = None
else:
if s != '{':
raise ValueError('Expected "{"')
prev = '{'
# Check if we are in a non-terminating state
if isinstance(prev, int) or prev == '{':
raise ValueError("Unterminated expression")
return result
@staticmethod
def _extract_name(spec):
"""
Parse an adapter specification given as 'name=adapt' into 'name' and 'adapt'.
"""
fields = spec.split('=', 1)
if len(fields) > 1:
name, spec = fields
name = name.strip()
else:
name = None
spec = spec.strip()
return name, spec
allowed_parameters = {
# abbreviations
'e': 'max_error_rate',
'error_rate': 'max_error_rate',
'o': 'min_overlap',
# allowed parameters
'max_error_rate': None,
'min_overlap': None,
'anywhere': None,
'required': None,
'optional': None, # If this is specified, 'required' will be set to False
}
@classmethod
def _parse_parameters(cls, spec):
"""Parse key=value;key=value;key=value into a dict"""
fields = spec.split(';')
result = dict()
for field in fields:
field = field.strip()
if not field:
continue
key, equals, value = field.partition('=')
if equals == '=' and value == '':
raise ValueError('No value given')
key = key.strip()
if key not in cls.allowed_parameters:
raise KeyError('Unknown parameter {}'.format(key))
# unabbreviate
while cls.allowed_parameters[key] is not None:
key = cls.allowed_parameters[key]
value = value.strip()
if value == '':
value = True
else:
try:
value = int(value)
except ValueError:
value = float(value)
if key in result:
raise KeyError('Key {} specified twice'.format(key))
result[key] = value
if 'optional' in result and 'required' in result:
raise ValueError("'optional' and 'required' cannot be specified at the same time")
if 'optional' in result:
result['required'] = False
del result['optional']
return result
@classmethod
def _parse(cls, spec, cmdline_type):
"""
Parse an adapter specification for a non-linked adapter (without '...')
Allow:
'back' and ADAPTER
'back' and ADAPTERX
'back' and ADAPTER$
'front' and ADAPTER
'front' and XADAPTER
'front' and ^ADAPTER
'anywhere' and ADAPTER
"""
if cmdline_type not in ("front", "back", "anywhere"):
raise ValueError("cmdline_type must be front, back or anywhere")
error = ValueError(
"You cannot use multiple placement restrictions for an adapter at the same time. "
"Choose one of ^ADAPTER, ADAPTER$, XADAPTER or ADAPTERX")
spec, middle, parameters_spec = spec.partition(';')
name, spec = cls._extract_name(spec)
spec = spec.strip()
parameters = cls._parse_parameters(parameters_spec)
spec = AdapterSpecification.expand_braces(spec)
# Special case for adapters consisting of only X characters:
# This needs to be supported for backwards-compatibility
if len(spec.strip('X')) == 0:
return name, None, spec, {}
front_restriction = None
if spec.startswith('^'):
front_restriction = 'anchored'
spec = spec[1:]
if spec.upper().startswith('X'):
if front_restriction is not None:
raise error
front_restriction = 'noninternal'
spec = spec.lstrip('xX')
back_restriction = None
if spec.endswith('$'):
back_restriction = 'anchored'
spec = spec[:-1]
if spec.upper().endswith('X'):
if back_restriction is not None:
raise error
back_restriction = 'noninternal'
spec = spec.rstrip('xX')
n_placement_restrictions = int(bool(front_restriction)) + int(bool(back_restriction))
if n_placement_restrictions > 1:
raise error
if cmdline_type == 'front' and back_restriction:
raise ValueError(
"Allowed placement restrictions for a 5' adapter are XADAPTER and ^ADAPTER")
if cmdline_type == 'back' and front_restriction:
raise ValueError(
"Allowed placement restrictions for a 3' adapter are ADAPTERX and ADAPTER$")
assert front_restriction is None or back_restriction is None
if front_restriction is not None:
restriction = front_restriction
else:
restriction = back_restriction
if cmdline_type == 'anywhere' and restriction is not None:
raise ValueError(
"Placement restrictions (with X, ^, $) not supported for 'anywhere' (-b) adapters")
return name, restriction, spec, parameters
@staticmethod
def _restriction_to_where(cmdline_type, restriction):
if cmdline_type == 'front':
if restriction is None:
return Where.FRONT
elif restriction == 'anchored':
return Where.PREFIX
elif restriction == 'noninternal':
return Where.FRONT_NOT_INTERNAL
else:
raise ValueError(
'Value {} for a front restriction not allowed'.format(restriction))
elif cmdline_type == 'back':
if restriction is None:
return Where.BACK
elif restriction == 'anchored':
return Where.SUFFIX
elif restriction == 'noninternal':
return Where.BACK_NOT_INTERNAL
else:
raise ValueError(
'Value {} for a back restriction not allowed'.format(restriction))
else:
assert cmdline_type == 'anywhere'
if restriction is None:
return Where.ANYWHERE
else:
raise ValueError('No placement may be specified for "anywhere" adapters')
def where(self):
return self._restriction_to_where(self.cmdline_type, self.restriction)
class AdapterParser:
"""
Factory for Adapter classes that all use the same default parameters (error rate,
indels etc.). The given **kwargs will be passed to the Adapter constructors.
"""
def __init__(self, **kwargs):
# kwargs: max_error_rate, min_overlap, read_wildcards, adapter_wildcards, indels
self.default_parameters = kwargs
def _parse(self, spec: str, cmdline_type='back', name=None):
"""
Parse an adapter specification not using ``file:`` notation and return
an object of an appropriate Adapter class.
name -- Adapter name if not included as part of the spec. (If spec is
'name=ADAPTER', name will be 'name'.)
cmdline_type -- describes which commandline parameter was used (``-a``
is 'back', ``-b`` is 'anywhere', and ``-g`` is 'front').
"""
if cmdline_type not in ('front', 'back', 'anywhere'):
raise ValueError('cmdline_type cannot be {!r}'.format(cmdline_type))
spec1, middle, spec2 = spec.partition('...')
if middle == '...' and spec1 and spec2:
return self._parse_linked(spec1, spec2, name, cmdline_type)
if middle == '...':
spec, cmdline_type = self._normalize_ellipsis(spec1, spec2, cmdline_type)
else:
spec = spec1
return self._parse_not_linked(spec, name, cmdline_type)
@staticmethod
def _normalize_ellipsis(spec1: str, spec2: str, cmdline_type):
if not spec1:
if cmdline_type == 'back':
# -a ...ADAPTER
spec = spec2
else:
# -g ...ADAPTER
raise ValueError('Invalid adapter specification')
elif not spec2:
if cmdline_type == 'back':
# -a ADAPTER...
cmdline_type = 'front'
spec = spec1
else:
# -g ADAPTER...
spec = spec1
else:
raise ValueError("Expected either spec1 or spec2")
return spec, cmdline_type
def _parse_not_linked(self, spec: str, name, cmdline_type):
spec = AdapterSpecification.parse(spec, cmdline_type)
where = spec.where()
if not name:
name = spec.name
if spec.parameters.pop('anywhere', False):
spec.parameters['remove'] = WHERE_TO_REMOVE_MAP[where]
where = Where.ANYWHERE
parameters = self.default_parameters.copy()
parameters.update(spec.parameters)
if where in (Where.FRONT, Where.BACK):
adapter_class = BackOrFrontAdapter
else:
adapter_class = Adapter
return adapter_class(sequence=spec.sequence, where=where, name=name, **parameters)
def _parse_linked(self, spec1: str, spec2: str, name, cmdline_type):
"""Return a linked adapter from two specification strings"""
if cmdline_type == 'anywhere':
raise ValueError("'anywhere' (-b) adapters may not be linked")
front_spec = AdapterSpecification.parse(spec1, 'front')
back_spec = AdapterSpecification.parse(spec2, 'back')
if not name:
name = front_spec.name
if cmdline_type == 'back' and front_spec.restriction is None:
import textwrap
logger.warning('\n'.join(textwrap.wrap(
"You specified a linked adapter as '-a ADAPTER1...ADAPTER2'. "
"The interpretation of what this means has changed in Cutadapt 2.0. "
"(The 5' adapter is now no longer anchored by default.) "
"To get results consist with the old behavior, you need to anchor "
"the 5' adapter explicitly as in '-a ^ADAPTER1...ADAPTER2'."
)))
front_anchored = front_spec.restriction is not None
back_anchored = back_spec.restriction is not None
front_parameters = self.default_parameters.copy()
front_parameters.update(front_spec.parameters)
back_parameters = self.default_parameters.copy()
back_parameters.update(back_spec.parameters)
if cmdline_type == 'front':
# -g requires both adapters to be present
front_required = True
back_required = True
else:
# -a requires only the anchored adapters to be present
front_required = front_anchored
back_required = back_anchored
# Handle parameters overriding whether an adapter is required
front_required = front_parameters.pop('required', front_required)
back_required = back_parameters.pop('required', back_required)
front_adapter = Adapter(front_spec.sequence, where=front_spec.where(), name=None,
**front_parameters)
back_adapter = Adapter(back_spec.sequence, where=back_spec.where(), name=None,
**back_parameters)
return LinkedAdapter(
front_adapter=front_adapter,
back_adapter=back_adapter,
front_required=front_required,
back_required=back_required,
name=name,
)
def parse(self, spec, cmdline_type='back'):
"""
Parse an adapter specification and yield appropriate Adapter classes.
This works like the _parse_no_file() function above, but also supports the
``file:`` notation for reading adapters from an external FASTA
file. Since a file can contain multiple adapters, this
function is a generator.
"""
if spec.startswith('file:'):
# read adapter sequences from a file
with FastaReader(spec[5:]) as fasta:
for record in fasta:
name = record.name.split(None, 1)
name = name[0] if name else None
yield self._parse(record.sequence, cmdline_type, name=name)
else:
yield self._parse(spec, cmdline_type, name=None)
def parse_multi(self, back, anywhere, front):
"""
Parse all three types of commandline options that can be used to
specify adapters. back, anywhere and front are lists of strings,
corresponding to the respective commandline types (-a, -b, -g).
Return a list of appropriate Adapter classes.
"""
adapters = []
for specs, cmdline_type in (back, 'back'), (anywhere, 'anywhere'), (front, 'front'):
for spec in specs:
adapters.extend(self.parse(spec, cmdline_type))
return adapters
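# Example usage (illustrative; max_error_rate is one of the keyword arguments
# forwarded to the Adapter constructors):
#
# parser = AdapterParser(max_error_rate=0.1)
# adapters = list(parser.parse('ACGTACGT', cmdline_type='back'))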
......@@ -13,7 +13,7 @@ from xopen import xopen
import dnaio
from .utils import Progress
from .modifiers import PairedModifier, Modifier
from .modifiers import PairedModifier
from .report import Statistics
from .filters import (Redirector, PairedRedirector, NoFilter, PairedNoFilter, InfoFileWriter,
RestFileWriter, WildcardFileWriter, TooShortReadFilter, TooLongReadFilter, NContentFilter,
......@@ -35,7 +35,9 @@ class OutputFiles:
The attributes are open file-like objects except when demultiplex is True. In that case,
untrimmed, untrimmed2, out and out2 are file names or templates
as required by the demultiplexer in use ('{name}' etc.).
If interleaved is True, then out is written interleaved.
Files may also be None.
"""
# TODO interleaving for the other file pairs (too_short, too_long, untrimmed)?
......@@ -94,7 +96,7 @@ class Pipeline(ABC):
def __init__(self):
self._close_files = []
self._reader = None
self._filters = []
self._filters = None
self._modifiers = []
self._outfiles = None
self._demultiplexer = None
......@@ -107,11 +109,12 @@ class Pipeline(ABC):
self.discard_trimmed = False
self.discard_untrimmed = False
def set_input(self, infiles: InputFiles):
def connect_io(self, infiles: InputFiles, outfiles: OutputFiles):
self._reader = dnaio.open(infiles.file1, file2=infiles.file2,
interleaved=infiles.interleaved, mode='r')
self._set_output(outfiles)
def _open_writer(self, file, file2, force_fasta=None, **kwargs):
def _open_writer(self, file, file2=None, force_fasta=None, **kwargs):
# TODO backwards-incompatible change (?) would be to use outfiles.interleaved
# for all outputs
if force_fasta:
......@@ -119,27 +122,27 @@ class Pipeline(ABC):
return dnaio.open(file, file2=file2, mode='w', qualities=self.uses_qualities,
**kwargs)
def set_output(self, outfiles: OutputFiles):
def _set_output(self, outfiles: OutputFiles):
self._filters = []
self._outfiles = outfiles
filter_wrapper = self._filter_wrapper()
if outfiles.rest:
self._filters.append(RestFileWriter(outfiles.rest))
if outfiles.info:
self._filters.append(InfoFileWriter(outfiles.info))
if outfiles.wildcard:
self._filters.append(WildcardFileWriter(outfiles.wildcard))
for filter_class, outfile in (
(RestFileWriter, outfiles.rest),
(InfoFileWriter, outfiles.info),
(WildcardFileWriter, outfiles.wildcard),
):
if outfile:
self._filters.append(filter_wrapper(None, filter_class(outfile), None))
# minimum length and maximum length
for lengths, file1, file2, filter_class in (
(self._minimum_length, outfiles.too_short, outfiles.too_short2, TooShortReadFilter),
(self._maximum_length, outfiles.too_long, outfiles.too_long2, TooLongReadFilter)
):
writer = None
if lengths is not None:
if file1:
writer = self._open_writer(file1, file2)
if lengths is None:
continue
writer = self._open_writer(file1, file2) if file1 else None
f1 = filter_class(lengths[0]) if lengths[0] is not None else None
if len(lengths) == 2 and lengths[1] is not None:
f2 = filter_class(lengths[1])
......@@ -251,7 +254,8 @@ class SingleEndPipeline(Pipeline):
return Redirector
def _final_filter(self, outfiles):
writer = self._open_writer(outfiles.out, outfiles.out2, force_fasta=outfiles.force_fasta)
assert outfiles.out2 is None
writer = self._open_writer(outfiles.out, force_fasta=outfiles.force_fasta)
return NoFilter(writer)
def _create_demultiplexer(self, outfiles):
......@@ -283,13 +287,10 @@ class PairedEndPipeline(Pipeline):
paired = True
def __init__(self, pair_filter_mode):
"""
Setting modify_first_read_only to True enables "legacy mode"
"""
super().__init__()
self._pair_filter_mode = pair_filter_mode
self._reader = None
# Whether to gnore pair_filter mode for discard-untrimmed filter
# Whether to ignore pair_filter mode for discard-untrimmed filter
self.override_untrimmed_pair_filter = False
def add(self, modifier1, modifier2):
......@@ -396,6 +397,15 @@ def reader_process(file, file2, connections, queue, buffer_size, stdin_fd):
and finally sends "poison pills" (the value -1) to all connections.
"""
def send_to_worker(chunk_index, chunk1, chunk2=None):
worker_index = queue.get()
connection = connections[worker_index]
connection.send(chunk_index)
connection.send_bytes(chunk1)
if chunk2 is not None:
connection.send_bytes(chunk2)
if stdin_fd != -1:
sys.stdin.close()
sys.stdin = os.fdopen(stdin_fd)
......@@ -404,19 +414,10 @@ def reader_process(file, file2, connections, queue, buffer_size, stdin_fd):
if file2:
with xopen(file2, 'rb') as f2:
for chunk_index, (chunk1, chunk2) in enumerate(dnaio.read_paired_chunks(f, f2, buffer_size)):
# Determine the worker that should get this chunk
worker_index = queue.get()
pipe = connections[worker_index]
pipe.send(chunk_index)
pipe.send_bytes(chunk1)
pipe.send_bytes(chunk2)
send_to_worker(chunk_index, chunk1, chunk2)
else:
for chunk_index, chunk in enumerate(dnaio.read_chunks(f, buffer_size)):
# Determine the worker that should get this chunk
worker_index = queue.get()
pipe = connections[worker_index]
pipe.send(chunk_index)
pipe.send_bytes(chunk)
send_to_worker(chunk_index, chunk)
# Send poison pills to all workers
for _ in range(len(connections)):
......@@ -437,13 +438,12 @@ class WorkerProcess(Process):
To notify the reader process that it wants data, it puts its own identifier into the
need_work_queue before attempting to read data from the read_pipe.
"""
def __init__(self, id_, pipeline, input_path1, input_path2,
def __init__(self, id_, pipeline, two_input_files,
interleaved_input, orig_outfiles, read_pipe, write_pipe, need_work_queue):
super().__init__()
self._id = id_
self._pipeline = pipeline
self._input_path1 = input_path1
self._input_path2 = input_path2
self._two_input_files = two_input_files
self._interleaved_input = interleaved_input
self._orig_outfiles = orig_outfiles
self._read_pipe = read_pipe
......@@ -466,45 +466,13 @@ class WorkerProcess(Process):
logger.error('%s', tb_str)
raise e
# Setting the .buffer.name attributes below is necessary because
# file format detection uses the file name
data = self._read_pipe.recv_bytes()
input = io.BytesIO(data)
input.name = self._input_path1
if self._input_path2:
data = self._read_pipe.recv_bytes()
input2 = io.BytesIO(data)
input2.name = self._input_path2
else:
input2 = None
output = io.BytesIO()
output.name = self._orig_outfiles.out.name
if self._orig_outfiles.out2 is not None:
output2 = io.BytesIO()
output2.name = self._orig_outfiles.out2.name
else:
output2 = None
infiles = InputFiles(input, input2, interleaved=self._interleaved_input)
outfiles = OutputFiles(out=output, out2=output2, interleaved=self._orig_outfiles.interleaved, force_fasta=self._orig_outfiles.force_fasta)
self._pipeline.set_input(infiles)
self._pipeline.set_output(outfiles)
infiles = self._make_input_files()
outfiles = self._make_output_files()
self._pipeline.connect_io(infiles, outfiles)
(n, bp1, bp2) = self._pipeline.process_reads()
cur_stats = Statistics().collect(n, bp1, bp2, [], self._pipeline._filters)
stats += cur_stats
output.flush()
processed_chunk = output.getvalue()
self._write_pipe.send(chunk_index)
self._write_pipe.send(n) # no. of reads processed in this chunk
self._write_pipe.send_bytes(processed_chunk)
if self._orig_outfiles.out2 is not None:
output2.flush()
processed_chunk2 = output2.getvalue()
self._write_pipe.send_bytes(processed_chunk2)
self._send_outfiles(outfiles, chunk_index, n)
m = self._pipeline._modifiers
modifier_stats = Statistics().collect(0, 0, 0 if self._pipeline.paired else None, m, [])
......@@ -515,6 +483,47 @@ class WorkerProcess(Process):
self._write_pipe.send(-2)
self._write_pipe.send((e, traceback.format_exc()))
def _make_input_files(self):
data = self._read_pipe.recv_bytes()
input = io.BytesIO(data)
if self._two_input_files:
data = self._read_pipe.recv_bytes()
input2 = io.BytesIO(data)
else:
input2 = None
return InputFiles(input, input2, interleaved=self._interleaved_input)
def _make_output_files(self):
"""
Using self._orig_outfiles as a template, make a new OutputFiles instance
that has BytesIO instances for each non-None output file
"""
output_files = copy.copy(self._orig_outfiles)
# TODO info, rest, wildcard need to be StringIO()
for attr in (
"out", "out2", "untrimmed", "untrimmed2", "too_short", "too_short2", "too_long",
"too_long2", "info", "rest", "wildcard"
):
orig_outfile = getattr(self._orig_outfiles, attr)
if orig_outfile is not None:
output = io.BytesIO()
output.name = orig_outfile.name
setattr(output_files, attr, output)
return output_files
def _send_outfiles(self, outfiles, chunk_index, n_reads):
self._write_pipe.send(chunk_index)
self._write_pipe.send(n_reads)
for f in outfiles:
if f is None:
continue
f.flush()
processed_chunk = f.getvalue()
self._write_pipe.send_bytes(processed_chunk)
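# Sketch of the receiving side: the parent process reads these values back in
# the same order and hands each chunk to an OrderedChunkWriter (one per output
# file), roughly:
#
# chunk_index = connection.recv()
# n_reads = connection.recv()
# for writer in writers:
#     writer.write(connection.recv_bytes(), chunk_index)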
class OrderedChunkWriter:
"""
......@@ -527,10 +536,10 @@ class OrderedChunkWriter:
self._current_index = 0
self._outfile = outfile
def write(self, data, chunk_index):
def write(self, data, index):
"""
"""
self._chunks[chunk_index] = data
self._chunks[index] = data
while self._current_index in self._chunks:
self._outfile.write(self._chunks[self._current_index])
del self._chunks[self._current_index]
......@@ -561,7 +570,7 @@ class ParallelPipelineRunner(PipelineRunner):
"""
Run a Pipeline in parallel
- When set_input() is called, a reader process is spawned.
- When connect_io() is called, a reader process is spawned.
- When run() is called, as many worker processes as requested are spawned.
- In the main process, results are written to the output files in the correct
order, and statistics are aggregated.
......@@ -592,8 +601,7 @@ class ParallelPipelineRunner(PipelineRunner):
self._pipes = [] # the workers read from these
self._reader_process = None
self._outfiles = None
self._input_path1 = None
self._input_path2 = None
self._two_input_files = None
self._interleaved_input = None
self._n_workers = n_workers
self._need_work_queue = Queue()
......@@ -603,9 +611,8 @@ class ParallelPipelineRunner(PipelineRunner):
def _assign_input(self, file1, file2=None, interleaved=False):
if self._reader_process is not None:
raise RuntimeError('Do not call set_input more than once')
self._input_path1 = file1 if type(file1) is str else file1.name
self._input_path2 = file2 if type(file2) is str or file2 is None else file2.name
raise RuntimeError('Do not call connect_io more than once')
self._two_input_files = file2 is not None
self._interleaved_input = interleaved
connections = [Pipe(duplex=False) for _ in range(self._n_workers)]
self._pipes, connw = zip(*connections)
......@@ -627,12 +634,6 @@ class ParallelPipelineRunner(PipelineRunner):
and outfiles.rest is None
and outfiles.info is None
and outfiles.wildcard is None
and outfiles.too_short is None
and outfiles.too_short2 is None
and outfiles.too_long is None
and outfiles.too_long2 is None
and outfiles.untrimmed is None
and outfiles.untrimmed2 is None
and not outfiles.demultiplex
)
......@@ -649,7 +650,7 @@ class ParallelPipelineRunner(PipelineRunner):
connections.append(conn_r)
worker = WorkerProcess(
index, self._pipeline,
self._input_path1, self._input_path2,
self._two_input_files,
self._interleaved_input, self._outfiles,
self._pipes[index], conn_w, self._need_work_queue)
worker.daemon = True
......@@ -660,7 +661,7 @@ class ParallelPipelineRunner(PipelineRunner):
def run(self):
workers, connections = self._start_workers()
writers = []
for outfile in [self._outfiles.out, self._outfiles.out2]:
for outfile in self._outfiles:
if outfile is None:
continue
writers.append(OrderedChunkWriter(outfile))
......@@ -730,8 +731,7 @@ class SerialPipelineRunner(PipelineRunner):
def __init__(self, pipeline: Pipeline, infiles: InputFiles, outfiles: OutputFiles):
super().__init__(pipeline)
self._pipeline.set_input(infiles)
self._pipeline.set_output(outfiles)
self._pipeline.connect_io(infiles, outfiles)
def run(self):
(n, total1_bp, total2_bp) = self._pipeline.process_reads(progress=self._progress)
......
/* Generated by Cython 0.29.12 */
/* Generated by Cython 0.29.13 */
/* BEGIN: Cython Metadata
{
......@@ -19,8 +19,8 @@ END: Cython Metadata */
#elif PY_VERSION_HEX < 0x02060000 || (0x03000000 <= PY_VERSION_HEX && PY_VERSION_HEX < 0x03030000)
#error Cython requires Python 2.6+ or Python 3.3+.
#else
#define CYTHON_ABI "0_29_12"
#define CYTHON_HEX_VERSION 0x001D0CF0
#define CYTHON_ABI "0_29_13"
#define CYTHON_HEX_VERSION 0x001D0DF0
#define CYTHON_FUTURE_DIVISION 1
#include <stddef.h>
#ifndef offsetof
......
import re
import sys
import time
import resource
import multiprocessing
......@@ -27,6 +28,12 @@ def available_cpu_count():
return multiprocessing.cpu_count()
def raise_open_files_limit(n):
"""Increase the soft limit on the number of open files by n (the hard limit is unchanged)."""
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
soft += n
resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
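# Example (illustrative): filters._open_raise_limit() calls this when
# dnaio.open() fails with EMFILE, then retries opening the file:
#
# raise_open_files_limit(8)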
class Progress:
"""
Write progress
......