Commits on Source (5)
......@@ -2,6 +2,15 @@
Changes
=======
v2.8 (2020-01-13)
-----------------
* :issue:`220`: With option ``--revcomp``, Cutadapt now searches both the read
and its reverse complement for adapters. The version that matches best is
kept. This can be used to “normalize” strandedness.
* :issue:`430`: ``--action=lowercase`` now works with linked adapters
* :issue:`431`: Info files can now be written even for linked adapters.
v2.7 (2019-11-22)
-----------------
......
python-cutadapt (2.8-1) unstable; urgency=medium
* Team upload.
* New upstream version
* Standards-Version: 4.5.0
* Set upstream metadata fields: Bug-Database, Bug-Submit, Repository,
Repository-Browse.
-- Steffen Moeller <moeller@debian.org> Fri, 24 Jan 2020 12:50:31 +0100
python-cutadapt (2.7-2) unstable; urgency=medium
* Team upload.
......
......@@ -17,7 +17,7 @@ Build-Depends: debhelper-compat (= 12),
python3-xopen (>= 0.5.0),
python3-dnaio,
cython3
Standards-Version: 4.4.1
Standards-Version: 4.5.0
Vcs-Browser: https://salsa.debian.org/med-team/python-cutadapt
Vcs-Git: https://salsa.debian.org/med-team/python-cutadapt.git
Homepage: https://pypi.python.org/pypi/cutadapt
......
......@@ -21,3 +21,7 @@ Registry:
Entry: SCR_011841
- Name: conda:bioconda
Entry: cutadapt
Bug-Database: https://github.com/marcelm/cutadapt/issues
Bug-Submit: https://github.com/marcelm/cutadapt/issues/new
Repository: https://github.com/marcelm/cutadapt.git
Repository-Browse: https://github.com/marcelm/cutadapt
......@@ -225,8 +225,9 @@ Adapter sequences :ref:`may also contain any IUPAC wildcard
character <wildcards>` (such as ``N``).
In addition, it is possible to :ref:`remove a fixed number of
bases <cut-bases>` from the beginning or end of each read, and to :ref:`remove
low-quality bases (quality trimming) <quality-trimming>` from the 3' and 5' ends.
bases <cut-bases>` from the beginning or end of each read, to :ref:`remove
low-quality bases (quality trimming) <quality-trimming>` from the 3' and 5' ends,
and to :ref:`search for adapters also in the reverse-complemented reads <reverse-complement>`.
Overview of adapter types
......@@ -800,6 +801,43 @@ at both ends, use ``-g "ADAPTER;anywhere"``.
:ref:`increase the minimum overlap length <random-matches>`.
.. _reverse-complement:
Searching reverse complements
-----------------------------
.. note::
Option ``--revcomp`` is added on a tentative basis. Its behaviour may change in a future release.
By default, Cutadapt expects adapters to be given in the same orientation (5' to 3') as the reads.
To change this, use option ``--revcomp`` or its abbreviation ``--rc``. If given, Cutadapt searches
both the read and its reverse complement for adapters. If the reverse complemented read yields
a better match, then that version of the read is kept. That is, the output file will contain the
reverse-complemented sequence. This can be used to “normalize” read orientation/strandedness.
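The reverse-complementing step itself can be sketched in Python as follows. This is a minimal illustrative helper, not cutadapt's actual implementation: it handles plain ACGT sequences only, and cutadapt's own helper additionally reverses the read's quality values alongside the sequence.

```python
def reverse_complemented_sequence(seq: str) -> str:
    """Return the reverse complement of a DNA sequence.

    Illustrative sketch only: handles ACGT in upper or lower case and
    leaves any other characters untouched. The real helper in cutadapt
    also reverses the quality string of the read.
    """
    complement = str.maketrans("ACGTacgt", "TGCAtgca")
    return seq.translate(complement)[::-1]
```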
To determine which version of the read yields the better match, the full adapter search (possibly
multiple rounds if ``--times`` is used) is done independently on both versions, and the version that
results in the higher number of matching nucleotides is considered to be the better one.
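The selection rule described above can be sketched like this (illustrative only; in the code further down, each entry corresponds to the ``matches`` attribute of one adapter match, i.e. its number of matching nucleotides):

```python
def reverse_complement_is_better(forward_matches, reverse_matches):
    """Decide whether the reverse-complemented read matched better.

    Each argument is the list of nucleotide match counts collected for
    one orientation (possibly several entries when --times > 1 is used).
    The orientation with the higher total wins; ties keep the forward
    read.
    """
    return sum(reverse_matches) > sum(forward_matches)
```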
The name of a reverse-complemented read is changed by adding a space and ``rc`` to it. (Please
file an issue if you would like this to be configurable.)
The report will show the number of reads that were reverse-complemented, like this::
Total reads processed: 60
Reads with adapters: 50 (83.3%)
Reverse-complemented: 20 (33.3%)
Here, 20 of the 50 reads with adapters were found in reverse-complemented orientation, and the remaining 50 - 20 = 30 reads contained an adapter without needing to be reverse-complemented.
Option ``--revcomp`` is currently available only for single-end data.
.. versionadded:: 2.8
Specifying adapter sequences
============================
......@@ -1850,7 +1888,10 @@ starts with something like this::
Sequence: 'ACGTACGTACGTTAGCTAGC'; Length: 20; Trimmed: 2402 times.
The meaning of this should be obvious.
The meaning of this should be obvious. If option ``--revcomp`` was used,
this line additionally contains something like ``Reverse-complemented:
984 times``, which states how many of the 2402 total matches were found
on the reverse complement of the read.
The next piece of information is this::
......@@ -1934,13 +1975,20 @@ Format of the info file
-----------------------
When the ``--info-file`` command-line parameter is given, detailed
information about the found adapters is written to the given file. The
output is a tab-separated text file. Each line corresponds to one read
of the input file (unless `--times` is used, see below). A row is written
for *all* reads, even those that are discarded from the final output
FASTA/FASTQ due to filtering options (such as ``--minimum-length``).
information about where adapters were found in each read is written
to the given file. It is a tab-separated text file that contains at
least one row per input read. Normally, there is exactly one row per
input read, but in the following cases, multiple rows may be output:
The fields in each row are:
* The option ``--times`` is in use.
* A linked adapter is used.
A row is written for *all* input reads, even those that are discarded
from the final FASTA/FASTQ output due to filtering options
(such as ``--minimum-length``). Which fields are output in each row
depends on whether an adapter match was found in the read or not.
The fields in a row that describes a match are:
1. Read name
2. Number of errors
......@@ -1963,12 +2011,12 @@ Concatenating them yields the full sequence of quality values.
If no adapter was found, the format is as follows:
1. Read name
2. The value -1
2. The value -1 (use this to distinguish between match and non-match)
3. The read sequence
4. Quality values
When parsing the file, be aware that additional columns may be added in
the future. Note also that some fields can be empty, resulting in
the future. Also, some fields can be empty, resulting in
consecutive tabs within a line.
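A minimal reader for this format could look like the following sketch. This is a hypothetical helper, not part of cutadapt; it extracts only a few of the columns listed above, uses the ``-1`` marker to tell match rows from no-match rows, and tolerates any extra trailing columns a future version may add.

```python
import csv
from typing import Iterable, Iterator, Optional, Tuple

# (read name, match info), where match info is None for "no adapter
# found" rows and (errors, start, stop, adapter name) otherwise.
InfoRow = Tuple[str, Optional[Tuple[int, int, int, str]]]


def parse_info_rows(lines: Iterable[str]) -> Iterator[InfoRow]:
    for fields in csv.reader(lines, delimiter="\t"):
        if not fields:
            continue
        if fields[1] == "-1":
            # No-match row: read name, -1, sequence, qualities
            yield fields[0], None
        else:
            # Match row: columns 2-4 hold errors, start, stop and
            # column 8 the adapter name (1-based, as in the list above).
            yield fields[0], (
                int(fields[1]), int(fields[2]), int(fields[3]), fields[7]
            )
```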
If the ``--times`` option is used and greater than 1, each read can appear
......@@ -1979,5 +2027,16 @@ accordingly for columns 9-11). For subsequent lines, the shown sequence are the
ones that were used in subsequent rounds of adapter trimming, that is, they get
successively shorter.
Linked adapters appear with up to two rows for each read, one for each constituent
adapter for which a match has been found. To be able to see which of the two
adapters a row describes, the adapter name in column 8 is modified: If the row
describes a match of the 5' adapter, the string ``;1`` is added. If it describes
a match of the 3' adapter, the string ``;2`` is added. If there are two rows, the
5' match always comes first.
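When reading such rows back, the ``;1``/``;2`` suffix can be split off again with a small helper (hypothetical, shown here only to illustrate the naming convention):

```python
from typing import Optional, Tuple


def split_linked_suffix(adapter_field: str) -> Tuple[str, Optional[int]]:
    """Split "name;1" or "name;2" into (name, 1) or (name, 2).

    Rows for non-linked adapters carry a plain name and yield
    (name, None). rpartition keeps any ";" inside the adapter name
    itself intact.
    """
    name, sep, which = adapter_field.rpartition(";")
    if sep and which in ("1", "2"):
        return name, int(which)
    return adapter_field, None
```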
.. versionadded:: 1.9
Columns 9-11 were added.
.. versionadded:: 2.8
Linked adapters in info files work.
[mypy]
warn_unused_configs = True
[mypy-xopen]
ignore_missing_imports = True
[mypy-dnaio.*]
ignore_missing_imports = True
[mypy-cutadapt._align.*]
ignore_missing_imports = True
[mypy-cutadapt.qualtrim.*]
ignore_missing_imports = True
......@@ -7,6 +7,8 @@ The ...Match classes trim the reads.
import logging
from enum import Enum
from collections import defaultdict
from typing import Optional, Tuple, Sequence, Dict, Any, List
from abc import ABC, abstractmethod
from . import align
......@@ -28,18 +30,24 @@ class Where(Enum):
LINKED = 'linked'
# TODO put this in some kind of "list of pre-defined adapter types" along with the info above
class WhereToRemove(Enum):
PREFIX = 1
SUFFIX = 2
AUTO = 3
WHERE_TO_REMOVE_MAP = {
Where.PREFIX: 'prefix',
Where.FRONT_NOT_INTERNAL: 'prefix',
Where.FRONT: 'prefix',
Where.BACK: 'suffix',
Where.SUFFIX: 'suffix',
Where.BACK_NOT_INTERNAL: 'suffix',
Where.ANYWHERE: 'auto',
Where.PREFIX: WhereToRemove.PREFIX,
Where.FRONT_NOT_INTERNAL: WhereToRemove.PREFIX,
Where.FRONT: WhereToRemove.PREFIX,
Where.BACK: WhereToRemove.SUFFIX,
Where.SUFFIX: WhereToRemove.SUFFIX,
Where.BACK_NOT_INTERNAL: WhereToRemove.SUFFIX,
Where.ANYWHERE: WhereToRemove.AUTO,
}
# TODO could become a property/attribute of the Adapter classes
ADAPTER_TYPE_NAMES = {
Where.BACK: "regular 3'",
Where.BACK_NOT_INTERNAL: "non-internal 3'",
......@@ -62,15 +70,15 @@ def returns_defaultdict_int():
class EndStatistics:
"""Statistics about the 5' or 3' end"""
def __init__(self, adapter):
self.where = adapter.where
self.max_error_rate = adapter.max_error_rate
self.sequence = adapter.sequence
self.effective_length = adapter.effective_length
self.has_wildcards = adapter.adapter_wildcards
def __init__(self, adapter: "SingleAdapter"):
self.where = adapter.where # type: Where
self.max_error_rate = adapter.max_error_rate # type: float
self.sequence = adapter.sequence # type: str
self.effective_length = adapter.effective_length # type: int
self.has_wildcards = adapter.adapter_wildcards # type: bool
# self.errors[l][e] == n iff n times a sequence of length l matching at e errors was removed
self.errors = defaultdict(returns_defaultdict_int)
self._remove = adapter.remove
self.errors = defaultdict(returns_defaultdict_int) # type: Dict[int, Dict[int, int]]
self._remove = adapter.remove # type: Optional[WhereToRemove]
self.adjacent_bases = {'A': 0, 'C': 0, 'G': 0, 'T': 0, '': 0}
def __repr__(self):
......@@ -82,7 +90,7 @@ class EndStatistics:
self.adjacent_bases,
)
def __iadd__(self, other):
def __iadd__(self, other: Any):
if not isinstance(other, self.__class__):
raise ValueError("Cannot compare")
if (
......@@ -119,16 +127,16 @@ class EndStatistics:
"""
seq = self.sequence
# FIXME this is broken for self._remove == 'auto'
if self._remove == 'prefix':
if self._remove == WhereToRemove.PREFIX:
seq = seq[::-1]
allowed_bases = 'CGRYSKMBDHVN' if self.has_wildcards else 'GC'
p = 1
p = 1.
probabilities = [p]
for i, c in enumerate(seq):
if c in allowed_bases:
p *= gc_content / 2.
else:
p *= (1 - gc_content) / 2
p *= (1. - gc_content) / 2.
probabilities.append(p)
return probabilities
......@@ -137,13 +145,19 @@ class AdapterStatistics:
"""
Statistics about an adapter. An adapter can work on the 5' end (front)
or 3' end (back) of a read, and statistics for that are captured
separately.
separately in EndStatistics objects.
"""
def __init__(self, adapter, adapter2=None, where=None):
def __init__(
self,
adapter: "SingleAdapter",
adapter2: Optional["SingleAdapter"] = None,
where: Optional[Where] = None,
):
self.name = adapter.name
self.where = where if where is not None else adapter.where
self.front = EndStatistics(adapter)
self.reverse_complemented = 0
if adapter2 is None:
self.back = EndStatistics(adapter)
else:
......@@ -157,15 +171,26 @@ class AdapterStatistics:
self.back,
)
def __iadd__(self, other):
def __iadd__(self, other: "AdapterStatistics"):
if self.where != other.where: # TODO self.name != other.name or
raise ValueError('incompatible objects')
self.front += other.front
self.back += other.back
self.reverse_complemented += other.reverse_complemented
return self
class Match:
class Match(ABC):
@abstractmethod
def remainder_interval(self) -> Tuple[int, int]:
pass
@abstractmethod
def get_info_records(self) -> List[List]:
pass
class SingleMatch(Match):
"""
Representation of a single adapter matched to a single read.
"""
......@@ -173,37 +198,48 @@ class Match:
'adapter', 'read', 'length', '_trimmed_read', 'adjacent_base']
# TODO Can remove_before be removed from the constructor parameters?
def __init__(self, astart, astop, rstart, rstop, matches, errors, remove_before, adapter, read):
def __init__(
self,
astart: int,
astop: int,
rstart: int,
rstop: int,
matches: int,
errors: int,
remove_before: bool,
adapter: "SingleAdapter",
read,
):
"""
remove_before -- True: remove bases before adapter. False: remove after
"""
self.astart = astart
self.astop = astop
self.rstart = rstart
self.rstop = rstop
self.matches = matches
self.errors = errors
self.adapter = adapter
self.astart = astart # type: int
self.astop = astop # type: int
self.rstart = rstart # type: int
self.rstop = rstop # type: int
self.matches = matches # type: int
self.errors = errors # type: int
self.adapter = adapter # type: SingleAdapter
self.read = read
if remove_before:
# Compute the trimmed read, assuming it’s a 'front' adapter
self._trimmed_read = read[rstop:]
self.adjacent_base = ''
self.adjacent_base = "" # type: str
else:
# Compute the trimmed read, assuming it’s a 'back' adapter
self.adjacent_base = read.sequence[rstart - 1:rstart]
self._trimmed_read = read[:rstart]
self.remove_before = remove_before
self.remove_before = remove_before # type: bool
# Number of aligned characters in the adapter. If there are
# indels, this may be different from the number of characters
# in the read.
self.length = astop - astart
self.length = astop - astart # type: int
def __repr__(self):
return 'Match(astart={}, astop={}, rstart={}, rstop={}, matches={}, errors={})'.format(
return 'SingleMatch(astart={}, astop={}, rstart={}, rstop={}, matches={}, errors={})'.format(
self.astart, self.astop, self.rstart, self.rstop, self.matches, self.errors)
def wildcards(self, wildcard_char='N'):
def wildcards(self, wildcard_char: str = "N") -> str:
"""
Return a string that contains, for each wildcard character,
the character that it matches. For example, if the adapter
......@@ -217,7 +253,7 @@ class Match:
self.rstart + i < len(self.read.sequence)]
return ''.join(wildcards)
def rest(self):
def rest(self) -> str:
"""
Return the part of the read before this match if this is a
'front' (5') adapter,
......@@ -229,10 +265,20 @@ class Match:
else:
return self.read.sequence[self.rstop:]
def get_info_record(self):
def remainder_interval(self) -> Tuple[int, int]:
"""
Return an interval (start, stop) that describes the part of the read that would
remain after trimming
"""
if self.remove_before:
return self.rstop, len(self.read.sequence)
else:
return 0, self.rstart
def get_info_records(self) -> List[List]:
seq = self.read.sequence
qualities = self.read.qualities
info = (
info = [
self.read.name,
self.errors,
self.rstart,
......@@ -240,23 +286,23 @@ class Match:
seq[0:self.rstart],
seq[self.rstart:self.rstop],
seq[self.rstop:],
self.adapter.name
)
self.adapter.name,
]
if qualities:
info += (
info += [
qualities[0:self.rstart],
qualities[self.rstart:self.rstop],
qualities[self.rstop:]
)
]
else:
info += ('', '', '')
info += ["", "", ""]
return info
return [info]
def trimmed(self):
return self._trimmed_read
def update_statistics(self, statistics):
def update_statistics(self, statistics: AdapterStatistics):
"""Update AdapterStatistics in place"""
if self.remove_before:
statistics.front.errors[self.rstop][self.errors] += 1
......@@ -268,13 +314,26 @@ class Match:
statistics.back.adjacent_bases[''] = 1
def _generate_adapter_name(_start=[1]):
def _generate_adapter_name(_start=[1]) -> str:
name = str(_start[0])
_start[0] += 1
return name
class Adapter:
class Adapter(ABC):
def __init__(self, *args, **kwargs):
pass
@abstractmethod
def enable_debug(self):
pass
@abstractmethod
def match_to(self, read):
pass
class SingleAdapter(Adapter):
"""
This class can find a single adapter characterized by sequence, error rate,
type etc. within reads.
......@@ -282,11 +341,12 @@ class Adapter:
where -- A Where enum value. This influences where the adapter is allowed to appear within the
read.
remove -- describes which part of the read to remove if the adapter was found:
* "prefix" (for a 3' adapter)
* "suffix" (for a 5' adapter)
* "auto" for a 5'/3' mixed adapter (if the match involves the first base of the read, it
is assumed to be a 5' adapter and a 3' otherwise)
remove -- a WhereToRemove enum value. This describes which part of the read to remove if the
adapter was found:
* WhereToRemove.PREFIX (for a 3' adapter)
* WhereToRemove.SUFFIX (for a 5' adapter)
* WhereToRemove.AUTO for a 5'/3' mixed adapter (if the match involves the first base of
the read, it is assumed to be a 5' adapter and a 3' otherwise)
* None: One of the above is chosen depending on the 'where' parameter
sequence -- The adapter sequence as string. Will be converted to uppercase.
......@@ -308,19 +368,28 @@ class Adapter:
unique number.
"""
def __init__(self, sequence, where, remove=None, max_error_rate=0.1, min_overlap=3,
read_wildcards=False, adapter_wildcards=True, name=None, indels=True):
self._debug = False
self.name = _generate_adapter_name() if name is None else name
self.sequence = sequence.upper().replace('U', 'T')
def __init__(
self,
sequence: str,
where: Where,
remove: Optional[WhereToRemove] = None,
max_error_rate: float = 0.1,
min_overlap: int = 3,
read_wildcards: bool = False,
adapter_wildcards: bool = True,
name: Optional[str] = None,
indels: bool = True,
):
super().__init__()
self._debug = False # type: bool
self.name = _generate_adapter_name() if name is None else name # type: str
self.sequence = sequence.upper().replace("U", "T") # type: str
if not self.sequence:
raise ValueError("Adapter sequence is empty")
self.where = where
if remove not in (None, 'prefix', 'suffix', 'auto'):
raise ValueError('remove parameter must be "prefix", "suffix", "auto" or None')
self.remove = WHERE_TO_REMOVE_MAP[where] if remove is None else remove
self.max_error_rate = max_error_rate
self.min_overlap = min(min_overlap, len(self.sequence))
self.where = where # type: Where
self.remove = WHERE_TO_REMOVE_MAP[where] if remove is None else remove # type: WhereToRemove
self.max_error_rate = max_error_rate # type: float
self.min_overlap = min(min_overlap, len(self.sequence)) # type: int
iupac = frozenset('XACGTURYSWKMBDHVN')
if adapter_wildcards and not set(self.sequence) <= iupac:
for c in self.sequence:
......@@ -329,9 +398,9 @@ class Adapter:
'not a valid IUPAC code. Use only characters '
'XACGTURYSWKMBDHVN.'.format(c, self.sequence))
# Optimization: Use non-wildcard matching if only ACGT is used
self.adapter_wildcards = adapter_wildcards and not set(self.sequence) <= set('ACGT')
self.read_wildcards = read_wildcards
self.indels = indels
self.adapter_wildcards = adapter_wildcards and not set(self.sequence) <= set("ACGT") # type: bool
self.read_wildcards = read_wildcards # type: bool
self.indels = indels # type: bool
if self.is_anchored and not self.indels:
aligner_class = align.PrefixComparer if self.where is Where.PREFIX else align.SuffixComparer
self.aligner = aligner_class(
......@@ -357,7 +426,7 @@ class Adapter:
)
def __repr__(self):
return '<Adapter(name={name!r}, sequence={sequence!r}, where={where}, '\
return '<SingleAdapter(name={name!r}, sequence={sequence!r}, where={where}, '\
'remove={remove}, max_error_rate={max_error_rate}, min_overlap={min_overlap}, '\
'read_wildcards={read_wildcards}, '\
'adapter_wildcards={adapter_wildcards}, '\
......@@ -420,12 +489,12 @@ class Adapter:
if match_args is None:
return None
if self.remove == 'auto':
if self.remove == WhereToRemove.AUTO:
# guess: if alignment starts at pos 0, it’s a 5' adapter
remove_before = match_args[2] == 0 # index 2 is rstart
else:
remove_before = self.remove == 'prefix'
match = Match(*match_args, remove_before=remove_before, adapter=self, read=read)
remove_before = self.remove == WhereToRemove.PREFIX
match = SingleMatch(*match_args, remove_before=remove_before, adapter=self, read=read)
assert match.length >= self.min_overlap
return match
......@@ -437,7 +506,7 @@ class Adapter:
return AdapterStatistics(self)
class BackOrFrontAdapter(Adapter):
class BackOrFrontAdapter(SingleAdapter):
"""A 5' or 3' adapter.
This is separate from the Adapter class so that a specialized match_to
......@@ -450,7 +519,7 @@ class BackOrFrontAdapter(Adapter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
assert self.where is Where.BACK or self.where is Where.FRONT
self._remove_before = self.remove == 'prefix'
self._remove_before = self.remove is WhereToRemove.PREFIX
def match_to(self, read):
"""
......@@ -477,21 +546,19 @@ class BackOrFrontAdapter(Adapter):
if alignment is None:
return None
match = Match(*alignment, remove_before=self._remove_before, adapter=self, read=read)
match = SingleMatch(*alignment, remove_before=self._remove_before, adapter=self, read=read)
return match
class LinkedMatch:
class LinkedMatch(Match):
"""
Represent a match of a LinkedAdapter
"""
def __init__(self, front_match, back_match, adapter):
"""
One of front_match and back_match must be not None!
"""
self.front_match = front_match
self.back_match = back_match
self.adapter = adapter
def __init__(self, front_match: SingleMatch, back_match: SingleMatch, adapter: "LinkedAdapter"):
assert front_match is not None or back_match is not None
self.front_match = front_match # type: SingleMatch
self.back_match = back_match # type: SingleMatch
self.adapter = adapter # type: LinkedAdapter
def __repr__(self):
return '<LinkedMatch(front_match={!r}, back_match={}, adapter={})>'.format(
......@@ -536,8 +603,25 @@ class LinkedMatch:
if self.back_match:
statistics.back.errors[len(self.back_match.read) - self.back_match.rstart][self.back_match.errors] += 1
def remainder_interval(self) -> Tuple[int, int]:
matches = [match for match in [self.front_match, self.back_match] if match is not None]
return remainder(matches)
def get_info_records(self) -> List[List]:
records = []
for match, namesuffix in [
(self.front_match, ";1"),
(self.back_match, ";2"),
]:
if match is None:
continue
record = match.get_info_records()[0]
record[7] = self.adapter.name + namesuffix
records.append(record)
return records
class LinkedAdapter:
class LinkedAdapter(Adapter):
"""
"""
def __init__(
......@@ -548,6 +632,7 @@ class LinkedAdapter:
back_required,
name,
):
super().__init__()
self.front_required = front_required
self.back_required = back_required
......@@ -592,7 +677,7 @@ class LinkedAdapter:
return None
class MultiAdapter:
class MultiAdapter(Adapter):
"""
Represent multiple adapters of the same type at once and use an index data structure
to speed up matching. This acts like a "normal" Adapter as it provides a match_to
......@@ -610,6 +695,7 @@ class MultiAdapter:
def __init__(self, adapters):
"""All given adapters must be of the same type, either Where.PREFIX or Where.SUFFIX"""
super().__init__()
if not adapters:
raise ValueError("Adapter list is empty")
self._where = adapters[0].where
......@@ -723,18 +809,21 @@ class MultiAdapter:
else:
assert self._where is Where.SUFFIX
rstart, rstop = len(read) - best_length, len(read)
return Match(
return SingleMatch(
astart=0,
astop=len(best_adapter.sequence),
rstart=rstart,
rstop=rstop,
matches=best_m,
errors=best_e,
remove_before=best_adapter.remove == 'prefix',
remove_before=best_adapter.remove is WhereToRemove.PREFIX,
adapter=best_adapter,
read=read
)
def enable_debug(self):
pass
def warn_duplicate_adapters(adapters):
d = dict()
......@@ -745,3 +834,20 @@ def warn_duplicate_adapters(adapters):
"Please make sure that this is what you want.",
adapter.sequence, ADAPTER_TYPE_NAMES[adapter.where])
d[key] = adapter.name
def remainder(matches: Sequence[Match]) -> Tuple[int, int]:
"""
Determine which section of the read would not be trimmed. Return a tuple (start, stop)
that gives the interval of the untrimmed part relative to the original read.
matches must be non-empty
"""
if not matches:
raise ValueError("matches must not be empty")
start = 0
for match in matches:
match_start, match_stop = match.remainder_interval()
start += match_start
length = match_stop - match_start
return (start, start + length)
......@@ -14,6 +14,8 @@ The read is then assumed to have been "consumed", that is, either written
somewhere or filtered (should be discarded).
"""
from abc import ABC, abstractmethod
from typing import List
from .adapters import Match
# Constants used when returning from a Filter’s __call__ method to improve
......@@ -403,10 +405,10 @@ class InfoFileWriter(SingleEndFilter):
def __init__(self, file):
self.file = file
def __call__(self, read, matches):
def __call__(self, read, matches: List[Match]):
if matches:
for match in matches:
info_record = match.get_info_record()
for info_record in match.get_info_records():
print(*info_record, sep='\t', file=self.file)
else:
seq = read.sequence
......
......@@ -4,19 +4,34 @@ A modifier must be callable and typically implemented as a class with a
__call__ method.
"""
import re
from typing import Sequence, List, Optional
from abc import ABC, abstractmethod
from collections import OrderedDict
from .qualtrim import quality_trim_index, nextseq_trim_index
from .adapters import Where, MultiAdapter
from .adapters import Where, MultiAdapter, Match, remainder
from .utils import reverse_complemented_sequence
class Modifier(ABC):
@abstractmethod
def __call__(self, read, matches: List[Match]):
pass
class PairedModifier:
class PairedModifier(ABC):
@abstractmethod
def __call__(self, read1, read2, matches1, matches2):
pass
class PairedModifierWrapper(PairedModifier):
"""
Wrapper for modifiers that work on both reads in a paired-end read
"""
paired = True
def __init__(self, modifier1, modifier2):
def __init__(self, modifier1: Optional[Modifier], modifier2: Optional[Modifier]):
"""Set one of the modifiers to None to work on R1 or R2 only"""
self._modifier1 = modifier1
self._modifier2 = modifier2
......@@ -33,12 +48,6 @@ class PairedModifier:
return self._modifier1(read1, matches1), self._modifier2(read2, matches2)
class Modifier(ABC):
@abstractmethod
def __call__(self, read, matches):
pass
class AdapterCutter(Modifier):
"""
Repeatedly find one of multiple adapters in reads.
......@@ -116,30 +125,8 @@ class AdapterCutter(Modifier):
return best_match
@staticmethod
def remainder(matches):
"""
Determine which part of the read was not trimmed. Return a tuple (start, stop)
that gives the interval of the untrimmed part relative to the original read.
matches is a list of Match objects. The original read is assumed to be
matches[0].read
"""
# Start with the full read
read = matches[0].read
start, stop = 0, len(read)
for match in matches:
if match.remove_before:
# Length of the prefix that was removed
start += match.rstop
else:
# Length of the suffix that was removed
stop -= len(match.read) - match.rstart
return (start, stop)
@staticmethod
def masked_read(trimmed_read, matches):
start, stop = AdapterCutter.remainder(matches)
read = matches[0].read
def masked_read(read, trimmed_read, matches: Sequence[Match]):
start, stop = remainder(matches)
# TODO modification in place
trimmed_read.sequence = (
'N' * start
......@@ -149,19 +136,19 @@ class AdapterCutter(Modifier):
return trimmed_read
@staticmethod
def lowercased_read(trimmed_read, matches):
start, stop = AdapterCutter.remainder(matches)
read_sequence = matches[0].read.sequence
def lowercased_read(read, trimmed_read, matches: Sequence[Match]):
start, stop = remainder(matches)
read_sequence = read.sequence
# TODO modification in place
trimmed_read.sequence = (
read_sequence[:start].lower()
+ read_sequence[start:stop].upper()
+ read_sequence[stop:].lower()
)
trimmed_read.qualities = matches[0].read.qualities
trimmed_read.qualities = read.qualities
return trimmed_read
def __call__(self, read, inmatches):
def __call__(self, read, inmatches: List[Match]):
trimmed_read, matches = self.match_and_trim(read)
if matches:
self.with_adapters += 1
......@@ -183,9 +170,9 @@ class AdapterCutter(Modifier):
Return a pair (trimmed_read, matches), where matches is a list of Match instances.
"""
matches = []
trimmed_read = read
if self.action == 'lowercase':
trimmed_read.sequence = trimmed_read.sequence.upper()
read.sequence = read.sequence.upper()
trimmed_read = read
for _ in range(self.times):
match = AdapterCutter.best_match(self.adapters, trimmed_read)
if match is None:
......@@ -201,9 +188,9 @@ class AdapterCutter(Modifier):
# read is already trimmed, nothing to do
pass
elif self.action == 'mask':
trimmed_read = self.masked_read(trimmed_read, matches)
trimmed_read = self.masked_read(read, trimmed_read, matches)
elif self.action == 'lowercase':
trimmed_read = self.lowercased_read(trimmed_read, matches)
trimmed_read = self.lowercased_read(read, trimmed_read, matches)
assert len(trimmed_read.sequence) == len(read)
elif self.action is None: # --no-trim
trimmed_read = read[:]
......@@ -211,11 +198,51 @@ class AdapterCutter(Modifier):
return trimmed_read, matches
class ReverseComplementer(Modifier):
"""Trim adapters from a read and its reverse complement"""
def __init__(self, adapter_cutter: AdapterCutter, rc_suffix: Optional[str] = " rc"):
"""
rc_suffix -- suffix to add to the read name if sequence was reverse-complemented
"""
self.adapter_cutter = adapter_cutter
self.reverse_complemented = 0
self._suffix = rc_suffix
def __call__(self, read, inmatches: List[Match]):
reverse_read = reverse_complemented_sequence(read)
forward_trimmed_read, forward_matches = self.adapter_cutter.match_and_trim(read)
reverse_trimmed_read, reverse_matches = self.adapter_cutter.match_and_trim(reverse_read)
forward_match_count = sum(m.matches for m in forward_matches)
reverse_match_count = sum(m.matches for m in reverse_matches)
use_reverse_complement = reverse_match_count > forward_match_count
if use_reverse_complement:
self.reverse_complemented += 1
assert reverse_matches
trimmed_read, matches = reverse_trimmed_read, reverse_matches
if self._suffix:
trimmed_read.name += self._suffix
else:
trimmed_read, matches = forward_trimmed_read, forward_matches
if matches:
self.adapter_cutter.with_adapters += 1
for match in matches:
stats = self.adapter_cutter.adapter_statistics[match.adapter]
match.update_statistics(stats)
stats.reverse_complemented += bool(use_reverse_complement)
inmatches.extend(matches)
return trimmed_read
class PairedAdapterCutterError(Exception):
pass
class PairedAdapterCutter:
class PairedAdapterCutter(PairedModifier):
"""
A Modifier that trims adapter pairs from R1 and R2.
"""
......@@ -230,6 +257,7 @@ class PairedAdapterCutter:
action -- What to do with a found adapter: None, 'trim', or 'mask'
"""
super().__init__()
if len(adapters1) != len(adapters2):
raise PairedAdapterCutterError(
"The number of reads to trim from R1 and R2 must be the same. "
......@@ -294,10 +322,10 @@ class UnconditionalCutter(Modifier):
If the length is positive, the bases are removed from the beginning of the read.
If the length is negative, the bases are removed from the end of the read.
"""
def __init__(self, length):
def __init__(self, length: int):
self.length = length
def __call__(self, read, matches):
def __call__(self, read, matches: List[Match]):
if self.length > 0:
return read[self.length:]
elif self.length < 0:
......
......@@ -3,9 +3,10 @@ Parse adapter specifications
"""
import re
import logging
from typing import Type, Optional, List, Tuple, Iterator, Any, Dict
from xopen import xopen
from dnaio.readers import FastaReader
from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, BackOrFrontAdapter, LinkedAdapter
from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, SingleAdapter, BackOrFrontAdapter, LinkedAdapter
logger = logging.getLogger(__name__)
......@@ -26,7 +27,14 @@ class AdapterSpecification:
AdapterSpecification(name='a_name', restriction=None, sequence='ACGT', parameters={'anywhere': True}, cmdline_type='back')
"""
def __init__(self, name, restriction, sequence, parameters, cmdline_type):
def __init__(
self,
name: str,
restriction: Optional[str],
sequence: str,
parameters,
cmdline_type: str,
):
assert restriction in (None, "anchored", "noninternal")
assert cmdline_type in ("front", "back", "anywhere")
self.name = name
......@@ -55,7 +63,7 @@ class AdapterSpecification:
)
@staticmethod
def expand_braces(sequence):
def expand_braces(sequence: str) -> str:
"""
Replace all occurrences of ``x{n}`` (where x is any character) with n
occurrences of x. Raise ValueError if the expression cannot be parsed.
......@@ -95,16 +103,15 @@ class AdapterSpecification:
return result
@staticmethod
def _extract_name(spec):
def _extract_name(spec: str) -> Tuple[Optional[str], str]:
"""
Parse an adapter specification given as 'name=adapt' into 'name' and 'adapt'.
"""
fields = spec.split('=', 1)
name = None # type: Optional[str]
if len(fields) > 1:
name, spec = fields
name = name.strip()
else:
name = None
spec = spec.strip()
return name, spec
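The simplified `_extract_name` above can be exercised on its own; a direct transcription with an illustrative top-level name:

```python
from typing import Optional, Tuple

def extract_name(spec: str) -> Tuple[Optional[str], str]:
    """Parse 'name=adapt' into ('name', 'adapt'); a bare spec has no name."""
    fields = spec.split("=", 1)
    if len(fields) > 1:
        name, spec = fields
        name = name.strip()
    else:
        name = None
    return name, spec.strip()

print(extract_name("my_adapter=ACGT"))  # ('my_adapter', 'ACGT')
print(extract_name("  ACGT  "))         # (None, 'ACGT')
```

Splitting with `maxsplit=1` means only the first `=` separates the name, so any later `=` stays part of the spec.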
......@@ -123,16 +130,16 @@ class AdapterSpecification:
}
@classmethod
def _parse_parameters(cls, spec):
def _parse_parameters(cls, spec: str):
"""Parse key=value;key=value;key=value into a dict"""
fields = spec.split(';')
result = dict()
result = dict() # type: Dict[str,Any]
for field in fields:
field = field.strip()
if not field:
continue
key, equals, value = field.partition('=')
key, equals, value = field.partition('=') # type: (str, str, Any)
if equals == '=' and value == '':
raise ValueError('No value given')
key = key.strip()
......@@ -140,7 +147,7 @@ class AdapterSpecification:
raise KeyError('Unknown parameter {}'.format(key))
# unabbreviate
while cls.allowed_parameters[key] is not None:
key = cls.allowed_parameters[key]
key = cls.allowed_parameters[key] # type: ignore
value = value.strip()
if value == '':
value = True
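A standalone sketch of the `key=value;key=value` parsing above (the key-validation and unabbreviation steps visible in the next hunk are omitted here):

```python
from typing import Any, Dict

def parse_parameters(spec: str) -> Dict[str, Any]:
    """Parse 'key=value;key=value' into a dict; a lone key becomes a True flag."""
    result = dict()  # type: Dict[str, Any]
    for field in spec.split(";"):
        field = field.strip()
        if not field:
            continue
        key, equals, value = field.partition("=")
        if equals == "=" and value == "":
            raise ValueError("No value given")
        key = key.strip()
        value = value.strip()
        if value == "":
            value = True
        result[key] = value
    return result

print(parse_parameters("min_overlap=5; anywhere"))
```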
......@@ -274,7 +281,7 @@ class AdapterParser:
# kwargs: max_error_rate, min_overlap, read_wildcards, adapter_wildcards, indels
self.default_parameters = kwargs
def _parse(self, spec: str, cmdline_type='back', name=None):
def _parse(self, spec: str, cmdline_type: str = "back", name: Optional[str] = None) -> Adapter:
"""
Parse an adapter specification not using ``file:`` notation and return
an object of an appropriate Adapter class.
......@@ -298,7 +305,7 @@ class AdapterParser:
return self._parse_not_linked(spec, name, cmdline_type)
@staticmethod
def _normalize_ellipsis(spec1: str, spec2: str, cmdline_type):
def _normalize_ellipsis(spec1: str, spec2: str, cmdline_type) -> Tuple[str, str]:
if not spec1:
if cmdline_type == 'back':
# -a ...ADAPTER
......@@ -318,23 +325,23 @@ class AdapterParser:
raise ValueError("Expected either spec1 or spec2")
return spec, cmdline_type
def _parse_not_linked(self, spec: str, name, cmdline_type):
spec = AdapterSpecification.parse(spec, cmdline_type)
where = spec.where()
def _parse_not_linked(self, spec: str, name: Optional[str], cmdline_type: str) -> Adapter:
aspec = AdapterSpecification.parse(spec, cmdline_type)
where = aspec.where()
if not name:
name = spec.name
if spec.parameters.pop('anywhere', False):
spec.parameters['remove'] = WHERE_TO_REMOVE_MAP[where]
name = aspec.name
if aspec.parameters.pop('anywhere', False):
aspec.parameters['remove'] = WHERE_TO_REMOVE_MAP[where]
where = Where.ANYWHERE
parameters = self.default_parameters.copy()
parameters.update(spec.parameters)
parameters.update(aspec.parameters)
if where in (Where.FRONT, Where.BACK):
adapter_class = BackOrFrontAdapter
adapter_class = BackOrFrontAdapter # type: Type[Adapter]
else:
adapter_class = Adapter
return adapter_class(sequence=spec.sequence, where=where, name=name, **parameters)
adapter_class = SingleAdapter
return adapter_class(sequence=aspec.sequence, where=where, name=name, **parameters)
def _parse_linked(self, spec1: str, spec2: str, name, cmdline_type):
def _parse_linked(self, spec1: str, spec2: str, name: Optional[str], cmdline_type: str) -> LinkedAdapter:
"""Return a linked adapter from two specification strings"""
if cmdline_type == 'anywhere':
......@@ -375,9 +382,9 @@ class AdapterParser:
front_required = front_parameters.pop('required', front_required)
back_required = back_parameters.pop('required', back_required)
front_adapter = Adapter(front_spec.sequence, where=front_spec.where(), name=None,
front_adapter = SingleAdapter(front_spec.sequence, where=front_spec.where(), name=None,
**front_parameters)
back_adapter = Adapter(back_spec.sequence, where=back_spec.where(), name=None,
back_adapter = SingleAdapter(back_spec.sequence, where=back_spec.where(), name=None,
**back_parameters)
return LinkedAdapter(
......@@ -388,7 +395,7 @@ class AdapterParser:
name=name,
)
def parse(self, spec, cmdline_type='back'):
def parse(self, spec: str, cmdline_type: str = 'back') -> Iterator[Adapter]:
"""
Parse an adapter specification and yield appropriate Adapter instances.
This works like the _parse_no_file() function above, but also supports the
......@@ -407,7 +414,7 @@ class AdapterParser:
else:
yield self._parse(spec, cmdline_type, name=None)
def parse_multi(self, type_spec_pairs):
def parse_multi(self, type_spec_pairs: List[Tuple[str, str]]) -> List[Adapter]:
"""
Parse all three types of commandline options that can be used to
specify adapters. type_spec_pairs must be a list of (str, str) pairs, where the first is
......@@ -416,7 +423,7 @@ class AdapterParser:
Return a list of appropriate Adapter instances.
"""
adapters = []
adapters = [] # type: List[Adapter]
for cmdline_type, spec in type_spec_pairs:
if cmdline_type not in {'front', 'back', 'anywhere'}:
raise ValueError('adapter type must be front, back or anywhere')
......
......@@ -4,17 +4,19 @@ import sys
import copy
import logging
import functools
from typing import List, IO, Optional, BinaryIO, TextIO, Any, Tuple, Union
from abc import ABC, abstractmethod
from multiprocessing import Process, Pipe, Queue
from pathlib import Path
import multiprocessing.connection
from multiprocessing.connection import Connection
import traceback
from xopen import xopen
import dnaio
from .utils import Progress, FileOpener
from .modifiers import PairedModifier
from .modifiers import Modifier, PairedModifier, PairedModifierWrapper
from .report import Statistics
from .filters import (Redirector, PairedRedirector, NoFilter, PairedNoFilter, InfoFileWriter,
RestFileWriter, WildcardFileWriter, TooShortReadFilter, TooLongReadFilter, NContentFilter,
......@@ -25,7 +27,7 @@ logger = logging.getLogger()
class InputFiles:
def __init__(self, file1, file2=None, interleaved=False):
def __init__(self, file1: BinaryIO, file2: Optional[BinaryIO] = None, interleaved: bool = False):
self.file1 = file1
self.file2 = file2
self.interleaved = interleaved
......@@ -44,20 +46,20 @@ class OutputFiles:
# TODO interleaving for the other file pairs (too_short, too_long, untrimmed)?
def __init__(
self,
out=None,
out2=None,
untrimmed=None,
untrimmed2=None,
too_short=None,
too_short2=None,
too_long=None,
too_long2=None,
info=None,
rest=None,
wildcard=None,
demultiplex=False,
interleaved=False,
force_fasta=None,
out: Optional[BinaryIO] = None,
out2: Optional[BinaryIO] = None,
untrimmed: Optional[BinaryIO] = None,
untrimmed2: Optional[BinaryIO] = None,
too_short: Optional[BinaryIO] = None,
too_short2: Optional[BinaryIO] = None,
too_long: Optional[BinaryIO] = None,
too_long2: Optional[BinaryIO] = None,
info: Optional[BinaryIO] = None,
rest: Optional[BinaryIO] = None,
wildcard: Optional[BinaryIO] = None,
demultiplex: bool = False,
interleaved: bool = False,
force_fasta: Optional[bool] = None,
):
self.out = out
self.out2 = out2
......@@ -95,13 +97,14 @@ class Pipeline(ABC):
n_adapters = 0
def __init__(self, file_opener: FileOpener):
self._close_files = []
self._close_files = [] # type: List[IO]
self._reader = None
self._filters = None
self._modifiers = []
self._outfiles = None
self._filters = [] # type: List[Any]
# TODO type should be Union[List[Modifier], List[PairedModifier]]
self._modifiers = [] # type: List[Union[Modifier, PairedModifier]]
self._outfiles = None # type: Optional[OutputFiles]
self._demultiplexer = None
self._textiowrappers = []
self._textiowrappers = [] # type: List[TextIO]
# Filter settings
self._minimum_length = None
......@@ -117,7 +120,13 @@ class Pipeline(ABC):
interleaved=infiles.interleaved, mode='r')
self._set_output(outfiles)
def _open_writer(self, file, file2=None, force_fasta=None, **kwargs):
def _open_writer(
self,
file: BinaryIO,
file2: Optional[BinaryIO] = None,
force_fasta: Optional[bool] = None,
**kwargs
):
# TODO backwards-incompatible change (?) would be to use outfiles.interleaved
# for all outputs
if force_fasta:
......@@ -125,7 +134,7 @@ class Pipeline(ABC):
# file and file2 must already be file-like objects because we don’t want to
# take care of threads and compression levels here.
for f in (file, file2):
assert not (f is None and isinstance(f, (str, bytes, Path)))
assert not isinstance(f, (str, bytes, Path))
return dnaio.open(file, file2=file2, mode='w', qualities=self.uses_qualities,
**kwargs)
......@@ -193,16 +202,18 @@ class Pipeline(ABC):
untrimmed_filter_wrapper(untrimmed_writer, DiscardUntrimmedFilter(), DiscardUntrimmedFilter()))
self._filters.append(self._final_filter(outfiles))
def flush(self):
def flush(self) -> None:
for f in self._textiowrappers:
f.flush()
assert self._outfiles is not None
for f in self._outfiles:
if f is not None:
f.flush()
def close(self):
def close(self) -> None:
for f in self._textiowrappers:
f.close() # This also closes the underlying files; a second close occurs below
assert self._outfiles is not None
for f in self._outfiles:
# TODO do not use hasattr
if f is not None and f is not sys.stdin and f is not sys.stdout and hasattr(f, 'close'):
......@@ -211,7 +222,8 @@ class Pipeline(ABC):
self._demultiplexer.close()
@property
def uses_qualities(self):
def uses_qualities(self) -> bool:
assert self._reader is not None
return self._reader.delivers_qualities
@abstractmethod
......@@ -227,11 +239,11 @@ class Pipeline(ABC):
pass
@abstractmethod
def _final_filter(self, outfiles):
def _final_filter(self, outfiles: OutputFiles):
pass
@abstractmethod
def _create_demultiplexer(self, outfiles):
def _create_demultiplexer(self, outfiles: OutputFiles):
pass
......@@ -245,7 +257,7 @@ class SingleEndPipeline(Pipeline):
super().__init__(file_opener)
self._modifiers = []
def add(self, modifier):
def add(self, modifier: Modifier):
if modifier is None:
raise ValueError("Modifier must not be None")
self._modifiers.append(modifier)
......@@ -254,6 +266,7 @@ class SingleEndPipeline(Pipeline):
"""Run the pipeline. Return statistics"""
n = 0 # no. of processed reads # TODO turn into attribute
total_bp = 0
assert self._reader is not None
for read in self._reader:
n += 1
if n % 10000 == 0 and progress:
......@@ -273,12 +286,12 @@ class SingleEndPipeline(Pipeline):
def _untrimmed_filter_wrapper(self):
return Redirector
def _final_filter(self, outfiles):
assert outfiles.out2 is None
def _final_filter(self, outfiles: OutputFiles):
assert outfiles.out2 is None and outfiles.out is not None
writer = self._open_writer(outfiles.out, force_fasta=outfiles.force_fasta)
return NoFilter(writer)
def _create_demultiplexer(self, outfiles):
def _create_demultiplexer(self, outfiles: OutputFiles):
return Demultiplexer(outfiles.out, outfiles.untrimmed, qualities=self.uses_qualities,
file_opener=self.file_opener)
......@@ -314,30 +327,31 @@ class PairedEndPipeline(Pipeline):
# Whether to ignore pair_filter mode for discard-untrimmed filter
self.override_untrimmed_pair_filter = False
def add(self, modifier1, modifier2):
def add(self, modifier1: Optional[Modifier], modifier2: Optional[Modifier]):
"""
Add a modifier for R1 and R2. One of them may be None, in which case the modifier
is applied only to the other read.
"""
if modifier1 is None and modifier2 is None:
raise ValueError("Cannot use None for both modifiers")
self._modifiers.append(PairedModifier(modifier1, modifier2))
self._modifiers.append(PairedModifierWrapper(modifier1, modifier2))
def add_both(self, modifier):
def add_both(self, modifier: Modifier):
"""
Add one modifier for both R1 and R2
"""
assert modifier is not None
self._modifiers.append(PairedModifier(modifier, copy.copy(modifier)))
self._modifiers.append(PairedModifierWrapper(modifier, copy.copy(modifier)))
def add_paired_modifier(self, paired_modifier):
"""Add a Modifier without wrapping it in a PairedModifier"""
def add_paired_modifier(self, paired_modifier: PairedModifier):
"""Add a Modifier (without wrapping it in a PairedModifierWrapper)"""
self._modifiers.append(paired_modifier)
def process_reads(self, progress: Progress = None):
n = 0 # no. of processed reads
total1_bp = 0
total2_bp = 0
assert self._reader is not None
for read1, read2 in self._reader:
n += 1
if n % 10000 == 0 and progress:
......@@ -403,53 +417,65 @@ class PairedEndPipeline(Pipeline):
self._maximum_length = value
def reader_process(file, file2, connections, queue, buffer_size, stdin_fd):
class ReaderProcess(Process):
"""
Read chunks of FASTA or FASTQ data from *file* and send to a worker.
Read chunks of FASTA or FASTQ data (single-end or paired) and send to a worker.
queue -- a Queue of worker indices. A worker writes its own index into this
queue to notify the reader that it is ready to receive more data.
connections -- a list of Connection objects, one for each worker.
The function repeatedly
The reader repeatedly
- reads a chunk from the file
- reads a chunk from the file(s)
- reads a worker index from the Queue
- sends the chunk to connections[index]
and finally sends "poison pills" (the value -1) to all connections.
and finally sends the stop token -1 (a "poison pill") to all connections.
"""
def send_to_worker(chunk_index, chunk1, chunk2=None):
worker_index = queue.get()
connection = connections[worker_index]
connection.send(chunk_index)
connection.send_bytes(chunk1)
if chunk2 is not None:
connection.send_bytes(chunk2)
def __init__(self, file, file2, connections, queue, buffer_size, stdin_fd):
"""
queue -- a Queue of worker indices. A worker writes its own index into this
queue to notify the reader that it is ready to receive more data.
connections -- a list of Connection objects, one for each worker.
"""
super().__init__()
self.file = file
self.file2 = file2
self.connections = connections
self.queue = queue
self.buffer_size = buffer_size
self.stdin_fd = stdin_fd
if stdin_fd != -1:
def run(self):
if self.stdin_fd != -1:
sys.stdin.close()
sys.stdin = os.fdopen(stdin_fd)
sys.stdin = os.fdopen(self.stdin_fd)
try:
with xopen(file, 'rb') as f:
if file2:
with xopen(file2, 'rb') as f2:
for chunk_index, (chunk1, chunk2) in enumerate(dnaio.read_paired_chunks(f, f2, buffer_size)):
send_to_worker(chunk_index, chunk1, chunk2)
with xopen(self.file, 'rb') as f:
if self.file2:
with xopen(self.file2, 'rb') as f2:
for chunk_index, (chunk1, chunk2) in enumerate(
dnaio.read_paired_chunks(f, f2, self.buffer_size)):
self.send_to_worker(chunk_index, chunk1, chunk2)
else:
for chunk_index, chunk in enumerate(dnaio.read_chunks(f, buffer_size)):
send_to_worker(chunk_index, chunk)
for chunk_index, chunk in enumerate(dnaio.read_chunks(f, self.buffer_size)):
self.send_to_worker(chunk_index, chunk)
# Send poison pills to all workers
for _ in range(len(connections)):
worker_index = queue.get()
connections[worker_index].send(-1)
for _ in range(len(self.connections)):
worker_index = self.queue.get()
self.connections[worker_index].send(-1)
except Exception as e:
# TODO better send this to a common "something went wrong" Queue
for worker_index in range(len(connections)):
connections[worker_index].send(-2)
connections[worker_index].send((e, traceback.format_exc()))
for connection in self.connections:
connection.send(-2)
connection.send((e, traceback.format_exc()))
def send_to_worker(self, chunk_index, chunk1, chunk2=None):
worker_index = self.queue.get()
connection = self.connections[worker_index]
connection.send(chunk_index)
connection.send_bytes(chunk1)
if chunk2 is not None:
connection.send_bytes(chunk2)
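The refactor from a `reader_process` function to a `ReaderProcess` class keeps the same handshake: workers announce readiness on a shared queue, the reader pushes chunks down per-worker pipes, and -1 is the stop token. A toy, self-contained version of that pattern (POSIX fork start method assumed; the names here are illustrative, not cutadapt's):

```python
import multiprocessing as mp

def worker_loop(index, conn, need_work_queue, results):
    # Announce readiness, then process chunks until the -1 stop token arrives.
    need_work_queue.put(index)
    while True:
        chunk_index = conn.recv()
        if chunk_index == -1:
            break
        data = conn.recv_bytes()
        results.put((chunk_index, len(data)))
        need_work_queue.put(index)

def reader_loop(chunks, write_ends, need_work_queue):
    # Send each chunk to whichever worker asked for work first.
    for chunk_index, chunk in enumerate(chunks):
        worker_index = need_work_queue.get()
        write_ends[worker_index].send(chunk_index)
        write_ends[worker_index].send_bytes(chunk)
    # Drain the final readiness notifications, then stop every worker.
    for _ in write_ends:
        need_work_queue.get()
    for conn in write_ends:
        conn.send(-1)

def run_demo(n_workers=2, n_chunks=5):
    need_work = mp.Queue()
    results = mp.Queue()
    read_ends, write_ends = zip(*[mp.Pipe(duplex=False) for _ in range(n_workers)])
    workers = [
        mp.Process(target=worker_loop, args=(i, read_ends[i], need_work, results))
        for i in range(n_workers)
    ]
    for w in workers:
        w.start()
    reader_loop([b"chunk-%d" % i for i in range(n_chunks)], write_ends, need_work)
    for w in workers:
        w.join()
    return sorted(results.get() for _ in range(n_chunks))

print(run_demo())  # every chunk index arrives exactly once
```

Pulling the worker index from the queue before each send is what load-balances the chunks: a slow worker simply asks for work less often.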
class WorkerProcess(Process):
......@@ -535,7 +561,7 @@ class WorkerProcess(Process):
return output_files
def _send_outfiles(self, outfiles, chunk_index, n_reads):
def _send_outfiles(self, outfiles: OutputFiles, chunk_index: int, n_reads: int):
self._write_pipe.send(chunk_index)
self._write_pipe.send(n_reads)
......@@ -543,6 +569,7 @@ class WorkerProcess(Process):
if f is None:
continue
f.flush()
assert isinstance(f, io.BytesIO)
processed_chunk = f.getvalue()
self._write_pipe.send_bytes(processed_chunk)
......@@ -575,7 +602,7 @@ class PipelineRunner(ABC):
"""
A read processing pipeline
"""
def __init__(self, pipeline, progress):
def __init__(self, pipeline: Pipeline, progress: Progress):
self._pipeline = pipeline
self._progress = progress
......@@ -618,48 +645,47 @@ class ParallelPipelineRunner(PipelineRunner):
outfiles: OutputFiles,
progress: Progress,
n_workers: int,
buffer_size=4*1024**2,
buffer_size: int = 4 * 1024**2,
):
super().__init__(pipeline, progress)
self._pipes = [] # the workers read from these
self._reader_process = None
self._outfiles = None
self._two_input_files = None
self._interleaved_input = None
self._n_workers = n_workers
self._need_work_queue = Queue()
self._need_work_queue = Queue() # type: Queue
self._buffer_size = buffer_size
self._assign_input(infiles.file1, infiles.file2, infiles.interleaved)
self._assign_output(outfiles)
def _assign_input(self, file1, file2=None, interleaved=False):
if self._reader_process is not None:
raise RuntimeError('Do not call connect_io more than once')
def _assign_input(
self,
file1: BinaryIO,
file2: Optional[BinaryIO] = None,
interleaved: bool = False,
):
self._two_input_files = file2 is not None
self._interleaved_input = interleaved
# the workers read from these connections
connections = [Pipe(duplex=False) for _ in range(self._n_workers)]
self._pipes, connw = zip(*connections)
self._connections, connw = zip(*connections)
try:
fileno = sys.stdin.fileno()
except io.UnsupportedOperation:
# This happens during tests: pytest sets sys.stdin to an object
# that does not have a file descriptor.
fileno = -1
self._reader_process = Process(target=reader_process, args=(file1, file2, connw,
self._need_work_queue, self._buffer_size, fileno))
self._reader_process = ReaderProcess(file1, file2, connw,
self._need_work_queue, self._buffer_size, fileno)
self._reader_process.daemon = True
self._reader_process.start()
@staticmethod
def can_output_to(outfiles):
def can_output_to(outfiles: OutputFiles) -> bool:
return outfiles.out is not None and not outfiles.demultiplex
def _assign_output(self, outfiles):
def _assign_output(self, outfiles: OutputFiles):
if not self.can_output_to(outfiles):
raise ValueError()
self._outfiles = outfiles
def _start_workers(self):
def _start_workers(self) -> Tuple[List[WorkerProcess], List[Connection]]:
workers = []
connections = []
for index in range(self._n_workers):
......@@ -669,7 +695,7 @@ class ParallelPipelineRunner(PipelineRunner):
index, self._pipeline,
self._two_input_files,
self._interleaved_input, self._outfiles,
self._pipes[index], conn_w, self._need_work_queue)
self._connections[index], conn_w, self._need_work_queue)
worker.daemon = True
worker.start()
workers.append(worker)
......
......@@ -3,8 +3,10 @@ Routines for printing a report.
"""
from io import StringIO
import textwrap
from .adapters import Where, EndStatistics, ADAPTER_TYPE_NAMES
from .modifiers import QualityTrimmer, NextseqQualityTrimmer, AdapterCutter, PairedAdapterCutter
from typing import Any, Optional, List
from .adapters import Where, EndStatistics, AdapterStatistics, ADAPTER_TYPE_NAMES
from .modifiers import (Modifier, PairedModifier, QualityTrimmer, NextseqQualityTrimmer,
AdapterCutter, PairedAdapterCutter, ReverseComplementer)
from .filters import (NoFilter, PairedNoFilter, TooShortReadFilter, TooLongReadFilter,
PairedDemultiplexer, CombinatorialDemultiplexer, Demultiplexer, NContentFilter, InfoFileWriter,
WildcardFileWriter, RestFileWriter)
......@@ -26,23 +28,24 @@ def add_if_not_none(a, b):
class Statistics:
def __init__(self):
def __init__(self) -> None:
"""
"""
self.paired = None
self.paired = None # type: Optional[bool]
self.did_quality_trimming = None # type: Optional[bool]
self.too_short = None
self.too_long = None
self.too_many_n = None
self.did_quality_trimming = None
self.reverse_complemented = None # type: Optional[int]
self.n = 0
self.written = 0
self.total_bp = [0, 0]
self.written_bp = [0, 0]
self.with_adapters = [0, 0]
self.quality_trimmed_bp = [0, 0]
self.adapter_stats = [[], []]
self.adapter_stats = [[], []] # type: List[List[AdapterStatistics]]
def __iadd__(self, other):
def __iadd__(self, other: Any):
self.n += other.n
self.written += other.written
......@@ -55,6 +58,8 @@ class Statistics:
elif self.did_quality_trimming != other.did_quality_trimming:
raise ValueError('Incompatible Statistics: did_quality_trimming is not equal')
self.reverse_complemented = add_if_not_none(
self.reverse_complemented, other.reverse_complemented)
self.too_short = add_if_not_none(self.too_short, other.too_short)
self.too_long = add_if_not_none(self.too_long, other.too_long)
self.too_many_n = add_if_not_none(self.too_many_n, other.too_many_n)
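`__iadd__` merges optional counters through `add_if_not_none`, whose definition sits just above the hunk shown (see the hunk header). A plausible definition consistent with how it is used here, where None means "counter not in use":

```python
def add_if_not_none(a, b):
    """Add two optional counters; None means 'not tracked' and is skipped."""
    if a is None:
        return b
    if b is None:
        return a
    return a + b

print(add_if_not_none(None, None))  # None: neither side tracked the counter
print(add_if_not_none(3, None))     # 3
print(add_if_not_none(3, 4))        # 7
```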
......@@ -111,13 +116,13 @@ class Statistics:
elif isinstance(w.filter, NContentFilter):
self.too_many_n = w.filtered
def _collect_modifier(self, m):
def _collect_modifier(self, m: Modifier):
if isinstance(m, PairedAdapterCutter):
for i in 0, 1:
self.with_adapters[i] += m.with_adapters
self.adapter_stats[i] = list(m.adapter_statistics[i].values())
return
if getattr(m, 'paired', False):
if isinstance(m, PairedModifier):
modifiers_list = [(0, m._modifier1), (1, m._modifier2)]
else:
modifiers_list = [(0, m)]
......@@ -128,6 +133,10 @@ class Statistics:
elif isinstance(modifier, AdapterCutter):
self.with_adapters[i] += modifier.with_adapters
self.adapter_stats[i] = list(modifier.adapter_statistics.values())
elif isinstance(modifier, ReverseComplementer):
self.with_adapters[i] += modifier.adapter_cutter.with_adapters
self.adapter_stats[i] = list(modifier.adapter_cutter.adapter_statistics.values())
self.reverse_complemented = modifier.reverse_complemented
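The `ReverseComplementer` branch above feeds the new v2.8 `--revcomp` statistics. The underlying idea, trying the read and its reverse complement and keeping the better-matching orientation, can be illustrated with a toy exact-substring matcher (cutadapt uses its real alignment, not this):

```python
_COMPLEMENT = str.maketrans("ACGTacgt", "TGCAtgca")

def reverse_complement(seq: str) -> str:
    return seq.translate(_COMPLEMENT)[::-1]

def normalize_strand(read: str, adapter: str):
    """Return (read, was_reverse_complemented); toy stand-in for --revcomp."""
    if adapter in read:
        return read, False
    rc = reverse_complement(read)
    if adapter in rc:
        return rc, True
    return read, False

print(normalize_strand("CCGGCGTT", "AACG"))  # reverse complement matches
```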
@property
def total(self):
......@@ -157,6 +166,10 @@ class Statistics:
def total_written_bp_fraction(self):
return safe_divide(self.total_written_bp, self.total)
@property
def reverse_complemented_fraction(self):
return safe_divide(self.reverse_complemented, self.n)
@property
def too_short_fraction(self):
return safe_divide(self.too_short, self.n)
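Each `*_fraction` property delegates to `safe_divide`; a definition consistent with this usage (assumed here) guards against both an inactive counter and an empty run:

```python
def safe_divide(numerator, denominator):
    """Return numerator/denominator, or 0 when there is nothing to divide."""
    if numerator is None or not denominator:
        return 0
    return numerator / denominator

print(safe_divide(None, 100))  # 0: counter was never active
print(safe_divide(25, 0))      # 0: no reads processed
print(safe_divide(25, 100))    # 0.25
```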
......@@ -262,7 +275,7 @@ class AdjacentBaseStatistics:
return sio.getvalue()
def full_report(stats, time, gc_content) -> str:
def full_report(stats: Statistics, time: float, gc_content: float) -> str: # noqa: C901
"""Return the full report as a string."""
if stats.n == 0:
return "No reads processed!"
......@@ -288,6 +301,9 @@ def full_report(stats, time, gc_content) -> str:
Total reads processed: {o.n:13,d}
Reads with adapters: {o.with_adapters[0]:13,d} ({o.with_adapters_fraction[0]:.1%})
""")
if stats.reverse_complemented is not None:
report += "Reverse-complemented: " \
"{o.reverse_complemented:13,d} ({o.reverse_complemented_fraction:.1%})\n"
if stats.too_short is not None:
report += "{pairs_or_reads} that were too short: {o.too_short:13,d} ({o.too_short_fraction:.1%})\n"
if stats.too_long is not None:
......@@ -324,6 +340,7 @@ def full_report(stats, time, gc_content) -> str:
total_front = sum(adapter_statistics.front.lengths.values())
total_back = sum(adapter_statistics.back.lengths.values())
total = total_front + total_back
reverse_complemented = adapter_statistics.reverse_complemented
where = adapter_statistics.where
where_backs = (Where.BACK, Where.BACK_NOT_INTERNAL, Where.SUFFIX)
where_fronts = (Where.FRONT, Where.FRONT_NOT_INTERNAL, Where.PREFIX)
......@@ -348,9 +365,13 @@ def full_report(stats, time, gc_content) -> str:
len(adapter_statistics.back.sequence),
total_front, total_back))
else:
print_s("Sequence: {}; Type: {}; Length: {}; Trimmed: {} times.".
print_s("Sequence: {}; Type: {}; Length: {}; Trimmed: {} times".
format(adapter_statistics.front.sequence, ADAPTER_TYPE_NAMES[adapter_statistics.where],
len(adapter_statistics.front.sequence), total))
len(adapter_statistics.front.sequence), total), end="")
if reverse_complemented is not None:
print_s("; Reverse-complemented: {} times".format(reverse_complemented))
else:
print_s()
if total == 0:
print_s()
continue
......@@ -396,7 +417,7 @@ def full_report(stats, time, gc_content) -> str:
return sio.getvalue().rstrip()
def minimal_report(stats, time, gc_content) -> str:
def minimal_report(stats, _time, _gc_content) -> str:
"""Create a minimal tabular report suitable for concatenation"""
def none(value):
......
......@@ -154,6 +154,23 @@ class FileOpener:
def xopen(self, path, mode):
return xopen(path, mode, compresslevel=self.compression_level, threads=self.threads)
def xopen_or_none(self, path, mode):
"""Return opened file or None if the path is None"""
if path is None:
return None
return self.xopen(path, mode)
def xopen_pair(self, path1, path2, mode):
file1 = file2 = None
if path1 is not None:
file1 = self.xopen(path1, mode)
if path2 is not None:
file2 = self.xopen(path2, mode)
elif path2 is not None:
raise ValueError("When giving paths for paired-end files, providing only the"
" second file is not supported")
return file1, file2
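The rule enforced by `xopen_pair`, that a second path is only meaningful alongside a first, can be exercised with any opener; a sketch with a stand-in opener (the `open_pair` name is hypothetical):

```python
def open_pair(path1, path2, opener):
    """Open up to two paired files; path2 alone is rejected, as in xopen_pair."""
    file1 = file2 = None
    if path1 is not None:
        file1 = opener(path1)
        if path2 is not None:
            file2 = opener(path2)
    elif path2 is not None:
        raise ValueError(
            "When giving paths for paired-end files, providing only the second"
            " file is not supported")
    return file1, file2

print(open_pair("r1.fastq", "r2.fastq", str.upper))  # ('R1.FASTQ', 'R2.FASTQ')
print(open_pair("r1.fastq", None, str.upper))        # ('R1.FASTQ', None)
```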
def dnaio_open(self, *args, **kwargs):
kwargs["opener"] = self.xopen
return dnaio.open(*args, **kwargs)
......
r1 5' adapter and 3' adapter 0 0 10 AAAAAAAAAA CCCCCCCCCCTTTTTTTTTTGGGGGGG linkedadapter;1
r1 5' adapter and 3' adapter 0 10 20 CCCCCCCCCC TTTTTTTTTT GGGGGGG linkedadapter;2
r2 without any adapter -1 GGGGGGGGGGGGGGGGGGG
r3 5' adapter, partial 3' adapter 0 0 10 AAAAAAAAAA CCCGGCCCCCTTTTT linkedadapter;1
r3 5' adapter, partial 3' adapter 0 10 15 CCCGGCCCCC TTTTT linkedadapter;2
r4 only 3' adapter -1 GGGGGGGGGGCCCCCCCCCCTTTTTTTTTTGGGGGGG
r5 only 5' adapter 0 0 10 AAAAAAAAAA CCCCCCCCCCGGGGGGG linkedadapter;1
r6 partial 5' adapter -1 AAAAAACCCCCCCCCCTTTTTTTTTTGGGGGGG
r7 5' adapter plus preceding bases -1 AACCGGTTTTAAAAAAAAAACCCCCCCCCCTTTTTTTTTTGGGGGGG
>r1 5' adapter and 3' adapter
AAAAAAAAAACCCCCCCCCCTTTTTTTTTTGGGGGGG
>r2 without any adapter
GGGGGGGGGGGGGGGGGGG
>r3 5' adapter, partial 3' adapter
aaaaAAAAAACCCGGCCCCCTtttt
>r4 only 3' adapter
GGGGGGGGGGCCCCCCCCCCTTTTTTTTTTGGGGGGG
>r5 only 5' adapter
AAAAAAAAAACCCCCCCCCCGGGGGGG
>r6 partial 5' adapter
AAAAAACCCCCCCCCCTTTTTTTTTTGGGGGGG
>r7 5' adapter plus preceding bases
aaccggttttaaaaAAAAAACCCCCCCCCCTTTTTTttttggggggg
@read1/1
CCAGCTTAGACATATCGCCT
+
G=1C(C1=J1J=C(18C(1(
@read2/1 rc
ACCATCCGATATGTCTAATGTGGCCTGTTG
+
18C=((GC=C88(1JJ=JC88=C1(CJJJ8
@read3/1
CCAACTTGATATTAATAACATTagacaXataa
+
111C1=8(G(8CJ8118=C8J=88G(J=8(=8
@read4/1
GACAGGCCGTTTGAATGTTGACGGGATGTT
+
11=CCJJ(G===1==J8G1=8=J=CCGJ(J
@read5
CCAGCTTAGACATATCGCCT
+
(J=1(G(J1GCC==CCG(=G
@read6 rc
ATGAGGAAGAAGTCAGATATGATCGAAATGTTG
+
11G1=G8(CJJGC=G(JG1J=(=1(J88=J1(J
@read1/1
CCAGCTTAGACATATCGCCT
+
G=1C(C1=J1J=C(18C(1(
@read2/1
CAACAGGCCACATTAGACATATCGGATGGT
+
8JJJC(1C=88CJ=JJ1(88C=CG((=C81
@read3/1
CCAACTTGATATTAATAACATTagacaXataa
+
111C1=8(G(8CJ8118=C8J=88G(J=8(=8
@read4/1
GACAGGCCGTTTGAATGTTGACGGGATGTT
+
11=CCJJ(G===1==J8G1=8=J=CCGJ(J
@read5
CCAGCTTAGACATATCGCCT
+
(J=1(G(J1GCC==CCG(=G
@read6
CAACATTTCGATCATATCTGACTTCTTCCTCAT
+
J(1J=88J(1=(=J1GJ(G=CGJJC(8G=1G11
@read1/1
ttatttgtctCCAGCTTAGACATATCGCCT
+
(8J181J(J1G=1C(C1=J1J=C(18C(1(
@read2/1
CAACAGGCCACATTAGACATATCGGATGGTagacaaataa
+
8JJJC(1C=88CJ=JJ1(88C=CG((=C81GJG8=J=8=(
@read3/1
tccgcactggCCAACTTGATATTAATAACATTagacaXataa
+
1GC((GCG=(111C1=8(G(8CJ8118=C8J=88G(J=8(=8
@read4/1
GACAGGCCGTTTGAATGTTGACGGGATGTT
+
11=CCJJ(G===1==J8G1=8=J=CCGJ(J
@read5
ttatttgtctCCAGCTTAGACATATCGCCT
+
=GG=1GC(JG(J=1(G(J1GCC==CCG(=G
@read6
CAACATTTCGATCATATCTGACTTCTTCCTCATagacaaataa
+
J(1J=88J(1=(=J1GJ(G=CGJJC(8G=1G118JC8(CC8((
@read1/2
GCTGGAGACAAATAACAGTGGAGTAGTTTT
+
0FIF<<IF<B7''F7FB'0'<B'F'7F7BI
@read2/2
TGTGGCCTGTTGCAGTGGAGTAACTCCAGC
+
'<07I'FIB'<<FBI''I0'BFB7F'<<0I
@read3/2
TGTTATTAATATCAAGTTGGCAGTG
+
B7B'<<7BB77'F<'<0F7<0I000
@read4/2
CATCCCGTCAACATTCAAACGGCCTGTCCAagacaaataa
+
7F0<'I0F<'F<B7FB7B<77'00BBF'BFIB<'<BF7<7
@read5
CAACAGGCCACATTAGACATATCGGATGGTagacaaataa
+
7IF<F0B0''I'77<F'IB'<F7FI<<F0'F<7<7BBB'B
@read6
AGAAGCGGGGCGGGTGTCTCccagtgcgga
+
B0FF7I''I7BBFF<<B'FBB0F<I<I700