Skip to content
Commits on Source (10)
......@@ -6,6 +6,12 @@ python:
- '2.7'
- '3.5'
stages:
- lint
- test
- name: deploy
if: tag IS present
env:
global:
- PYTHON=python
......@@ -13,12 +19,19 @@ env:
- secure: 'kFoqHCxat/ETS2SUc2q9M7YvzvnlR7sgHmx7SRvVgTyLkk1efpJ++YPwDBEYZ3v+GLf2nRfc20GxtZkH6ey1f//aj4CT2q2CJiUsKAlkFAOHzKo/3mTLl/WDHkPAr9MW7AdnbNk6W8sIPCKqFsyKL2FTH70dBcxa1e7trQ2RC64hnOOkt/tm2cQhj6sX0gROggN5QrpHE8tDZb9ugF0uf92L/CGxeClAebWgb7zVChHDMTNsmnOvWUF9m6LZOvkgFmuIeh70EPuOWh6LxU/n5JyevYIGO5vVDbjgfmNELlG2KUTm6dWeoyofcj6hUqYmQsmI1ATrf7ThY1+b6asQGy+Exp/76MBXiYRh+RgVKifwaZMOWehzfjDQvPYOGvf6rXOVGeVZ+nBkskr0HARsX1KnyDE+k+XPoP7zqvW6mCic9ZQ+IdQJtxMHOTxxFjuPAlunvaUqDNM9VP6YEWOI4UqIOO1nQh4E2zkPhXI2yY744q+BV/5+3HHqNQj1+5qFPoZeyDEuNXwgDCjrJ8i3hna/LTTvRigx6/YQL1PF/C30R4h/nkqp8ghA4VpNRPnQ8nOO+oD6AdN7Pswc3C4qGPEwoeqfNzEIR1KfEWzB7HsfTFbgyGFFNGuQ/P26DMK+kPBNZ6GhZ9wb5/xT226OA+ovcAmVGn/Hnt/qVaylXNk='
_deploy_common: &deploy_common
if: tag IS present
install:
- $PYTHON -m pip install cibuildwheel twine
matrix:
include:
- stage: lint
python: '3.5'
addons: {}
install:
- $PYTHON -m pip install flake8 flake8-import-order
script:
- flake8 .
- stage: deploy
python: '3.5'
services:
......@@ -32,6 +45,7 @@ matrix:
- $PYTHON setup.py sdist
- twine check dist/*
- twine upload --skip-existing dist/*
- stage: deploy
os: osx
language: generic
......
python-bx (0.8.5-2) UNRELEASED; urgency=medium
python-bx (0.8.6-1) unstable; urgency=medium
* Team upload.
[ Michael R. Crusoe ]
* Disable the bx.interval_index_file_tests.test_interval_index_file test on
i386 Closes: #944409
-- Michael R. Crusoe <michael.crusoe@gmail.com> Sat, 07 Dec 2019 13:25:33 +0100
[ Andreas Tille ]
* New upstream version
* Simplify lintian-override
* Trim trailing whitespace.
* debian/copyright: use spaces rather than tabs to start continuation
lines.
* Set upstream metadata fields: Bug-Database, Repository, Repository-
Browse.
* Enhance description
-- Andreas Tille <tille@debian.org> Wed, 18 Dec 2019 09:25:52 +0100
python-bx (0.8.5-1) unstable; urgency=medium
......
......@@ -30,18 +30,18 @@ Breaks: python3-bx-tools
Provides: python3-bx-tools
Replaces: python3-bx-tools
Description: library to manage genomic data and its alignment
The bx-python project is a python library and associated set of scripts to
The bx-python project is a Python3 library and associated set of scripts to
allow for rapid implementation of genome scale analyses. The library contains
a variety of useful modules, but the particular strengths are:
* Classes for reading and working with genome-scale multiple local
alignments (in MAF, AXT, and LAV formats)
alignments (in MAF, AXT, and LAV formats)
* Generic data structure for indexing on disk files that contain blocks of
data associated with intervals on various sequences (used, for example, to
provide random access to individual alignments in huge files; optimized for
use over network filesystems)
data associated with intervals on various sequences (used, for example, to
provide random access to individual alignments in huge files; optimized
for use over network filesystems)
* Data structures for working with intervals on sequences
* "Binned bitsets" which act just like chromosome sized bit arrays, but
lazily allocate regions and allow large blocks of all set or all unset bits
to be stored compactly
lazily allocate regions and allow large blocks of all set or all unset
bits to be stored compactly
* "Intersecter" for performing fast intersection tests that preserve both
query and target intervals and associated annotation
query and target intervals and associated annotation
......@@ -9,7 +9,7 @@ Files: *
Copyright: 2005-2015 The Pennsylvania State University
James Taylor <james@bx.psu.edu>
Bob Harris <rsharris@bx.psu.edu>
2013-2015 The Johns Hopkins University
2013-2015 The Johns Hopkins University
License: MIT
Files: lib/bx_extras/fpconst.py
......
# Not fiddling with .py endings because of reverse-dependencies expected from toil
# We shall revisit this decision once all packages for toil are in the distribution
# and we have learned more about how bx is used in the community in 3rd party scripts.
python3-bx: script-with-language-extension usr/bin/aggregate_scores_in_intervals.py
python3-bx: script-with-language-extension usr/bin/align_print_template.py
python3-bx: script-with-language-extension usr/bin/axt_extract_ranges.py
python3-bx: script-with-language-extension usr/bin/axt_to_fasta.py
python3-bx: script-with-language-extension usr/bin/axt_to_lav.py
python3-bx: script-with-language-extension usr/bin/axt_to_maf.py
python3-bx: script-with-language-extension usr/bin/bed_bigwig_profile.py
python3-bx: script-with-language-extension usr/bin/bed_build_windows.py
python3-bx: script-with-language-extension usr/bin/bed_complement.py
python3-bx: script-with-language-extension usr/bin/bed_count_by_interval.py
python3-bx: script-with-language-extension usr/bin/bed_count_overlapping.py
python3-bx: script-with-language-extension usr/bin/bed_coverage.py
python3-bx: script-with-language-extension usr/bin/bed_coverage_by_interval.py
python3-bx: script-with-language-extension usr/bin/bed_diff_basewise_summary.py
python3-bx: script-with-language-extension usr/bin/bed_extend_to.py
python3-bx: script-with-language-extension usr/bin/bed_intersect.py
python3-bx: script-with-language-extension usr/bin/bed_intersect_basewise.py
python3-bx: script-with-language-extension usr/bin/bed_merge_overlapping.py
python3-bx: script-with-language-extension usr/bin/bed_rand_intersect.py
python3-bx: script-with-language-extension usr/bin/bed_subtract_basewise.py
python3-bx: script-with-language-extension usr/bin/bnMapper.py
python3-bx: script-with-language-extension usr/bin/div_snp_table_chr.py
python3-bx: script-with-language-extension usr/bin/find_in_sorted_file.py
python3-bx: script-with-language-extension usr/bin/gene_fourfold_sites.py
python3-bx: script-with-language-extension usr/bin/get_scores_in_intervals.py
python3-bx: script-with-language-extension usr/bin/int_seqs_to_char_strings.py
python3-bx: script-with-language-extension usr/bin/interval_count_intersections.py
python3-bx: script-with-language-extension usr/bin/interval_join.py
python3-bx: script-with-language-extension usr/bin/lav_to_axt.py
python3-bx: script-with-language-extension usr/bin/lav_to_maf.py
python3-bx: script-with-language-extension usr/bin/line_select.py
python3-bx: script-with-language-extension usr/bin/lzop_build_offset_table.py
python3-bx: script-with-language-extension usr/bin/mMK_bitset.py
python3-bx: script-with-language-extension usr/bin/maf_build_index.py
python3-bx: script-with-language-extension usr/bin/maf_chop.py
python3-bx: script-with-language-extension usr/bin/maf_chunk.py
python3-bx: script-with-language-extension usr/bin/maf_col_counts.py
python3-bx: script-with-language-extension usr/bin/maf_col_counts_all.py
python3-bx: script-with-language-extension usr/bin/maf_count.py
python3-bx: script-with-language-extension usr/bin/maf_covered_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_covered_regions.py
python3-bx: script-with-language-extension usr/bin/maf_div_sites.py
python3-bx: script-with-language-extension usr/bin/maf_drop_overlapping.py
python3-bx: script-with-language-extension usr/bin/maf_extract_chrom_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_extract_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_extract_ranges_indexed.py
python3-bx: script-with-language-extension usr/bin/maf_filter.py
python3-bx: script-with-language-extension usr/bin/maf_filter_max_wc.py
python3-bx: script-with-language-extension usr/bin/maf_gap_frequency.py
python3-bx: script-with-language-extension usr/bin/maf_gc_content.py
python3-bx: script-with-language-extension usr/bin/maf_interval_alignibility.py
python3-bx: script-with-language-extension usr/bin/maf_limit_to_species.py
python3-bx: script-with-language-extension usr/bin/maf_mapping_word_frequency.py
python3-bx: script-with-language-extension usr/bin/maf_mask_cpg.py
python3-bx: script-with-language-extension usr/bin/maf_mean_length_ungapped_piece.py
python3-bx: script-with-language-extension usr/bin/maf_percent_columns_matching.py
python3-bx: script-with-language-extension usr/bin/maf_percent_identity.py
python3-bx: script-with-language-extension usr/bin/maf_print_chroms.py
python3-bx: script-with-language-extension usr/bin/maf_print_scores.py
python3-bx: script-with-language-extension usr/bin/maf_randomize.py
python3-bx: script-with-language-extension usr/bin/maf_region_coverage_by_src.py
python3-bx: script-with-language-extension usr/bin/maf_select.py
python3-bx: script-with-language-extension usr/bin/maf_shuffle_columns.py
python3-bx: script-with-language-extension usr/bin/maf_species_in_all_files.py
python3-bx: script-with-language-extension usr/bin/maf_split_by_src.py
python3-bx: script-with-language-extension usr/bin/maf_thread_for_species.py
python3-bx: script-with-language-extension usr/bin/maf_tile.py
python3-bx: script-with-language-extension usr/bin/maf_tile_2.py
python3-bx: script-with-language-extension usr/bin/maf_tile_2bit.py
python3-bx: script-with-language-extension usr/bin/maf_to_axt.py
python3-bx: script-with-language-extension usr/bin/maf_to_concat_fasta.py
python3-bx: script-with-language-extension usr/bin/maf_to_fasta.py
python3-bx: script-with-language-extension usr/bin/maf_to_int_seqs.py
python3-bx: script-with-language-extension usr/bin/maf_translate_chars.py
python3-bx: script-with-language-extension usr/bin/maf_truncate.py
python3-bx: script-with-language-extension usr/bin/maf_word_frequency.py
python3-bx: script-with-language-extension usr/bin/mask_quality.py
python3-bx: script-with-language-extension usr/bin/nib_chrom_intervals_to_fasta.py
python3-bx: script-with-language-extension usr/bin/nib_intervals_to_fasta.py
python3-bx: script-with-language-extension usr/bin/nib_length.py
python3-bx: script-with-language-extension usr/bin/one_field_per_line.py
python3-bx: script-with-language-extension usr/bin/out_to_chain.py
python3-bx: script-with-language-extension usr/bin/prefix_lines.py
python3-bx: script-with-language-extension usr/bin/pretty_table.py
python3-bx: script-with-language-extension usr/bin/qv_to_bqv.py
python3-bx: script-with-language-extension usr/bin/random_lines.py
python3-bx: script-with-language-extension usr/bin/table_add_column.py
python3-bx: script-with-language-extension usr/bin/table_filter.py
python3-bx: script-with-language-extension usr/bin/tfloc_summary.py
python3-bx: script-with-language-extension usr/bin/ucsc_gene_table_to_intervals.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_array_tree.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_binned_array.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_chr_binned_array.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_simple.py
# see https://lists.debian.org/debian-med/2018/06/msg00043.html
python3-bx: script-with-language-extension usr/bin/*.py
......@@ -26,5 +26,3 @@ override_dh_auto_clean:
grep -rli "Generated by Cython" lib/ | xargs -r rm
# Changes perpetually redone
rm -rf lib/bx_python.egg-info
Registry:
- Name: OMICtools
Entry: OMICS_18239
- Name: bio.tools
Entry: NA
- Name: SciCrunch
Entry: NA
- Name: conda:bioconda
Entry: bx-python
- Name: OMICtools
Entry: OMICS_18239
- Name: bio.tools
Entry: NA
- Name: SciCrunch
Entry: NA
- Name: conda:bioconda
Entry: bx-python
Bug-Database: https://github.com/bxlab/bx-python/issues
Repository: https://github.com/bxlab/bx-python.git
Repository-Browse: https://github.com/bxlab/bx-python
......@@ -14,15 +14,9 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If your extensions are in another directory, add it here. If the directory
# is relative to the documentation root, use os.path.abspath to make it
# absolute, like shown here.
#sys.path.append(os.path.abspath('.'))
## curr_dir = os.path.dirname( __file__ )
## bx_dir = os.path.join( curr_dir, '..', '..', 'lib')
## sys.path.insert( 0, bx_dir )
import bx
# General configuration
......@@ -132,7 +126,7 @@ html_static_path = ['static']
# Custom sidebar templates, maps document names to template names.
html_index = 'index.html'
html_sidebars = { 'index': 'indexsidebar.html'}
html_sidebars = {'index': 'indexsidebar.html'}
# Additional templates that should be rendered to pages, maps page names to
# template names.
......@@ -175,10 +169,9 @@ htmlhelp_basename = 'bx-doc'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, document class [howto/manual]).
latex_documents = [
('index', 'bx-python.tex', ur'bx-python Documentation',
ur'James Taylor', 'manual'),
]
latex_documents = [(
'index', 'bx-python.tex', u'bx-python Documentation',
u'James Taylor', 'manual'), ]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
......
......@@ -6,18 +6,16 @@ Setuptools bootstrapping installer.
Run this script to install or upgrade setuptools.
"""
import contextlib
import optparse
import os
import platform
import shutil
import subprocess
import sys
import tempfile
import zipfile
import optparse
import subprocess
import platform
import textwrap
import contextlib
import warnings
import zipfile
from distutils import log
try:
......@@ -244,6 +242,8 @@ def has_powershell():
except Exception:
return False
return True
download_file_powershell.viable = has_powershell
......@@ -260,6 +260,8 @@ def has_curl():
except Exception:
return False
return True
download_file_curl.viable = has_curl
......@@ -276,6 +278,8 @@ def has_wget():
except Exception:
return False
return True
download_file_wget.viable = has_wget
......@@ -291,6 +295,8 @@ def download_file_insecure(url, target):
# Write all the data in one block to avoid creating a partial file.
with open(target, "wb") as dst:
dst.write(data)
download_file_insecure.viable = lambda: True
......@@ -362,9 +368,9 @@ def _parse_args():
default=DEFAULT_VERSION,
)
parser.add_option(
'--to-dir',
help="Directory to save (and re-use) package",
default=DEFAULT_SAVE_DIR,
'--to-dir',
help="Directory to save (and re-use) package",
default=DEFAULT_SAVE_DIR,
)
options, args = parser.parse_args()
# positional arguments are ignored
......@@ -372,13 +378,13 @@ def _parse_args():
def _download_args(options):
"""Return args for download_setuptools function from cmdline args."""
return dict(
version=options.version,
download_base=options.download_base,
downloader_factory=options.downloader_factory,
to_dir=options.to_dir,
)
"""Return args for download_setuptools function from cmdline args."""
return dict(
version=options.version,
download_base=options.download_base,
downloader_factory=options.downloader_factory,
to_dir=options.to_dir,
)
def main():
......@@ -387,5 +393,6 @@ def main():
archive = download_setuptools(**_download_args(options))
return _install(archive, _build_install_args(options))
if __name__ == '__main__':
sys.exit(main())
......@@ -4,4 +4,4 @@ the abstract alignment classes and `maf`, `axt`, and `lav` for readers and
writers in various formats.
"""
from bx.align.core import *
from bx.align.core import * # noqa
......@@ -4,106 +4,125 @@ alignments.
.. _AXT: http://genome.ucsc.edu/goldenPath/help/axt.html
"""
import itertools
from six import Iterator
from bx import interval_index_file
from bx.align import *
from bx.align import (
Alignment,
Component,
src_split
)
# Tools for dealing with pairwise alignments in AXT format
class MultiIndexed( object ):
class MultiIndexed(object):
"""Similar to 'indexed' but wraps more than one axt_file"""
def __init__( self, axt_filenames, keep_open=False ):
self.indexes = [ Indexed( axt_file, axt_file + ".index" ) for axt_file in axt_filenames ]
def get( self, src, start, end ):
def __init__(self, axt_filenames, keep_open=False):
self.indexes = [Indexed(axt_file, axt_file + ".index") for axt_file in axt_filenames]
def get(self, src, start, end):
blocks = []
for index in self.indexes: blocks += index.get( src, start, end )
for index in self.indexes:
blocks += index.get(src, start, end)
return blocks
class Indexed( object ):
class Indexed(object):
"""Indexed access to a axt using overlap queries, requires an index file"""
def __init__( self, axt_filename, index_filename=None, keep_open=False, species1 = None, species2=None, species_to_lengths=None, support_ids=False ):
if index_filename is None: index_filename = axt_filename + ".index"
self.indexes = interval_index_file.Indexes( filename=index_filename )
def __init__(self, axt_filename, index_filename=None, keep_open=False, species1=None, species2=None, species_to_lengths=None, support_ids=False):
if index_filename is None:
index_filename = axt_filename + ".index"
self.indexes = interval_index_file.Indexes(filename=index_filename)
self.axt_filename = axt_filename
# nota bene: (self.species1 = species1 or "species1") is incorrect if species1=""
self.species1 = species1
if (self.species1 == None): self.species1 = "species1"
if self.species1 is None:
self.species1 = "species1"
self.species2 = species2
if (self.species2 == None): self.species2 = "species2"
if self.species2 is None:
self.species2 = "species2"
self.species_to_lengths = species_to_lengths
self.support_ids = support_ids # for extra text at end of axt header lines
self.support_ids = support_ids # for extra text at end of axt header lines
if keep_open:
self.f = open( axt_filename )
self.f = open(axt_filename)
else:
self.f = None
def get( self, src, start, end ):
intersections = self.indexes.find( src, start, end )
return ( self.get_axt_at_offset( val ) for start, end, val in intersections )
def get(self, src, start, end):
intersections = self.indexes.find(src, start, end)
return (self.get_axt_at_offset(val) for start, end, val in intersections)
def get_axt_at_offset( self, offset ):
def get_axt_at_offset(self, offset):
if self.f:
self.f.seek( offset )
return read_next_axt( self.f, self.species1, self.species2, self.species_to_lengths, self.support_ids )
self.f.seek(offset)
return read_next_axt(self.f, self.species1, self.species2, self.species_to_lengths, self.support_ids)
else:
f = open( self.axt_filename )
f = open(self.axt_filename)
try:
f.seek( offset )
return read_next_axt( f, self.species1, self.species2, self.species_to_lengths, self.support_ids )
f.seek(offset)
return read_next_axt(f, self.species1, self.species2, self.species_to_lengths, self.support_ids)
finally:
f.close()
class Reader( Iterator ):
class Reader(Iterator):
"""Iterate over all axt blocks in a file in order"""
def __init__( self, file, species1 = None, species2=None, species_to_lengths=None, support_ids=False ):
def __init__(self, file, species1=None, species2=None, species_to_lengths=None, support_ids=False):
self.file = file
# nota bene: (self.species1 = species1 or "species1") is incorrect if species1=""
self.species1 = species1
if (self.species1 == None): self.species1 = "species1"
if self.species1 is None:
self.species1 = "species1"
self.species2 = species2
if (self.species2 == None): self.species2 = "species2"
if self.species2 is None:
self.species2 = "species2"
self.species_to_lengths = species_to_lengths
self.support_ids = support_ids # for extra text at end of axt header lines
self.support_ids = support_ids # for extra text at end of axt header lines
self.attributes = {}
def __next__( self ):
return read_next_axt( self.file, self.species1, self.species2, self.species_to_lengths, self.support_ids )
def __next__(self):
return read_next_axt(self.file, self.species1, self.species2, self.species_to_lengths, self.support_ids)
def __iter__( self ):
return ReaderIter( self )
def __iter__(self):
return ReaderIter(self)
def close( self ):
def close(self):
self.file.close()
class ReaderIter( Iterator ):
def __init__( self, reader ):
class ReaderIter(Iterator):
def __init__(self, reader):
self.reader = reader
def __iter__( self ):
def __iter__(self):
return self
def __next__( self ):
def __next__(self):
v = next(self.reader)
if not v: raise StopIteration
if not v:
raise StopIteration
return v
class Writer( object ):
def __init__( self, file, attributes={} ):
class Writer(object):
def __init__(self, file, attributes={}):
self.file = file
self.block = 0
self.src_split = True
if ("src_split" in attributes):
if "src_split" in attributes:
self.src_split = attributes["src_split"]
def write( self, alignment ):
if (len(alignment.components) != 2):
raise ValueError("%d-component alignment is not compatible with axt" % \
len(alignment.components))
def write(self, alignment):
if len(alignment.components) != 2:
raise ValueError(
"%d-component alignment is not compatible with axt" %
len(alignment.components))
c1 = alignment.components[0]
c2 = alignment.components[1]
......@@ -111,22 +130,23 @@ class Writer( object ):
c1 = c1.reverse_complement()
c2 = c2.reverse_complement()
if (self.src_split):
spec1,chr1 = src_split( c1.src )
spec2,chr2 = src_split( c2.src )
if self.src_split:
spec1, chr1 = src_split(c1.src)
spec2, chr2 = src_split(c2.src)
else:
chr1,chr2 = c1.src,c2.src
self.file.write( "%d %s %d %d %s %d %d %s %s\n" % \
(self.block,chr1,c1.start+1,c1.start+c1.size,
chr2,c2.start+1,c2.start+c2.size,c2.strand,
alignment.score))
self.file.write( "%s\n" % c1.text )
self.file.write( "%s\n" % c2.text )
self.file.write( "\n" )
chr1, chr2 = c1.src, c2.src
self.file.write(
"%d %s %d %d %s %d %d %s %s\n" %
(self.block, chr1, c1.start+1, c1.start+c1.size,
chr2, c2.start+1, c2.start+c2.size, c2.strand,
alignment.score))
self.file.write("%s\n" % c1.text)
self.file.write("%s\n" % c2.text)
self.file.write("\n")
self.block += 1
def close( self ):
def close(self):
self.file.close()
# ---- Helper methods ---------------------------------------------------------
......@@ -139,56 +159,63 @@ class Writer( object ):
# first species is always on plus strand
# when second species is on minus strand, start and stop are counted from sequence end
def read_next_axt( file, species1, species2, species_to_lengths=None, support_ids=False ):
line = readline( file, skip_blank=True )
if not line: return
def read_next_axt(file, species1, species2, species_to_lengths=None, support_ids=False):
line = readline(file, skip_blank=True)
if not line:
return
fields = line.split()
if (len(fields) < 9) or ((not support_ids) and (len(fields) > 9)):
if len(fields) < 9 or (not support_ids and len(fields) > 9):
raise ValueError("bad axt-block header: %s" % line)
attributes = {}
if (len(fields) > 9):
if len(fields) > 9:
attributes["id"] = "_".join(fields[9:])
seq1 = readline( file )
if not line or line.isspace(): raise ValueError("incomplete axt-block; header: %s" % line)
seq2 = readline( file )
if not line or line.isspace(): raise ValueError("incomplete axt-block; header: %s" % line)
seq1 = readline(file)
if not line or line.isspace():
raise ValueError("incomplete axt-block; header: %s" % line)
seq2 = readline(file)
if not line or line.isspace():
raise ValueError("incomplete axt-block; header: %s" % line)
# Build 2 component alignment
alignment = Alignment(attributes=attributes,species_to_lengths=species_to_lengths)
alignment = Alignment(attributes=attributes, species_to_lengths=species_to_lengths)
# Build component for species 1
component = Component()
component.src = fields[1]
if (species1 != ""): component.src = species1 + "." + component.src
component.start = int( fields[2] ) - 1 # (axt intervals are origin-1
end = int( fields[3] ) # and inclusive on both ends)
if species1 != "":
component.src = species1 + "." + component.src
component.start = int(fields[2]) - 1 # (axt intervals are origin-1
end = int(fields[3]) # and inclusive on both ends)
component.size = end - component.start
component.strand = "+"
component.text = seq1.strip()
alignment.add_component( component )
alignment.add_component(component)
# Build component for species 2
component = Component()
component.src = fields[4]
if (species2 != ""): component.src = species2 + "." + component.src
component.start = int( fields[5] ) - 1
end = int( fields[6] )
if species2 != "":
component.src = species2 + "." + component.src
component.start = int(fields[5]) - 1
end = int(fields[6])
component.size = end - component.start
component.strand = fields[7]
component.text = seq2.strip()
alignment.add_component( component )
alignment.add_component(component)
# add score
try:
alignment.score = int( fields[8] )
except:
alignment.score = int(fields[8])
except ValueError:
try:
alignment.score = float( fields[8] )
except:
alignment.score = float(fields[8])
except ValueError:
alignment.score = fields[8]
return alignment
def readline( file, skip_blank=False ):
def readline(file, skip_blank=False):
"""Read a line from provided file, skipping any blank or comment lines"""
while 1:
while True:
line = file.readline()
if not line: return None
if line[0] != '#' and not ( skip_blank and line.isspace() ):
if not line:
return None
if line[0] != '#' and not (skip_blank and line.isspace()):
return line
This diff is collapsed.
......@@ -4,18 +4,23 @@
from __future__ import with_statement
import logging
import re
import os
import re
from collections import namedtuple
from six.moves import cPickle
from ._epo import rem_dash, fastLoadChain, bed_union, cummulative_intervals
from ._epo import ( # noqa: F401
bed_union,
cummulative_intervals,
fastLoadChain,
rem_dash
)
log = logging.getLogger(__name__)
class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qSize qStrand qStart qEnd id') ):
class Chain(namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qSize qStrand qStart qEnd id')):
"""A Chain header as in http://genome.ucsc.edu/goldenPath/help/chain.html
chain coordinates are with respect to the strand, so for example tStart on the + strand is the
......@@ -33,9 +38,9 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
:param line: header of a chain (in .chain format)
"""
assert type(line) == str, "this is a factory from string"
assert isinstance(line, str), "this is a factory from string"
line = line.rstrip().split()[1:] # the first component is the keyword "chain"
line = line.rstrip().split()[1:] # the first component is the keyword "chain"
tup = [t[0](t[1]) for t in zip([int, str, int, str, int, int, str, int, str, int, int, str], line)]
return tuple.__new__(cls, tup)
......@@ -62,9 +67,9 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
# size, target, query arrays
S, T, Q = [], [], []
#the target strand of the chain must be on the forward strand
trg_intervals = trg_comp.intervals(reverse = trg_comp.strand == '-')
qr_intervals = qr_comp.intervals(reverse = trg_comp.strand == '-')
# the target strand of the chain must be on the forward strand
trg_intervals = trg_comp.intervals(reverse=trg_comp.strand == '-')
qr_intervals = qr_comp.intervals(reverse=trg_comp.strand == '-')
if len(trg_intervals) == 0 or len(qr_intervals) == 0:
log.warning("deletion/insertion only intervals")
return None
......@@ -80,46 +85,51 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
# intervals are 0-base, halfo-open => lengths = coordinate difference
while A or B:
if a[1] < b[1]:
T.append(0); Q.append( A[0][0] - a[1] ); S.append( min(a[1], b[1]) - max(a[0], b[0]) )
T.append(0)
Q.append(A[0][0] - a[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
a = A.pop(0)
elif b[1] < a[1]:
Q.append(0); T.append( B[0][0] - b[1] ); S.append( min(a[1], b[1]) - max(a[0], b[0]) )
Q.append(0)
T.append(B[0][0] - b[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
b = B.pop(0)
elif A and B:
assert 1 > 2, "there are dash columns"
else:
break
S.append( min(a[1], b[1]) - max(a[0], b[0]) )
S.append(min(a[1], b[1]) - max(a[0], b[0]))
assert len(T) == len(Q) == len(S) - 1, "(S, T, Q) = (%d, %d, %d)" % tuple(map(len, (S, T, Q)))
tSize = trg_chrom_sizes[trg_comp.chrom]
qSize = qr_chrom_sizes[qr_comp.chrom]
## UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
## chain_start = epo_start - 1 and chain_end = epo_end
# UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
# chain_start = epo_start - 1 and chain_end = epo_end
if qr_comp.strand == '+':
chain = Chain(0,
trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_start_correction, qr_comp.end - qr_end_correction,
qr_comp.gabid)
chain = Chain(
0, trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_start_correction, qr_comp.end - qr_end_correction,
qr_comp.gabid)
else:
chain = Chain(0,
trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_end_correction, qr_comp.end - qr_start_correction,
qr_comp.gabid)
chain = Chain(
0, trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_end_correction, qr_comp.end - qr_start_correction,
qr_comp.gabid)
# strand correction. in UCSC coordinates this is: size - coord
if chain.qStrand == '-':
chain = chain._replace(qEnd = chain.qSize - chain.qStart,
qStart = chain.qSize - chain.qEnd)
assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (str(chain),
chain.tEnd - chain.tStart, sum(S) + sum(T))
assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (str(chain),
chain.qEnd - chain.qStart, sum(S) + sum(Q))
chain = chain._replace(
qEnd=chain.qSize - chain.qStart,
qStart=chain.qSize - chain.qEnd)
assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (
str(chain), chain.tEnd - chain.tStart, sum(S) + sum(T))
assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (
str(chain), chain.qEnd - chain.qStart, sum(S) + sum(Q))
return chain, S, T, Q
def slice(self, who):
......@@ -158,15 +168,15 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
fname = path[:-3]
if fname.endswith('.pkl'):
#you asked for the pickled file. I'll give it to you
# you asked for the pickled file. I'll give it to you
log.debug("loading pickled file %s ..." % fname)
return cPickle.load( open(fname, "rb") )
return cPickle.load(open(fname, "rb"))
elif os.path.isfile("%s.pkl" % fname):
#there is a cached version I can give to you
# there is a cached version I can give to you
log.info("loading pickled file %s.pkl ..." % fname)
if os.stat(path).st_mtime > os.stat("%s.pkl" % fname).st_mtime:
log.critical("*** pickled file %s.pkl is not up to date ***" % (path))
return cPickle.load( open("%s.pkl" % fname, "rb") )
return cPickle.load(open("%s.pkl" % fname, "rb"))
data = fastLoadChain(path, cls._strfactory)
if pickle and not os.path.isfile('%s.pkl' % fname):
......@@ -175,14 +185,16 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
cPickle.dump(data, fd)
return data
class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar')):
"this format is how alignments are delivered from e!"
__slots__ = ()
cigar_pattern = re.compile("(\d*)([MD])")
cigar_pattern = re.compile(r"(\d*)([MD])")
def __repr__(self): return str(self)
def __repr__(self):
return str(self)
def __str__(self):
c = self.cigar[:5] + "..." + self.cigar[-5:]
......@@ -198,12 +210,11 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
chrom = cmp[2]
if not chrom.startswith("chr"):
chrom = "chr%s" % chrom
instance = tuple.__new__(cls,
(cmp[0], cmp[1],
chrom, int(cmp[3]), int(cmp[4]),
{'1' : '+', '-1' : '-'}[cmp[5]], cmp[6]))
instance = tuple.__new__(
cls,
(cmp[0], cmp[1], chrom, int(cmp[3]), int(cmp[4]), {'1': '+', '-1': '-'}[cmp[5]], cmp[6]))
span = instance.end - instance.start + 1
m_num = sum( (t[1] == "M" and [t[0]] or [0])[0] for t in instance.cigar_iter(False) )
m_num = sum((t[1] == "M" and [t[0]] or [0])[0] for t in instance.cigar_iter(False))
if span != m_num:
log.warning("[{gabid}] {species}.{chrom}:{start}-{end}.".format(**instance._asdict()) + "(span) %d != %d (matches)" % (span, m_num))
return None
......@@ -219,7 +230,7 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
with open(fname) as fd:
for el in (cls._strfactory(_) for _ in fd):
if el:
data.setdefault(el.gabid, []).append( el )
data.setdefault(el.gabid, []).append(el)
log.info("parsed %d elements from %s" % (len(data), fname))
return data
......@@ -242,8 +253,8 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
parsed_cigar = parsed_cigar[::-1]
for _l, t in parsed_cigar:
# 1M is encoded as M
l = (_l and int(_l) or 1) # int(_l) cannot be 0
data.append( (l, t) )
l = (_l and int(_l) or 1) # int(_l) cannot be 0
data.append((l, t))
return data
def intervals(self, reverse, thr=0):
......@@ -259,22 +270,20 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
:return: list of pairs"""
d = [(thr,thr)]
d = [(thr, thr)]
dl = 0
for tup in self.cigar_iter(reverse):
if tup[1] == "D":
dl = tup[0]
else:
s = d[-1][1] + dl
d.append( (s, s+tup[0]) )
d.append((s, s+tup[0]))
assert d[0] == (thr, thr)
# assert that nr. of Ms in the interval == sum of produced intervals
assert sum( t[0] for t in self.cigar_iter(False) if t[1] == "M" ) == sum( t[1]-t[0] for t in d )
d_sum = sum( t[1]-t[0] for t in d )
assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (self.start, self.end,
self.end-self.start+1, d_sum)
return d[1:] #clip the (thr, thr) entry
assert sum(t[0] for t in self.cigar_iter(False) if t[1] == "M") == sum(t[1]-t[0] for t in d)
d_sum = sum(t[1]-t[0] for t in d)
assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (
self.start, self.end, self.end-self.start+1, d_sum)
return d[1:] # clip the (thr, thr) entry
"tests for bx.align.epo"
import unittest, logging, pdb
import pdb
import random
import unittest
import numpy as np
from bx.align._epo import cummulative_intervals, bed_union
from bx.align.epo import *
from bx.align._epo import (
bed_union,
cummulative_intervals,
)
from bx.align.epo import (
Chain,
EPOitem
)
class TestBed( unittest.TestCase ):
class TestBed(unittest.TestCase):
def setUp(self):
self.N = random.randint(1, 1000)
def test_ci(self):
S, D = [], []
for i in range(self.N):
S.append( random.randint(10, 50) )
D.append( random.randint(10, 50) )
S.append(random.randint(10, 50))
D.append(random.randint(10, 50))
D[-1] = 0
C = cummulative_intervals(np.array(S, dtype=np.int64),
np.array(D, dtype=np.int64))
C = cummulative_intervals(np.array(S, dtype=np.int64), np.array(D, dtype=np.int64))
for i in range(self.N):
assert C[i,1] - C[i,0] == S[i]
assert C[i, 1] - C[i, 0] == S[i]
for i in range(1, self.N):
assert C[i,0] - C[i-1,1] == D[i-1], "[%d] %d != %d" % (i, C[i,0] - C[i-1,1], D[i-1])
assert C[i, 0] - C[i-1, 1] == D[i-1], "[%d] %d != %d" % (i, C[i, 0] - C[i-1, 1], D[i-1])
def test_elem_u(self):
# back to back, so should return a single interval
......@@ -30,75 +38,68 @@ class TestBed( unittest.TestCase ):
th = 0
for i in range(self.N):
size = random.randint(1, 20)
EL.append( (th, th+size) )
EL.append((th, th+size))
th += size
U = bed_union( np.array(EL, dtype=np.uint64) )
assert U[0,0] == 0 and U[0,1] == th
U = bed_union(np.array(EL, dtype=np.uint64))
assert U[0, 0] == 0 and U[0, 1] == th
# disjoint
EL = []
th = 0
for i in range(self.N):
size = random.randint(1, 20)
EL.append( (th, th+size) )
EL.append((th, th+size))
th += (size + 1)
U = bed_union( np.array(EL, dtype=np.uint64) )
U = bed_union(np.array(EL, dtype=np.uint64))
for i in range(U.shape[0]):
assert (U[i,0], U[i,1]) == EL[i]
assert (U[i, 0], U[i, 1]) == EL[i]
# random with some empty elements
EL = []
th = 0
for i in range(self.N):
size = random.randint(1, 20)
EL.append( (th, th+size) )
th += random.randint(1, size+size) #50% of overlapping
U = bed_union( np.array(EL, dtype=np.uint64) )
EL.append((th, th+size))
th += random.randint(1, size+size) # 50% of overlapping
U = bed_union(np.array(EL, dtype=np.uint64))
assert U[0,1] > U[0,0]
assert U[0, 1] > U[0, 0]
for i in range(1, U.shape[0]):
assert U[i,1] > U[i,0]
assert U[i,0] > U[i-1,1]
cigar_pairs = [
("GGACCTGGAGAGATCAG---------------------------GACTTCAACTGTGTG-------------TCTTAGACTGGG--------AGGGTGTTA",
"AGGCCAGGAGAGATCAGGTAAGTCTTAATTTAATAAAGAGATAGGACCTGAACTGTGTCTAACAATAGGTAATATTAGACTGGGGGAGAGAGAAGACTTTC"),
("TTT--------------------------------------------------------------------------------------------------------------------T",
"CTTGTACCAAGGACAGTACTGGCAGCCTAATTGCTAACACTTTGTGGTGGATTGGTCCACTCAATATTTGTTCCCACCTCTTTTCAGTCCAGTTCTATAAAGGACAGAAAGTTGAAAACT"),
("A-------------------------------------------------ACACTGGACACAGCACTAACACGATTACTTA",
"ACATTTCCCACACTCCCTTGCAGCTAGGTTTCTAGATATAATTTAGATTCCA----------------------------A"),
("TTTGGTCCTCTGGA------CGAGCAGCCAGTGCT---------------------------------------------------------------------------AAAAAAAA",
"T---CATTCTAGCAGGTGCTGCAGCAGCAGGTAGCCCTGGAGCCAACAGTTGTGGCTATGATTCTTGATCATCAGATTTGGCTCAAGTGATGTGTTCCTCTAGCATGCACTTGAGATA"),
assert U[i, 1] > U[i, 0]
assert U[i, 0] > U[i-1, 1]
("G-----------------------C----------------------------------------------------------------------------------------A",
"GGCCTGCACTGCCAGTAATTTTAACAAATTTTTAGGCACTGAATTCCCTGTATTAAATCTGTTTTCCTTAGCGTAAACAGATCTCTGTTAAATGAAACTAAACCCTGACTGATA"),
("TATT----------------------------------T",
"TCCTTCATTTTATTTCTCCCTTAAAATTTTTTTTATTACT"),
("TAAAAA--A------A------------------------------------------------------------TTTTTTTTTTT",
"T---AATTATTTTGCAGCAGGTCCTTGATAACATATCATCTATAAATATTTCAGCAAGAATCTCTAAAAGGCAAGAACCTCCTTCTT"),
("AAACAA---------------------------------------TT---T",
"AAACAATACCACTGCATCACTATCAAACCCAAAAAATAACAAAAATTGGGT"),
("TCTTAAC---TGCTGAGCCATCCCTCCAGCTCCTGTTTTATTTTTATTATGAAGTAATAATA--ATAG--TAATAATAATGATG",
"TACACTTAATTCTAAAACTTGTTATGAATCATCA----------TTGG--TTTTTTATTGTGAAGAACTAATATAATCAGA--G"),
("ATGATAATGGTATCCTAGCTCAACACCTG-GAGTTCACCCCAACAGTTAACTAA----GTTTGAGGAAGTGTTAACAAGCCTA---ACAAAGAGGACATGCCAATAGCTGACAGAGTCAC",
"A-------CCTCTGCTAGCTCAACTCCTGAGAATCAATTATATAAGCTAGGTCAGTGGTTTTGAGAAAGTATTAGTAGACATTTCTCCAAAGAATACATAAAAATGGCC-A--CAAGTAT")
cigar_pairs = [
("GGACCTGGAGAGATCAG---------------------------GACTTCAACTGTGTG-------------TCTTAGACTGGG--------AGGGTGTTA",
"AGGCCAGGAGAGATCAGGTAAGTCTTAATTTAATAAAGAGATAGGACCTGAACTGTGTCTAACAATAGGTAATATTAGACTGGGGGAGAGAGAAGACTTTC"),
("TTT--------------------------------------------------------------------------------------------------------------------T",
"CTTGTACCAAGGACAGTACTGGCAGCCTAATTGCTAACACTTTGTGGTGGATTGGTCCACTCAATATTTGTTCCCACCTCTTTTCAGTCCAGTTCTATAAAGGACAGAAAGTTGAAAACT"),
("A-------------------------------------------------ACACTGGACACAGCACTAACACGATTACTTA",
"ACATTTCCCACACTCCCTTGCAGCTAGGTTTCTAGATATAATTTAGATTCCA----------------------------A"),
("TTTGGTCCTCTGGA------CGAGCAGCCAGTGCT---------------------------------------------------------------------------AAAAAAAA",
"T---CATTCTAGCAGGTGCTGCAGCAGCAGGTAGCCCTGGAGCCAACAGTTGTGGCTATGATTCTTGATCATCAGATTTGGCTCAAGTGATGTGTTCCTCTAGCATGCACTTGAGATA"),
("G-----------------------C----------------------------------------------------------------------------------------A",
"GGCCTGCACTGCCAGTAATTTTAACAAATTTTTAGGCACTGAATTCCCTGTATTAAATCTGTTTTCCTTAGCGTAAACAGATCTCTGTTAAATGAAACTAAACCCTGACTGATA"),
("TATT----------------------------------T",
"TCCTTCATTTTATTTCTCCCTTAAAATTTTTTTTATTACT"),
("TAAAAA--A------A------------------------------------------------------------TTTTTTTTTTT",
"T---AATTATTTTGCAGCAGGTCCTTGATAACATATCATCTATAAATATTTCAGCAAGAATCTCTAAAAGGCAAGAACCTCCTTCTT"),
("AAACAA---------------------------------------TT---T",
"AAACAATACCACTGCATCACTATCAAACCCAAAAAATAACAAAAATTGGGT"),
("TCTTAAC---TGCTGAGCCATCCCTCCAGCTCCTGTTTTATTTTTATTATGAAGTAATAATA--ATAG--TAATAATAATGATG",
"TACACTTAATTCTAAAACTTGTTATGAATCATCA----------TTGG--TTTTTTATTGTGAAGAACTAATATAATCAGA--G"),
("ATGATAATGGTATCCTAGCTCAACACCTG-GAGTTCACCCCAACAGTTAACTAA----GTTTGAGGAAGTGTTAACAAGCCTA---ACAAAGAGGACATGCCAATAGCTGACAGAGTCAC",
"A-------CCTCTGCTAGCTCAACTCCTGAGAATCAATTATATAAGCTAGGTCAGTGGTTTTGAGAAAGTATTAGTAGACATTTCTCCAAAGAATACATAAAAATGGCC-A--CAAGTAT")
]
def toCigar(species, id, s):
I = [(0,0)]
I = [(0, 0)]
L = [len(_) for _ in s.split("-")]
NZ = [_ for _ in L if _]
if L[0] > 0:
I.append( (0, L[0]) )
I.append((0, L[0]))
NZ = NZ[1:]
L = L[1:]
......@@ -106,13 +107,12 @@ def toCigar(species, id, s):
L.insert(0, 0)
size = NZ[i]
start = L.index(size)
I.append( (I[-1][1] + start, I[-1][1]+start+size) )
I.append((I[-1][1] + start, I[-1][1]+start+size))
L = L[start+1:]
if len(L):
I.append((I[-1][1] + len(L), I[-1][1] + len(L)))
#print I[1:]
C = []
for i in range(1,len(I)):
for i in range(1, len(I)):
dl = I[i][0] - I[i-1][1]
ml = I[i][1] - I[i][0]
......@@ -124,38 +124,34 @@ def toCigar(species, id, s):
if ml:
mc = (ml > 1 and str(ml) or "") + "M"
C.append( dc+mc )
C.append(dc+mc)
MSUM = sum(i[1]-i[0] for i in I)
start = random.randint(50, 10000)
return "%s\t%d\t1\t%d\t%d\t%d\t%s" % (species, id, start, start+MSUM-1,
random.choice((-1,1)), "".join(C))
return "%s\t%d\t1\t%d\t%d\t%d\t%s" % (species, id, start, start+MSUM-1, random.choice((-1, 1)), "".join(C))
class TestEpo( unittest.TestCase ):
class TestEpo(unittest.TestCase):
def setUp(self):
self.epo_records = []
for i, (t,q) in enumerate(cigar_pairs):
for i, (t, q) in enumerate(cigar_pairs):
gab_pair = (toCigar("homo_sapiens", i, t), toCigar("mus_musculus", i, q))
A = EPOitem._strfactory(gab_pair[0])
B = EPOitem._strfactory(gab_pair[1])
if A and B:
self.epo_records.append( (A,B) )
self.epo_records.append((A, B))
def test_out(self):
def ch(c, ci):
th = 0
for l,t in ci:
for l, t in ci:
if t == 'M':
assert c[th:th+l].find('-') == -1
else:
assert c[th:th+l] == '-' * l
th += l
for (a,b) in self.epo_records:
for (a, b) in self.epo_records:
ca, cb = cigar_pairs[int(a.gabid)]
#if a.strand == '-': ca = ca[::-1]
#if b.strand == '-': cb = cb[::-1]
ch(ca, a.cigar_iter(False))
ch(cb, b.cigar_iter(False))
......@@ -164,11 +160,11 @@ class TestEpo( unittest.TestCase ):
return cigar[s:e].find('-') == -1
for p in self.epo_records:
chain = Chain._make_from_epo(p[0], p[1], {"chr1" : 500}, {"chr1" : 800})
chain = Chain._make_from_epo(p[0], p[1], {"chr1": 500}, {"chr1": 800})
if not chain:
continue
ch, S, T, Q = chain
i = int( ch.id )
i = int(ch.id)
c1, c2 = cigar_pairs[i]
if p[0].strand == '-':
c1 = c1[::-1]
......@@ -182,7 +178,7 @@ class TestEpo( unittest.TestCase ):
cch(c1, th+s, th+s+t) and c1[th+s:th+s+t] == '-'*t
else:
cch(c2, th+s, th+s+q) and c1[th+s:th+s+q] == '-'*q
th = th + s + max(t,q)
th = th + s + max(t, q)
def test_rem_dash(self):
# ****--****-------**** 4M2D4M7D4M
......@@ -195,9 +191,10 @@ class TestEpo( unittest.TestCase ):
dash_cols = random.randint(0, 10)
tStart = random.randint(0, 1000)
qStart = random.randint(0, 1000)
epo_pair = (EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+12-1, "4M2D4M%dD4M" % (dash_cols+3))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+14-1, "7M%dD7M" % (dash_cols+3))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1":500}, {"chr1":800})
epo_pair = (
EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+12-1, "4M2D4M%dD4M" % (dash_cols+3))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+14-1, "7M%dD7M" % (dash_cols+3))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1": 500}, {"chr1": 800})
ti = epo_pair[0].intervals(False)
qi = epo_pair[1].intervals(False)
assert ti[2][0] - ti[1][1] - dash_cols == chain[2][1]
......@@ -218,11 +215,10 @@ class TestEpo( unittest.TestCase ):
tStart = random.randint(0, 1000)
qStart = random.randint(0, 1000)
epo_pair = (EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+tm-1,
"%dD%dM" % (dash_cols+1, tm))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+qm+1-1,
"M%dD%dM" % (dash_cols+tm-qm, qm))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1":500}, {"chr1":800})
epo_pair = (
EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+tm-1, "%dD%dM" % (dash_cols+1, tm))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+qm+1-1, "M%dD%dM" % (dash_cols+tm-qm, qm))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1": 500}, {"chr1": 800})
if chain[1][-1] != qm:
pdb.set_trace()
assert chain[1][-1] == qm
......
This diff is collapsed.
......@@ -3,12 +3,12 @@ Tests for `bx.align.lav`.
"""
import unittest
import sys
import bx.align as align
import bx.align.lav as lav
test_lav = "test_data/lav_tests/apple_orange.lav"
class lavTestCase(unittest.TestCase):
def testReader(self):
......@@ -18,25 +18,26 @@ class lavTestCase(unittest.TestCase):
a = next(reader)
assert a.score == 10286, "a.score is wrong: %s" % a.score
assert len(a.components) == 2
check_component(a.components[0], "apple", 106, 252, "+", 411, "GTCCGGCCGGCTGAGAGCTACAATACACATGCACGCAGTTTGGCCACTCACATTAAGTATATGAGGAAGGGTTAGCATGAGTTGTACTATAAGGCAGCGGATAGCAGGTTGTGGAAAAATATCCTCCCGATTCAAATCCCCAGGTGCCTAAA----------------GTAGGGCCGGTAGTTGAATGCTTGCCTGTCAGACTGGATGACCAAGTTCAGTATCAACACAATATAGTGCCAGGAGCTAATTGTTCCCCAGCAGCGTGAC")
check_component(a.components[1], "lav_tests.orange", 53, 252, "+", 361, "GTCCGGCCGGCTGTGTGCTACAATACACGTTCACGCAGTTTGGCCAATCACTTTAAGTATATACGAAATGGTTACCATGAGTTGTACTGTAAGGCAGCGGAAAGC---TTGTTAA--------CTCCTGGGCGACATT----GGGGCTGCAACATCGTTTATCCTCCTCTACAACCAATAGCTG-TTGCTTCTTGGTTCAAGTATATCCCATGGATTAGTATCAACACGATATAGTGTCAGGAGCTAATTGTTCCCCAGCAGCGTGAC")
check_component(a.components[0], "apple", 106, 252, "+", 411, "GTCCGGCCGGCTGAGAGCTACAATACACATGCACGCAGTTTGGCCACTCACATTAAGTATATGAGGAAGGGTTAGCATGAGTTGTACTATAAGGCAGCGGATAGCAGGTTGTGGAAAAATATCCTCCCGATTCAAATCCCCAGGTGCCTAAA----------------GTAGGGCCGGTAGTTGAATGCTTGCCTGTCAGACTGGATGACCAAGTTCAGTATCAACACAATATAGTGCCAGGAGCTAATTGTTCCCCAGCAGCGTGAC")
check_component(a.components[1], "lav_tests.orange", 53, 252, "+", 361, "GTCCGGCCGGCTGTGTGCTACAATACACGTTCACGCAGTTTGGCCAATCACTTTAAGTATATACGAAATGGTTACCATGAGTTGTACTGTAAGGCAGCGGAAAGC---TTGTTAA--------CTCCTGGGCGACATT----GGGGCTGCAACATCGTTTATCCTCCTCTACAACCAATAGCTG-TTGCTTCTTGGTTCAAGTATATCCCATGGATTAGTATCAACACGATATAGTGTCAGGAGCTAATTGTTCCCCAGCAGCGTGAC")
a = next(reader)
assert a.score == 3586, "a.score is wrong: %s" % a.score
assert len(a.components) == 2
check_component(a.components[0], "apple", 52, 72, "+", 411, "TGCATATCGACTATTACAGCCACGCGAGTTACATTCCTCTTTTTTTTTGCTGGCGTCCGGCCGGCTGAGAGC")
check_component(a.components[1], "lav_tests.orange", 2, 72, "-", 361, "TGCATATCGACTAGTACAGCCTCTCGAGTTACCCCCCCCATTCCTCTTGCTGACGTCACGCTGCTGGGGAAC")
check_component(a.components[0], "apple", 52, 72, "+", 411, "TGCATATCGACTATTACAGCCACGCGAGTTACATTCCTCTTTTTTTTTGCTGGCGTCCGGCCGGCTGAGAGC")
check_component(a.components[1], "lav_tests.orange", 2, 72, "-", 361, "TGCATATCGACTAGTACAGCCTCTCGAGTTACCCCCCCCATTCCTCTTGCTGACGTCACGCTGCTGGGGAAC")
a = next(reader)
assert a is None
reader.close()
def check_component( c, src, start, size, strand, src_size, text ):
#..print "\"%s\" == \"%s\"" % (c.src,src)
assert c.src == src, "c.src = %s (expected %s)" % (c.src, src)
assert c.start == start, "c.start = %s (expected %s)" % (c.start, start)
assert c.size == size, "c.size = %s (expected %s)" % (c.size, size)
assert c.strand == strand, "c.strand = %s (expected %s)" % (c.strand, strand)
assert c.src_size == src_size, "c.src_size = %s (expected %s)" % (c.src_size,src_size)
assert c.text == text, "c.text = \"%s\" (expected \"%s\")" % (c.text, text)
def check_component(c, src, start, size, strand, src_size, text):
# ..print "\"%s\" == \"%s\"" % (c.src,src)
assert c.src == src, "c.src = %s (expected %s)" % (c.src, src)
assert c.start == start, "c.start = %s (expected %s)" % (c.start, start)
assert c.size == size, "c.size = %s (expected %s)" % (c.size, size)
assert c.strand == strand, "c.strand = %s (expected %s)" % (c.strand, strand)
assert c.src_size == src_size, "c.src_size = %s (expected %s)" % (c.src_size, src_size)
assert c.text == text, "c.text = \"%s\" (expected \"%s\")" % (c.text, text)
......@@ -4,15 +4,19 @@ Support for the `MAF`_ multiple sequence alignment format used by `multiz`_.
.. _MAF: http://genome.ucsc.edu/FAQ/FAQformat.html#format5
.. _multiz: http://www.bx.psu.edu/miller_lab/
"""
import itertools
from io import TextIOWrapper
import os
from six import Iterator, StringIO, PY3
from six import (
Iterator,
PY3,
StringIO,
)
from bx import interval_index_file
from bx.align import *
from bx.misc.seekbzip2 import SeekableBzip2File
from bx.align import (
Alignment,
Component
)
MAF_INVERSE_STATUS = 'V'
MAF_INSERT_STATUS = 'I'
......@@ -24,16 +28,18 @@ MAF_MAYBE_NEW_STATUS = 'S'
MAF_MAYBE_NEW_NESTED_STATUS = 's'
MAF_MISSING_STATUS = 'M'
class MAFIndexedAccess( interval_index_file.AbstractIndexedAccess ):
class MAFIndexedAccess(interval_index_file.AbstractIndexedAccess):
"""
Indexed access to a MAF file.
"""
def read_at_current_offset( self, file, **kwargs ):
def read_at_current_offset(self, file, **kwargs):
"""
Read the MAF block at the current position in `file` and return an
instance of `Alignment`.
"""
return read_next_maf( file, **kwargs )
return read_next_maf(file, **kwargs)
def open_data(self):
data = super(MAFIndexedAccess, self).open_data()
......@@ -41,108 +47,125 @@ class MAFIndexedAccess( interval_index_file.AbstractIndexedAccess ):
return TextIOWrapper(data, encoding="ascii")
return data
class MAFMultiIndexedAccess( interval_index_file.AbstractMultiIndexedAccess ):
class MAFMultiIndexedAccess(interval_index_file.AbstractMultiIndexedAccess):
"""
Indexed access to multiple MAF files.
"""
indexed_access_class = MAFIndexedAccess
Indexed = MAFIndexedAccess
"""Deprecated: `MAFIndexedAccess` is also available under the name `Indexed`."""
MultiIndexed = MAFMultiIndexedAccess
"""Deprecated: `MAFMultiIndexedAccess` is also available under the name `MultiIndexed`."""
class Reader( Iterator ):
class Reader(Iterator):
"""
Iterate over all maf blocks in a file in order
"""
def __init__( self, file, **kwargs ):
def __init__(self, file, **kwargs):
self.file = file
self.maf_kwargs = kwargs
# Read and verify maf header, store any attributes
fields = self.file.readline().split()
if fields[0] != '##maf': raise Exception("File does not have MAF header")
self.attributes = parse_attributes( fields[1:] )
if fields[0] != '##maf':
raise Exception("File does not have MAF header")
self.attributes = parse_attributes(fields[1:])
def __next__( self ):
return read_next_maf( self.file, **self.maf_kwargs )
def __next__(self):
return read_next_maf(self.file, **self.maf_kwargs)
def __iter__( self ):
return ReaderIter( self )
def __iter__(self):
return ReaderIter(self)
def close( self ):
def close(self):
self.file.close()
class ReaderIter( Iterator ):
class ReaderIter(Iterator):
"""
Adapts a `Reader` to the iterator protocol.
"""
def __init__( self, reader ):
def __init__(self, reader):
self.reader = reader
def __iter__( self ):
def __iter__(self):
return self
def __next__( self ):
def __next__(self):
v = next(self.reader)
if not v: raise StopIteration
if not v:
raise StopIteration
return v
class Writer( object ):
def __init__( self, file, attributes={} ):
class Writer(object):
def __init__(self, file, attributes={}):
self.file = file
# Write header, Webb's maf code wants version first, we accomodate
if 'version' not in attributes: attributes['version'] = 1
self.file.write( "##maf version=%s" % attributes['version'] )
for key in attributes:
if key == 'version': continue
self.file.writelines( " %s=%s" % ( key, attributes[key] ) )
self.file.write( "\n" )
def write( self, alignment ):
self.file.write( "a score=" + str( alignment.score ) )
if 'version' not in attributes:
attributes['version'] = 1
self.file.write("##maf version=%s" % attributes['version'])
for key in attributes:
if key == 'version':
continue
self.file.writelines(" %s=%s" % (key, attributes[key]))
self.file.write("\n")
def write(self, alignment):
self.file.write("a score=" + str(alignment.score))
for key in alignment.attributes:
self.file.write( " %s=%s" % ( key, alignment.attributes[key] ) )
self.file.write( "\n" )
self.file.write(" %s=%s" % (key, alignment.attributes[key]))
self.file.write("\n")
# Components
rows = []
for c in alignment.components:
# "Empty component" generates an 'e' row
if c.empty:
rows.append( ( "e", c.src, str( c.start ), str( c.size ), c.strand, str( c.src_size ), c.synteny_empty ) )
rows.append(("e", c.src, str(c.start), str(c.size), c.strand, str(c.src_size), c.synteny_empty))
continue
# Regular component
rows.append( ( "s", c.src, str( c.start ), str( c.size ), c.strand, str( c.src_size ), c.text ) )
rows.append(("s", c.src, str(c.start), str(c.size), c.strand, str(c.src_size), c.text))
# If component has quality, write a q row
if c.quality is not None:
rows.append( ( "q", c.src, "", "", "", "", c.quality ) )
rows.append(("q", c.src, "", "", "", "", c.quality))
# If component has synteny follow up with an 'i' row
if c.synteny_left and c.synteny_right:
rows.append( ( "i", c.src, "", "", "", "", " ".join( map( str, c.synteny_left + c.synteny_right ) ) ) )
self.file.write( format_tabular( rows, "llrrrrl" ) )
self.file.write( "\n" )
rows.append(("i", c.src, "", "", "", "", " ".join(map(str, c.synteny_left + c.synteny_right))))
self.file.write(format_tabular(rows, "llrrrrl"))
self.file.write("\n")
def close( self ):
def close(self):
self.file.close()
# ---- Helper methods -------------------------------------------------------
def from_string( string, **kwargs ):
return read_next_maf( StringIO( string ), **kwargs )
def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
def from_string(string, **kwargs):
return read_next_maf(StringIO(string), **kwargs)
def read_next_maf(file, species_to_lengths=None, parse_e_rows=False):
"""
Read the next MAF block from `file` and return as an `Alignment`
instance. If `parse_i_rows` is true, empty components will be created
Read the next MAF block from `file` and return as an `Alignment`
instance. If `parse_i_rows` is true, empty components will be created
when e rows are encountered.
"""
alignment = Alignment(species_to_lengths=species_to_lengths)
# Attributes line
line = readline( file, skip_blank=True )
if not line: return None
fields = line.split()
if fields[0] != 'a': raise Exception("Expected 'a ...' line")
alignment.attributes = parse_attributes( fields[1:] )
line = readline(file, skip_blank=True)
if not line:
return None
fields = line.split()
if fields[0] != 'a':
raise Exception("Expected 'a ...' line")
alignment.attributes = parse_attributes(fields[1:])
if 'score' in alignment.attributes:
alignment.score = alignment.attributes['score']
del alignment.attributes['score']
......@@ -150,87 +173,93 @@ def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
alignment.score = 0
# Sequence lines
last_component = None
while 1:
line = readline( file )
while True:
line = readline(file)
# EOF or Blank line terminates alignment components
if not line or line.isspace(): break
if line.isspace(): break
if not line or line.isspace():
break
if line.isspace():
break
# Parse row
fields = line.split()
if fields[0] == 's':
# An 's' row contains sequence for a component
component = Component()
component.src = fields[1]
component.start = int( fields[2] )
component.size = int( fields[3] )
component.start = int(fields[2])
component.size = int(fields[3])
component.strand = fields[4]
component.src_size = int( fields[5] )
if len(fields) > 6: component.text = fields[6].strip()
component.src_size = int(fields[5])
if len(fields) > 6:
component.text = fields[6].strip()
# Add to set
alignment.add_component( component )
alignment.add_component(component)
last_component = component
elif fields[0] == 'e':
# An 'e' row, when no bases align for a given species this tells
# us something about the synteny
# us something about the synteny
if parse_e_rows:
component = Component()
component.empty = True
component.src = fields[1]
component.start = int( fields[2] )
component.size = int( fields[3] )
component.start = int(fields[2])
component.size = int(fields[3])
component.strand = fields[4]
component.src_size = int( fields[5] )
component.src_size = int(fields[5])
component.text = None
synteny = fields[6].strip()
assert len( synteny ) == 1, \
assert len(synteny) == 1, \
"Synteny status in 'e' rows should be denoted with a single character code"
component.synteny_empty = synteny
alignment.add_component( component )
alignment.add_component(component)
last_component = component
elif fields[0] == 'i':
# An 'i' row, indicates left and right synteny status for the
# An 'i' row, indicates left and right synteny status for the
# previous component, we hope ;)
assert fields[1] == last_component.src, "'i' row does not follow matching 's' row"
last_component.synteny_left = ( fields[2], int( fields[3] ) )
last_component.synteny_right = ( fields[4], int( fields[5] ) )
last_component.synteny_left = (fields[2], int(fields[3]))
last_component.synteny_right = (fields[4], int(fields[5]))
elif fields[0] == 'q':
assert fields[1] == last_component.src, "'q' row does not follow matching 's' row"
# TODO: Should convert this to an integer array?
last_component.quality = fields[2]
return alignment
def readline( file, skip_blank=False ):
def readline(file, skip_blank=False):
"""Read a line from provided file, skipping any blank or comment lines"""
while 1:
while True:
line = file.readline()
#print "every line: %r" % line
if not line: return None
if line[0] != '#' and not ( skip_blank and line.isspace() ):
if not line:
return None
if line[0] != '#' and not (skip_blank and line.isspace()):
return line
def parse_attributes( fields ):
def parse_attributes(fields):
"""Parse list of key=value strings into a dict"""
attributes = {}
for field in fields:
pair = field.split( '=' )
attributes[ pair[0] ] = pair[1]
pair = field.split('=')
attributes[pair[0]] = pair[1]
return attributes
def format_tabular( rows, align=None ):
if len( rows ) == 0: return ""
lengths = [ len( col ) for col in rows[ 0 ] ]
def format_tabular(rows, align=None):
if len(rows) == 0:
return ""
lengths = [len(col) for col in rows[0]]
for row in rows[1:]:
for i in range( 0, len( row ) ):
lengths[ i ] = max( lengths[ i ], len( row[ i ] ) )
for i in range(0, len(row)):
lengths[i] = max(lengths[i], len(row[i]))
rval = ""
for row in rows:
for i in range( 0, len( row ) ):
if align and align[ i ] == "l":
rval += row[ i ].ljust( lengths[ i ] )
for i in range(0, len(row)):
if align and align[i] == "l":
rval += row[i].ljust(lengths[i])
else:
rval += row[ i ].rjust( lengths[ i ] )
rval += row[i].rjust(lengths[i])
rval += " "
rval += "\n"
return rval
......@@ -3,8 +3,6 @@ Tests for `bx.align.maf`.
"""
from __future__ import print_function
import sys
import unittest
from six import StringIO
......@@ -59,104 +57,106 @@ s orange 19 61 - 100 AGGGATGCGTT--TCACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------G
"""
def test_reader():
reader = maf.Reader( StringIO( test_maf ) )
assert reader.attributes["version"] == "1"
assert reader.attributes["scoring"] == "humor.v4"
reader = maf.Reader(StringIO(test_maf))
assert reader.attributes["version"] == "1"
assert reader.attributes["scoring"] == "humor.v4"
a = next(reader)
assert a.score == 0.128
assert len( a.components ) == 3
check_component( a.components[0], "human_hoxa", 100, 8, "+", 100257, "ACA-TTACT" )
check_component( a.components[1], "horse_hoxa", 120, 9, "-", 98892, "ACAATTGCT" )
check_component( a.components[2], "fugu_hoxa", 88, 7, "+", 90788, "ACA--TGCT" )
assert len(a.components) == 3
check_component(a.components[0], "human_hoxa", 100, 8, "+", 100257, "ACA-TTACT")
check_component(a.components[1], "horse_hoxa", 120, 9, "-", 98892, "ACAATTGCT")
check_component(a.components[2], "fugu_hoxa", 88, 7, "+", 90788, "ACA--TGCT")
a = next(reader)
assert a.score == 0.071
assert len( a.components ) == 3
check_component( a.components[0], "human_unc", 9077, 8, "+", 10998, "ACAGTATT" )
check_component( a.components[1], "horse_unc", 4555, 6, "-", 5099, "ACA--ATT" )
check_component( a.components[2], "fugu_unc", 4000, 4, "+", 4038, "AC----TT" )
assert len(a.components) == 3
check_component(a.components[0], "human_unc", 9077, 8, "+", 10998, "ACAGTATT")
check_component(a.components[1], "horse_unc", 4555, 6, "-", 5099, "ACA--ATT")
check_component(a.components[2], "fugu_unc", 4000, 4, "+", 4038, "AC----TT")
a = next(reader)
assert a is None
reader.close()
def test_writer():
val = StringIO()
writer = maf.Writer( val, { 'scoring':'foobar' } )
writer = maf.Writer(val, {'scoring': 'foobar'})
a = align.Alignment()
a.score = 7009
a.components.append( align.Component( src="human_hoxa", start=100, size=9, strand="+", src_size=1000257, text="ACA-TTACT" ) )
a.components.append( align.Component( src="horse_hoxa", start=120, size=10, strand="-", src_size=98892, text="ACAATTGCT" ) )
a.components.append(align.Component(src="human_hoxa", start=100, size=9, strand="+", src_size=1000257, text="ACA-TTACT"))
a.components.append(align.Component(src="horse_hoxa", start=120, size=10, strand="-", src_size=98892, text="ACAATTGCT"))
check_component( a.components[0], "human_hoxa", 100, 9, "+", 1000257, "ACA-TTACT" )
check_component( a.components[1], "horse_hoxa", 120, 10, "-", 98892, "ACAATTGCT" )
check_component(a.components[0], "human_hoxa", 100, 9, "+", 1000257, "ACA-TTACT")
check_component(a.components[1], "horse_hoxa", 120, 10, "-", 98892, "ACAATTGCT")
writer.write( a )
writer.write(a)
assert val.getvalue() == """##maf version=1 scoring=foobar
a score=7009
s human_hoxa 100 9 + 1000257 ACA-TTACT
s horse_hoxa 120 10 - 98892 ACAATTGCT
"""
""" # noqa: W291
def test_slice():
a = align.Alignment()
a.score = "7009"
a.components.append( align.Component( src="human_hoxa", start=100, size=9, strand="+", src_size=100257, text="ACA-TTACT" ) )
a.components.append( align.Component( src="horse_hoxa", start=120, size=10, strand="-", src_size=98892, text="ACAATTGCT" ) )
a.components.append(align.Component(src="human_hoxa", start=100, size=9, strand="+", src_size=100257, text="ACA-TTACT"))
a.components.append(align.Component(src="horse_hoxa", start=120, size=10, strand="-", src_size=98892, text="ACAATTGCT"))
b = a.slice_by_component( 0, 101, 105 )
b = a.slice_by_component(0, 101, 105)
check_component( b.components[0], src="human_hoxa", start=101, size=4, strand="+", src_size=100257, text="CA-TT" )
check_component( b.components[1], src="horse_hoxa", start=121, size=5, strand="-", src_size=98892, text ="CAATT" )
check_component(b.components[0], src="human_hoxa", start=101, size=4, strand="+", src_size=100257, text="CA-TT")
check_component(b.components[1], src="horse_hoxa", start=121, size=5, strand="-", src_size=98892, text="CAATT")
# test slicing with + strand src
reader = maf.Reader( StringIO( test_maf_3 ) )
# test slicing with + strand src
reader = maf.Reader(StringIO(test_maf_3))
a = next(reader)
b = a.slice_by_component( 0, 40, 62 )
check_component( b.components[0], src="apple", start=40, size=22, strand="+", src_size=110, text="TTCGTCACT------GTCGTAAGGGTTC" )
check_component( b.components[1], src="orange", start=28, size=22, strand="-", src_size=100, text="TT--TCACTGCTATCGTCGTA----TTC" )
b = a.slice_by_component(0, 40, 62)
check_component(b.components[0], src="apple", start=40, size=22, strand="+", src_size=110, text="TTCGTCACT------GTCGTAAGGGTTC")
check_component(b.components[1], src="orange", start=28, size=22, strand="-", src_size=100, text="TT--TCACTGCTATCGTCGTA----TTC")
# test slicing with - strand src
b = a.slice_by_component( 1, 30, 68 )
check_component( b.components[0], src="apple", start=46, size=41, strand="+", src_size=110, text="ACT------GTCGTAAGGGTTCAGA--CTGTCTATGTATACACAAGTTG" )
check_component( b.components[1], src="orange", start=32, size=38, strand="-", src_size=100, text="ACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------GAGTTG" )
# test slicing with - strand src
b = a.slice_by_component(1, 30, 68)
check_component(b.components[0], src="apple", start=46, size=41, strand="+", src_size=110, text="ACT------GTCGTAAGGGTTCAGA--CTGTCTATGTATACACAAGTTG")
check_component(b.components[1], src="orange", start=32, size=38, strand="-", src_size=100, text="ACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------GAGTTG")
a = next(reader)
assert a is None
def test_with_synteny():
reader = maf.Reader( StringIO( test_maf_2 ), parse_e_rows=True )
reader = maf.Reader(StringIO(test_maf_2), parse_e_rows=True)
a = next(reader)
check_component( a.components[0], "hg17.chr1", 2005, 34, "+", 245522847, "TGTAACTTAATACCACAACCAGGCATAGGGG--AAA-------------")
check_component( a.components[1], "rheMac2.chr11", 9625228, 31, "+", 134511895, "TGTAACCTCTTACTGCAACAAGGCACAGGGG------------------")
check_component(a.components[0], "hg17.chr1", 2005, 34, "+", 245522847, "TGTAACTTAATACCACAACCAGGCATAGGGG--AAA-------------")
check_component(a.components[1], "rheMac2.chr11", 9625228, 31, "+", 134511895, "TGTAACCTCTTACTGCAACAAGGCACAGGGG------------------")
print(a.components[1].synteny_left)
assert a.components[1].synteny_left == ( maf.MAF_CONTIG_STATUS, 0 )
assert a.components[1].synteny_right == ( maf.MAF_INSERT_STATUS, 1678 )
assert a.components[1].synteny_left == (maf.MAF_CONTIG_STATUS, 0)
assert a.components[1].synteny_right == (maf.MAF_INSERT_STATUS, 1678)
rat = a.get_component_by_src_start( "rn3." )
check_component( rat, "rn3.chr4", 29161032, 1524, "-", 187371129, None )
rat = a.get_component_by_src_start("rn3.")
check_component(rat, "rn3.chr4", 29161032, 1524, "-", 187371129, None)
assert rat.synteny_empty == maf.MAF_INSERT_STATUS
def test_write_with_synteny():
reader = maf.Reader( StringIO( test_maf_2 ), parse_e_rows=True )
reader = maf.Reader(StringIO(test_maf_2), parse_e_rows=True)
a = next(reader)
val = StringIO()
writer = maf.Writer( val, { 'scoring':'foobar' } )
writer.write( a )
writer = maf.Writer(val, {'scoring': 'foobar'})
writer.write(a)
actual = val.getvalue()
expected = """##maf version=1 scoring=foobar
a score=3656.0
......@@ -177,16 +177,17 @@ e echTel1.scaffold_212365 4641 1430 + 9822 I
e rn3.chr4 29161032 1524 - 187371129 I
e mm7.chr6 28091695 3290 - 149646834 I
"""
""" # noqa: W291
print(actual)
print("---")
print(expected)
assert actual == expected
def check_component( c, src, start, size, strand, src_size, text ):
def check_component(c, src, start, size, strand, src_size, text):
assert c.src == src
assert c.start == start
assert c.size == size
assert c.strand == strand
assert c.src_size == src_size
assert c.start == start
assert c.size == size
assert c.strand == strand
assert c.src_size == src_size
assert c.text == text
......@@ -3,17 +3,28 @@ Support for scoring alignments using arbitrary scoring matrices, arbitrary
alphabets, and affine gap penalties.
"""
from numpy import *
from numpy import (
float32,
int32,
ones,
zeros
)
class ScoringScheme( object ):
# note that gap_open and gap_extend are penalties, which means you should make them positive
def __init__( self, gap_open, gap_extend, default=-100, alphabet1="ACGT", alphabet2=None, gap1="-", gap2=None, text1_range=128, text2_range=None, typecode=int32 ):
if (text2_range == None): text2_range = text1_range
if (alphabet2 == None): alphabet2 = alphabet1
if (gap2 == None): gap2 = gap1 # (scheme with gap1=gap2=None is legit)
if type(alphabet1) == str: alphabet1 = [ch for ch in alphabet1]
if type(alphabet2) == str: alphabet2 = [ch for ch in alphabet2]
self.table = ones( (text1_range, text2_range), typecode )
class ScoringScheme(object):
# note that gap_open and gap_extend are penalties, which means you should make them positive
def __init__(self, gap_open, gap_extend, default=-100, alphabet1="ACGT", alphabet2=None, gap1="-", gap2=None, text1_range=128, text2_range=None, typecode=int32):
if text2_range is None:
text2_range = text1_range
if alphabet2 is None:
alphabet2 = alphabet1
if gap2 is None:
gap2 = gap1 # (scheme with gap1=gap2=None is legit)
if isinstance(alphabet1, str):
alphabet1 = [ch for ch in alphabet1]
if isinstance(alphabet2, str):
alphabet2 = [ch for ch in alphabet2]
self.table = ones((text1_range, text2_range), typecode)
self.table *= default
self.gap_open = gap_open
self.gap_extend = gap_extend
......@@ -21,88 +32,112 @@ class ScoringScheme( object ):
self.gap2 = gap2
self.alphabet1 = alphabet1
self.alphabet2 = alphabet2
# private _set_score and _get_score allow subclasses to override them to
# implement a different underlying table object
def _set_score(self, a_b_pair,val):
(a,b) = a_b_pair
self.table[a,b] = val
# private _set_score and _get_score allow subclasses to override them to
# implement a different underlying table object
def _set_score(self, a_b_pair, val):
(a, b) = a_b_pair
self.table[a, b] = val
def _get_score(self, a_b_pair):
(a,b) = a_b_pair
return self.table[a,b]
def set_score( self, a, b, val, foldcase1=False, foldcase2=False ):
self._set_score((a,b),val)
(a, b) = a_b_pair
return self.table[a, b]
def set_score(self, a, b, val, foldcase1=False, foldcase2=False):
self._set_score((a, b), val)
if foldcase1:
aCh = chr(a)
if (aCh.isupper()): aa = ord(aCh.lower())
elif (aCh.islower()): aa = ord(aCh.upper())
else: foldcase1 = False
if (aCh.isupper()):
aa = ord(aCh.lower())
elif (aCh.islower()):
aa = ord(aCh.upper())
else:
foldcase1 = False
if foldcase2:
bCh = chr(b)
if (bCh.isupper()): bb = ord(bCh.lower())
elif (bCh.islower()): bb = ord(bCh.upper())
else: foldcase2 = False
if (bCh.isupper()):
bb = ord(bCh.lower())
elif (bCh.islower()):
bb = ord(bCh.upper())
else:
foldcase2 = False
if foldcase1 and foldcase2:
self._set_score((aa,b ),val)
self._set_score((a ,bb),val)
self._set_score((aa,bb),val)
self._set_score((aa, b), val)
self._set_score((a, bb), val)
self._set_score((aa, bb), val)
elif foldcase1:
self._set_score((aa,b ),val)
self._set_score((aa, b), val)
elif foldcase2:
self._set_score((a ,bb),val)
def score_alignment( self, a ):
return score_alignment(self,a)
def score_texts( self, text1, text2 ):
return score_texts( self, text1, text2 )
def __str__ (self):
isDna1 = "".join( self.alphabet1 ) == "ACGT"
isDna2 = "".join( self.alphabet2 ) == "ACGT"
labelRows = not ( isDna1 and isDna2 )
self._set_score((a, bb), val)
def score_alignment(self, a):
return score_alignment(self, a)
def score_texts(self, text1, text2):
return score_texts(self, text1, text2)
def __str__(self):
isDna1 = "".join(self.alphabet1) == "ACGT"
isDna2 = "".join(self.alphabet2) == "ACGT"
labelRows = not (isDna1 and isDna2)
width = 3
for a in self.alphabet1:
for b in self.alphabet2:
score = self._get_score((ord(a),ord(b)))
if (type(score) == float): s = "%8.6f" % score
else: s = "%s" % score
score = self._get_score((ord(a), ord(b)))
if (isinstance(score, float)):
s = "%8.6f" % score
else:
s = "%s" % score
if (len(s)+1 > width):
width = len(s)+1
lines = []
line = []
if labelRows:
if isDna1: line.append(" ")
else: line.append(" ")
if isDna1:
line.append(" ")
else:
line.append(" ")
for b in self.alphabet2:
if isDna2: s = b
else: s = "%02X" % ord(b)
line.append("%*s" % (width,s))
if isDna2:
s = b
else:
s = "%02X" % ord(b)
line.append("%*s" % (width, s))
lines.append(("".join(line))+"\n")
for a in self.alphabet1:
line = []
if labelRows:
if isDna1: line.append(a)
else: line.append("%02X" % ord(a))
if isDna1:
line.append(a)
else:
line.append("%02X" % ord(a))
for b in self.alphabet2:
score = self._get_score((ord(a),ord(b)))
if (type(score) == float): s = "%8.6f" % score
else: s = "%s" % score
line.append("%*s" % (width,s))
score = self._get_score((ord(a), ord(b)))
if (isinstance(score, float)):
s = "%8.6f" % score
else:
s = "%s" % score
line.append("%*s" % (width, s))
lines.append(("".join(line))+"\n")
return "".join(lines)
def read_scoring_scheme( f, gap_open, gap_extend, gap1="-", gap2=None, **kwargs ):
def read_scoring_scheme(f, gap_open, gap_extend, gap1="-", gap2=None, **kwargs):
"""
Initialize scoring scheme from a file containint a blastz style text blob.
f can be either a file or the name of a file.
"""
close_it = False
if (type(f) == str):
f = file(f,"rt")
if (isinstance(f, str)):
f = open(f, "rt")
close_it = True
ss = build_scoring_scheme("".join([line for line in f]),gap_open, gap_extend, gap1=gap1, gap2=gap2, **kwargs)
ss = build_scoring_scheme("".join([line for line in f]), gap_open, gap_extend, gap1=gap1, gap2=gap2, **kwargs)
if (close_it):
f.close()
return ss
def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs ):
def build_scoring_scheme(s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs):
"""
Initialize scoring scheme from a blastz style text blob, first line
specifies the bases for each row/col, subsequent lines contain the
......@@ -123,37 +158,37 @@ def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs
"""
# perform initial parse to determine alphabets and locate scores
bad_matrix = "invalid scoring matrix"
s = s.rstrip( "\n" )
lines = s.split( "\n" )
rows = []
s = s.rstrip("\n")
lines = s.split("\n")
rows = []
symbols2 = lines.pop(0).split()
symbols1 = None
rows_have_syms = False
a_la_blastz = True
for i, line in enumerate( lines ):
for i, line in enumerate(lines):
row_scores = line.split()
if len( row_scores ) == len( symbols2 ): # blastz-style row
if symbols1 == None:
if len( lines ) != len( symbols2 ):
if len(row_scores) == len(symbols2): # blastz-style row
if symbols1 is None:
if len(lines) != len(symbols2):
raise bad_matrix
symbols1 = symbols2
elif (rows_have_syms):
raise bad_matrix
elif len( row_scores ) == len( symbols2 ) + 1: # row starts with symbol
if symbols1 == None:
elif len(row_scores) == len(symbols2) + 1: # row starts with symbol
if symbols1 is None:
symbols1 = []
rows_have_syms = True
a_la_blastz = False
elif not rows_have_syms:
raise bad_matrix
symbols1.append( row_scores.pop(0) )
symbols1.append(row_scores.pop(0))
else:
raise bad_matrix
rows.append( row_scores )
rows.append(row_scores)
# convert alphabets from strings to characters
try:
alphabet1 = [sym_to_char( sym ) for sym in symbols1]
alphabet2 = [sym_to_char( sym ) for sym in symbols2]
alphabet1 = [sym_to_char(sym) for sym in symbols1]
alphabet2 = [sym_to_char(sym) for sym in symbols2]
except ValueError:
raise bad_matrix
if (alphabet1 != symbols1) or (alphabet2 != symbols2):
......@@ -165,129 +200,143 @@ def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs
if a_la_blastz:
foldcase1 = foldcase2 = True
else:
foldcase1 = "".join( alphabet1 ) == "ACGT"
foldcase2 = "".join( alphabet2 ) == "ACGT"
foldcase1 = "".join(alphabet1) == "ACGT"
foldcase2 = "".join(alphabet2) == "ACGT"
# create appropriately sized matrix
text1_range = text2_range = 128
if ord( max( alphabet1 ) ) >= 128: text1_range = 256
if ord( max( alphabet2 ) ) >= 128: text2_range = 256
if ord(max(alphabet1)) >= 128:
text1_range = 256
if ord(max(alphabet2)) >= 128:
text2_range = 256
typecode = int32
for i, row_scores in enumerate( rows ):
for j, score in enumerate( map( int_or_float, row_scores ) ):
if type( score ) == float:
for i, row_scores in enumerate(rows):
for j, score in enumerate(map(int_or_float, row_scores)):
if isinstance(score, float):
typecode = float32
if type( gap_open ) == float:
if isinstance(gap_open, float):
typecode = float32
if type( gap_extend ) == float:
if isinstance(gap_extend, float):
typecode = float32
ss = ScoringScheme( gap_open, gap_extend, alphabet1=alphabet1, alphabet2=alphabet2, gap1=gap1, gap2=gap2, text1_range=text1_range, text2_range=text2_range, typecode=typecode, **kwargs )
ss = ScoringScheme(gap_open, gap_extend, alphabet1=alphabet1, alphabet2=alphabet2, gap1=gap1, gap2=gap2, text1_range=text1_range, text2_range=text2_range, typecode=typecode, **kwargs)
# fill matrix
for i, row_scores in enumerate( rows ):
for j, score in enumerate( map( int_or_float, row_scores ) ):
ss.set_score( ord( alphabet1[i] ), ord( alphabet2[j] ), score )
for i, row_scores in enumerate(rows):
for j, score in enumerate(map(int_or_float, row_scores)):
ss.set_score(ord(alphabet1[i]), ord(alphabet2[j]), score)
if foldcase1 and foldcase2:
ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j].upper() ), score )
ss.set_score( ord( alphabet1[i].upper() ), ord( alphabet2[j].lower() ), score )
ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j].lower() ), score )
ss.set_score(ord(alphabet1[i].lower()), ord(alphabet2[j].upper()), score)
ss.set_score(ord(alphabet1[i].upper()), ord(alphabet2[j].lower()), score)
ss.set_score(ord(alphabet1[i].lower()), ord(alphabet2[j].lower()), score)
elif foldcase1:
ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j] ), score )
ss.set_score(ord(alphabet1[i].lower()), ord(alphabet2[j]), score)
elif foldcase2:
ss.set_score( ord( alphabet1[i] ), ord( alphabet2[j].lower() ), score )
ss.set_score(ord(alphabet1[i]), ord(alphabet2[j].lower()), score)
return ss
def int_or_float( s ):
try: return int( s )
except: return float( s )
def int_or_float(s):
try:
return int(s)
except ValueError:
return float(s)
# convert possible two-char symbol to a single character
def sym_to_char( sym ):
if len( sym ) == 1: return sym
elif len( sym ) != 2: raise ValueError
else: return chr(int(sym,base=16))
def score_alignment( scoring_scheme, a ):
def sym_to_char(sym):
if len(sym) == 1:
return sym
elif len(sym) != 2:
raise ValueError
else:
return chr(int(sym, base=16))
def score_alignment(scoring_scheme, a):
score = 0
ncomps = len( a.components )
for i in range( ncomps ):
for j in range( i+1, ncomps ):
score += score_texts( scoring_scheme, a.components[i].text, a.components[j].text )
ncomps = len(a.components)
for i in range(ncomps):
for j in range(i+1, ncomps):
score += score_texts(scoring_scheme, a.components[i].text, a.components[j].text)
return score
def score_texts( scoring_scheme, text1, text2 ):
def score_texts(scoring_scheme, text1, text2):
rval = 0
last_gap_a = last_gap_b = False
for i in range( len( text1 ) ):
for i in range(len(text1)):
a = text1[i]
b = text2[i]
# Ignore gap/gap pair
if a == scoring_scheme.gap1 and b == scoring_scheme.gap2:
if a == scoring_scheme.gap1 and b == scoring_scheme.gap2:
continue
# Gap in first species
elif a == scoring_scheme.gap1:
rval -= scoring_scheme.gap_extend
if not last_gap_a:
rval -= scoring_scheme.gap_open
last_gap_a = True
last_gap_b = False
rval -= scoring_scheme.gap_open
last_gap_a = True
last_gap_b = False
# Gap in second species
elif b == scoring_scheme.gap2:
rval -= scoring_scheme.gap_extend
if not last_gap_b:
rval -= scoring_scheme.gap_open
last_gap_a = False
last_gap_b = True
rval -= scoring_scheme.gap_open
last_gap_a = False
last_gap_b = True
# Aligned base
else:
rval += scoring_scheme._get_score((ord(a),ord(b)))
else:
rval += scoring_scheme._get_score((ord(a), ord(b)))
last_gap_a = last_gap_b = False
return rval
def accumulate_scores( scoring_scheme, text1, text2, skip_ref_gaps=False ):
def accumulate_scores(scoring_scheme, text1, text2, skip_ref_gaps=False):
"""
Return cumulative scores for each position in alignment as a 1d array.
If `skip_ref_gaps` is False positions in returned array correspond to each
column in alignment, if True they correspond to each non-gap position (each
base) in text1.
"""
if skip_ref_gaps:
rval = zeros( len( text1 ) - text1.count( scoring_scheme.gap1 ) )
rval = zeros(len(text1) - text1.count(scoring_scheme.gap1))
else:
rval = zeros( len( text1 ) )
rval = zeros(len(text1))
score = 0
pos = 0
last_gap_a = last_gap_b = False
for i in range( len( text1 ) ):
for i in range(len(text1)):
a = text1[i]
b = text2[i]
# Ignore gap/gap pair
if a == scoring_scheme.gap1 and b == scoring_scheme.gap2:
if a == scoring_scheme.gap1 and b == scoring_scheme.gap2:
continue
# Gap in first species
elif a == scoring_scheme.gap1:
score -= scoring_scheme.gap_extend
if not last_gap_a:
score -= scoring_scheme.gap_open
last_gap_a = True
last_gap_b = False
score -= scoring_scheme.gap_open
last_gap_a = True
last_gap_b = False
# Gap in second species
elif b == scoring_scheme.gap2:
score -= scoring_scheme.gap_extend
if not last_gap_b:
score -= scoring_scheme.gap_open
last_gap_a = False
last_gap_b = True
score -= scoring_scheme.gap_open
last_gap_a = False
last_gap_b = True
# Aligned base
else:
score += scoring_scheme._get_score((ord(a),ord(b)))
else:
score += scoring_scheme._get_score((ord(a), ord(b)))
last_gap_a = last_gap_b = False
if not( skip_ref_gaps ) or a != scoring_scheme.gap1:
if not(skip_ref_gaps) or a != scoring_scheme.gap1:
rval[pos] = score
pos += 1
return rval
hox70 = build_scoring_scheme( """ A C G T
hox70 = build_scoring_scheme(""" A C G T
91 -114 -31 -123
-114 100 -125 -31
-31 -125 100 -114
-123 -31 -114 91 """, 400, 30 )
-123 -31 -114 91 """, 400, 30)
......@@ -3,7 +3,11 @@ Tests for `bx.align.score`.
"""
import unittest
from numpy import array, cumsum, allclose
from numpy import (
allclose,
array,
cumsum,
)
from six import StringIO
import bx.align.maf
......