Skip to content
Commits on Source (10)
......@@ -6,6 +6,12 @@ python:
- '2.7'
- '3.5'
stages:
- lint
- test
- name: deploy
if: tag IS present
env:
global:
- PYTHON=python
......@@ -13,12 +19,19 @@ env:
- secure: 'kFoqHCxat/ETS2SUc2q9M7YvzvnlR7sgHmx7SRvVgTyLkk1efpJ++YPwDBEYZ3v+GLf2nRfc20GxtZkH6ey1f//aj4CT2q2CJiUsKAlkFAOHzKo/3mTLl/WDHkPAr9MW7AdnbNk6W8sIPCKqFsyKL2FTH70dBcxa1e7trQ2RC64hnOOkt/tm2cQhj6sX0gROggN5QrpHE8tDZb9ugF0uf92L/CGxeClAebWgb7zVChHDMTNsmnOvWUF9m6LZOvkgFmuIeh70EPuOWh6LxU/n5JyevYIGO5vVDbjgfmNELlG2KUTm6dWeoyofcj6hUqYmQsmI1ATrf7ThY1+b6asQGy+Exp/76MBXiYRh+RgVKifwaZMOWehzfjDQvPYOGvf6rXOVGeVZ+nBkskr0HARsX1KnyDE+k+XPoP7zqvW6mCic9ZQ+IdQJtxMHOTxxFjuPAlunvaUqDNM9VP6YEWOI4UqIOO1nQh4E2zkPhXI2yY744q+BV/5+3HHqNQj1+5qFPoZeyDEuNXwgDCjrJ8i3hna/LTTvRigx6/YQL1PF/C30R4h/nkqp8ghA4VpNRPnQ8nOO+oD6AdN7Pswc3C4qGPEwoeqfNzEIR1KfEWzB7HsfTFbgyGFFNGuQ/P26DMK+kPBNZ6GhZ9wb5/xT226OA+ovcAmVGn/Hnt/qVaylXNk='
_deploy_common: &deploy_common
if: tag IS present
install:
- $PYTHON -m pip install cibuildwheel twine
matrix:
include:
- stage: lint
python: '3.5'
addons: {}
install:
- $PYTHON -m pip install flake8 flake8-import-order
script:
- flake8 .
- stage: deploy
python: '3.5'
services:
......@@ -32,6 +45,7 @@ matrix:
- $PYTHON setup.py sdist
- twine check dist/*
- twine upload --skip-existing dist/*
- stage: deploy
os: osx
language: generic
......
python-bx (0.8.5-2) UNRELEASED; urgency=medium
python-bx (0.8.6-1) unstable; urgency=medium
* Team upload.
[ Michael R. Crusoe ]
* Disable the bx.interval_index_file_tests.test_interval_index_file test on
i386 Closes: #944409
-- Michael R. Crusoe <michael.crusoe@gmail.com> Sat, 07 Dec 2019 13:25:33 +0100
[ Andreas Tille ]
* New upstream version
* Simplify lintian-override
* Trim trailing whitespace.
* debian/copyright: use spaces rather than tabs to start continuation
lines.
* Set upstream metadata fields: Bug-Database, Repository, Repository-
Browse.
* Enhance description
-- Andreas Tille <tille@debian.org> Wed, 18 Dec 2019 09:25:52 +0100
python-bx (0.8.5-1) unstable; urgency=medium
......
......@@ -30,18 +30,18 @@ Breaks: python3-bx-tools
Provides: python3-bx-tools
Replaces: python3-bx-tools
Description: library to manage genomic data and its alignment
The bx-python project is a python library and associated set of scripts to
The bx-python project is a Python3 library and associated set of scripts to
allow for rapid implementation of genome scale analyses. The library contains
a variety of useful modules, but the particular strengths are:
* Classes for reading and working with genome-scale multiple local
alignments (in MAF, AXT, and LAV formats)
* Generic data structure for indexing on disk files that contain blocks of
data associated with intervals on various sequences (used, for example, to
provide random access to individual alignments in huge files; optimized for
use over network filesystems)
provide random access to individual alignments in huge files; optimized
for use over network filesystems)
* Data structures for working with intervals on sequences
* "Binned bitsets" which act just like chromosome sized bit arrays, but
lazily allocate regions and allow large blocks of all set or all unset bits
to be stored compactly
lazily allocate regions and allow large blocks of all set or all unset
bits to be stored compactly
* "Intersecter" for performing fast intersection tests that preserve both
query and target intervals and associated annotation
# Not fiddling with .py endings because of reverse-dependencies expected from toil
# We shall revisit this decision once all packages for toil are in the distribution
# and we have learned more about how bx is used in the community in 3rd party scripts.
python3-bx: script-with-language-extension usr/bin/aggregate_scores_in_intervals.py
python3-bx: script-with-language-extension usr/bin/align_print_template.py
python3-bx: script-with-language-extension usr/bin/axt_extract_ranges.py
python3-bx: script-with-language-extension usr/bin/axt_to_fasta.py
python3-bx: script-with-language-extension usr/bin/axt_to_lav.py
python3-bx: script-with-language-extension usr/bin/axt_to_maf.py
python3-bx: script-with-language-extension usr/bin/bed_bigwig_profile.py
python3-bx: script-with-language-extension usr/bin/bed_build_windows.py
python3-bx: script-with-language-extension usr/bin/bed_complement.py
python3-bx: script-with-language-extension usr/bin/bed_count_by_interval.py
python3-bx: script-with-language-extension usr/bin/bed_count_overlapping.py
python3-bx: script-with-language-extension usr/bin/bed_coverage.py
python3-bx: script-with-language-extension usr/bin/bed_coverage_by_interval.py
python3-bx: script-with-language-extension usr/bin/bed_diff_basewise_summary.py
python3-bx: script-with-language-extension usr/bin/bed_extend_to.py
python3-bx: script-with-language-extension usr/bin/bed_intersect.py
python3-bx: script-with-language-extension usr/bin/bed_intersect_basewise.py
python3-bx: script-with-language-extension usr/bin/bed_merge_overlapping.py
python3-bx: script-with-language-extension usr/bin/bed_rand_intersect.py
python3-bx: script-with-language-extension usr/bin/bed_subtract_basewise.py
python3-bx: script-with-language-extension usr/bin/bnMapper.py
python3-bx: script-with-language-extension usr/bin/div_snp_table_chr.py
python3-bx: script-with-language-extension usr/bin/find_in_sorted_file.py
python3-bx: script-with-language-extension usr/bin/gene_fourfold_sites.py
python3-bx: script-with-language-extension usr/bin/get_scores_in_intervals.py
python3-bx: script-with-language-extension usr/bin/int_seqs_to_char_strings.py
python3-bx: script-with-language-extension usr/bin/interval_count_intersections.py
python3-bx: script-with-language-extension usr/bin/interval_join.py
python3-bx: script-with-language-extension usr/bin/lav_to_axt.py
python3-bx: script-with-language-extension usr/bin/lav_to_maf.py
python3-bx: script-with-language-extension usr/bin/line_select.py
python3-bx: script-with-language-extension usr/bin/lzop_build_offset_table.py
python3-bx: script-with-language-extension usr/bin/mMK_bitset.py
python3-bx: script-with-language-extension usr/bin/maf_build_index.py
python3-bx: script-with-language-extension usr/bin/maf_chop.py
python3-bx: script-with-language-extension usr/bin/maf_chunk.py
python3-bx: script-with-language-extension usr/bin/maf_col_counts.py
python3-bx: script-with-language-extension usr/bin/maf_col_counts_all.py
python3-bx: script-with-language-extension usr/bin/maf_count.py
python3-bx: script-with-language-extension usr/bin/maf_covered_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_covered_regions.py
python3-bx: script-with-language-extension usr/bin/maf_div_sites.py
python3-bx: script-with-language-extension usr/bin/maf_drop_overlapping.py
python3-bx: script-with-language-extension usr/bin/maf_extract_chrom_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_extract_ranges.py
python3-bx: script-with-language-extension usr/bin/maf_extract_ranges_indexed.py
python3-bx: script-with-language-extension usr/bin/maf_filter.py
python3-bx: script-with-language-extension usr/bin/maf_filter_max_wc.py
python3-bx: script-with-language-extension usr/bin/maf_gap_frequency.py
python3-bx: script-with-language-extension usr/bin/maf_gc_content.py
python3-bx: script-with-language-extension usr/bin/maf_interval_alignibility.py
python3-bx: script-with-language-extension usr/bin/maf_limit_to_species.py
python3-bx: script-with-language-extension usr/bin/maf_mapping_word_frequency.py
python3-bx: script-with-language-extension usr/bin/maf_mask_cpg.py
python3-bx: script-with-language-extension usr/bin/maf_mean_length_ungapped_piece.py
python3-bx: script-with-language-extension usr/bin/maf_percent_columns_matching.py
python3-bx: script-with-language-extension usr/bin/maf_percent_identity.py
python3-bx: script-with-language-extension usr/bin/maf_print_chroms.py
python3-bx: script-with-language-extension usr/bin/maf_print_scores.py
python3-bx: script-with-language-extension usr/bin/maf_randomize.py
python3-bx: script-with-language-extension usr/bin/maf_region_coverage_by_src.py
python3-bx: script-with-language-extension usr/bin/maf_select.py
python3-bx: script-with-language-extension usr/bin/maf_shuffle_columns.py
python3-bx: script-with-language-extension usr/bin/maf_species_in_all_files.py
python3-bx: script-with-language-extension usr/bin/maf_split_by_src.py
python3-bx: script-with-language-extension usr/bin/maf_thread_for_species.py
python3-bx: script-with-language-extension usr/bin/maf_tile.py
python3-bx: script-with-language-extension usr/bin/maf_tile_2.py
python3-bx: script-with-language-extension usr/bin/maf_tile_2bit.py
python3-bx: script-with-language-extension usr/bin/maf_to_axt.py
python3-bx: script-with-language-extension usr/bin/maf_to_concat_fasta.py
python3-bx: script-with-language-extension usr/bin/maf_to_fasta.py
python3-bx: script-with-language-extension usr/bin/maf_to_int_seqs.py
python3-bx: script-with-language-extension usr/bin/maf_translate_chars.py
python3-bx: script-with-language-extension usr/bin/maf_truncate.py
python3-bx: script-with-language-extension usr/bin/maf_word_frequency.py
python3-bx: script-with-language-extension usr/bin/mask_quality.py
python3-bx: script-with-language-extension usr/bin/nib_chrom_intervals_to_fasta.py
python3-bx: script-with-language-extension usr/bin/nib_intervals_to_fasta.py
python3-bx: script-with-language-extension usr/bin/nib_length.py
python3-bx: script-with-language-extension usr/bin/one_field_per_line.py
python3-bx: script-with-language-extension usr/bin/out_to_chain.py
python3-bx: script-with-language-extension usr/bin/prefix_lines.py
python3-bx: script-with-language-extension usr/bin/pretty_table.py
python3-bx: script-with-language-extension usr/bin/qv_to_bqv.py
python3-bx: script-with-language-extension usr/bin/random_lines.py
python3-bx: script-with-language-extension usr/bin/table_add_column.py
python3-bx: script-with-language-extension usr/bin/table_filter.py
python3-bx: script-with-language-extension usr/bin/tfloc_summary.py
python3-bx: script-with-language-extension usr/bin/ucsc_gene_table_to_intervals.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_array_tree.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_binned_array.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_chr_binned_array.py
python3-bx: script-with-language-extension usr/bin/wiggle_to_simple.py
# see https://lists.debian.org/debian-med/2018/06/msg00043.html
python3-bx: script-with-language-extension usr/bin/*.py
......@@ -26,5 +26,3 @@ override_dh_auto_clean:
grep -rli "Generated by Cython" lib/ | xargs -r rm
# Changes perpetually redone
rm -rf lib/bx_python.egg-info
......@@ -7,3 +7,6 @@ Registry:
Entry: NA
- Name: conda:bioconda
Entry: bx-python
Bug-Database: https://github.com/bxlab/bx-python/issues
Repository: https://github.com/bxlab/bx-python.git
Repository-Browse: https://github.com/bxlab/bx-python
......@@ -14,15 +14,9 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If your extensions are in another directory, add it here. If the directory
# is relative to the documentation root, use os.path.abspath to make it
# absolute, like shown here.
#sys.path.append(os.path.abspath('.'))
## curr_dir = os.path.dirname( __file__ )
## bx_dir = os.path.join( curr_dir, '..', '..', 'lib')
## sys.path.insert( 0, bx_dir )
import bx
# General configuration
......@@ -175,10 +169,9 @@ htmlhelp_basename = 'bx-doc'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, document class [howto/manual]).
latex_documents = [
('index', 'bx-python.tex', ur'bx-python Documentation',
ur'James Taylor', 'manual'),
]
latex_documents = [(
'index', 'bx-python.tex', u'bx-python Documentation',
u'James Taylor', 'manual'), ]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
......
......@@ -6,18 +6,16 @@ Setuptools bootstrapping installer.
Run this script to install or upgrade setuptools.
"""
import contextlib
import optparse
import os
import platform
import shutil
import subprocess
import sys
import tempfile
import zipfile
import optparse
import subprocess
import platform
import textwrap
import contextlib
import warnings
import zipfile
from distutils import log
try:
......@@ -244,6 +242,8 @@ def has_powershell():
except Exception:
return False
return True
download_file_powershell.viable = has_powershell
......@@ -260,6 +260,8 @@ def has_curl():
except Exception:
return False
return True
download_file_curl.viable = has_curl
......@@ -276,6 +278,8 @@ def has_wget():
except Exception:
return False
return True
download_file_wget.viable = has_wget
......@@ -291,6 +295,8 @@ def download_file_insecure(url, target):
# Write all the data in one block to avoid creating a partial file.
with open(target, "wb") as dst:
dst.write(data)
download_file_insecure.viable = lambda: True
......@@ -387,5 +393,6 @@ def main():
archive = download_setuptools(**_download_args(options))
return _install(archive, _build_install_args(options))
if __name__ == '__main__':
sys.exit(main())
......@@ -4,4 +4,4 @@ the abstract alignment classes and `maf`, `axt`, and `lav` for readers and
writers in various formats.
"""
from bx.align.core import *
from bx.align.core import * # noqa
......@@ -4,22 +4,29 @@ alignments.
.. _AXT: http://genome.ucsc.edu/goldenPath/help/axt.html
"""
import itertools
from six import Iterator
from bx import interval_index_file
from bx.align import *
from bx.align import (
Alignment,
Component,
src_split
)
# Tools for dealing with pairwise alignments in AXT format
class MultiIndexed(object):
"""Similar to 'indexed' but wraps more than one axt_file"""
def __init__(self, axt_filenames, keep_open=False):
self.indexes = [Indexed(axt_file, axt_file + ".index") for axt_file in axt_filenames]
def get(self, src, start, end):
blocks = []
for index in self.indexes: blocks += index.get( src, start, end )
for index in self.indexes:
blocks += index.get(src, start, end)
return blocks
......@@ -27,14 +34,17 @@ class Indexed( object ):
"""Indexed access to a axt using overlap queries, requires an index file"""
def __init__(self, axt_filename, index_filename=None, keep_open=False, species1=None, species2=None, species_to_lengths=None, support_ids=False):
if index_filename is None: index_filename = axt_filename + ".index"
if index_filename is None:
index_filename = axt_filename + ".index"
self.indexes = interval_index_file.Indexes(filename=index_filename)
self.axt_filename = axt_filename
# nota bene: (self.species1 = species1 or "species1") is incorrect if species1=""
self.species1 = species1
if (self.species1 == None): self.species1 = "species1"
if self.species1 is None:
self.species1 = "species1"
self.species2 = species2
if (self.species2 == None): self.species2 = "species2"
if self.species2 is None:
self.species2 = "species2"
self.species_to_lengths = species_to_lengths
self.support_ids = support_ids # for extra text at end of axt header lines
if keep_open:
......@@ -58,6 +68,7 @@ class Indexed( object ):
finally:
f.close()
class Reader(Iterator):
"""Iterate over all axt blocks in a file in order"""
......@@ -65,9 +76,11 @@ class Reader( Iterator ):
self.file = file
# nota bene: (self.species1 = species1 or "species1") is incorrect if species1=""
self.species1 = species1
if (self.species1 == None): self.species1 = "species1"
if self.species1 is None:
self.species1 = "species1"
self.species2 = species2
if (self.species2 == None): self.species2 = "species2"
if self.species2 is None:
self.species2 = "species2"
self.species_to_lengths = species_to_lengths
self.support_ids = support_ids # for extra text at end of axt header lines
self.attributes = {}
......@@ -81,28 +94,34 @@ class Reader( Iterator ):
def close(self):
self.file.close()
class ReaderIter(Iterator):
def __init__(self, reader):
self.reader = reader
def __iter__(self):
return self
def __next__(self):
v = next(self.reader)
if not v: raise StopIteration
if not v:
raise StopIteration
return v
class Writer(object):
def __init__(self, file, attributes={}):
self.file = file
self.block = 0
self.src_split = True
if ("src_split" in attributes):
if "src_split" in attributes:
self.src_split = attributes["src_split"]
def write(self, alignment):
if (len(alignment.components) != 2):
raise ValueError("%d-component alignment is not compatible with axt" % \
if len(alignment.components) != 2:
raise ValueError(
"%d-component alignment is not compatible with axt" %
len(alignment.components))
c1 = alignment.components[0]
c2 = alignment.components[1]
......@@ -111,13 +130,14 @@ class Writer( object ):
c1 = c1.reverse_complement()
c2 = c2.reverse_complement()
if (self.src_split):
if self.src_split:
spec1, chr1 = src_split(c1.src)
spec2, chr2 = src_split(c2.src)
else:
chr1, chr2 = c1.src, c2.src
self.file.write( "%d %s %d %d %s %d %d %s %s\n" % \
self.file.write(
"%d %s %d %d %s %d %d %s %s\n" %
(self.block, chr1, c1.start+1, c1.start+c1.size,
chr2, c2.start+1, c2.start+c2.size, c2.strand,
alignment.score))
......@@ -139,25 +159,30 @@ class Writer( object ):
# first species is always on plus strand
# when second species is on minus strand, start and stop are counted from sequence end
def read_next_axt(file, species1, species2, species_to_lengths=None, support_ids=False):
line = readline(file, skip_blank=True)
if not line: return
if not line:
return
fields = line.split()
if (len(fields) < 9) or ((not support_ids) and (len(fields) > 9)):
if len(fields) < 9 or (not support_ids and len(fields) > 9):
raise ValueError("bad axt-block header: %s" % line)
attributes = {}
if (len(fields) > 9):
if len(fields) > 9:
attributes["id"] = "_".join(fields[9:])
seq1 = readline(file)
if not line or line.isspace(): raise ValueError("incomplete axt-block; header: %s" % line)
if not line or line.isspace():
raise ValueError("incomplete axt-block; header: %s" % line)
seq2 = readline(file)
if not line or line.isspace(): raise ValueError("incomplete axt-block; header: %s" % line)
if not line or line.isspace():
raise ValueError("incomplete axt-block; header: %s" % line)
# Build 2 component alignment
alignment = Alignment(attributes=attributes, species_to_lengths=species_to_lengths)
# Build component for species 1
component = Component()
component.src = fields[1]
if (species1 != ""): component.src = species1 + "." + component.src
if species1 != "":
component.src = species1 + "." + component.src
component.start = int(fields[2]) - 1 # (axt intervals are origin-1
end = int(fields[3]) # and inclusive on both ends)
component.size = end - component.start
......@@ -167,7 +192,8 @@ def read_next_axt( file, species1, species2, species_to_lengths=None, support_id
# Build component for species 2
component = Component()
component.src = fields[4]
if (species2 != ""): component.src = species2 + "." + component.src
if species2 != "":
component.src = species2 + "." + component.src
component.start = int(fields[5]) - 1
end = int(fields[6])
component.size = end - component.start
......@@ -177,18 +203,19 @@ def read_next_axt( file, species1, species2, species_to_lengths=None, support_id
# add score
try:
alignment.score = int(fields[8])
except:
except ValueError:
try:
alignment.score = float(fields[8])
except:
except ValueError:
alignment.score = fields[8]
return alignment
def readline(file, skip_blank=False):
"""Read a line from provided file, skipping any blank or comment lines"""
while 1:
while True:
line = file.readline()
if not line: return None
if not line:
return None
if line[0] != '#' and not (skip_blank and line.isspace()):
return line
......@@ -10,10 +10,10 @@ import six
from bx.misc.readlengths import read_lengths_file
# DNA reverse complement table
## DNA_COMP = " - " \
## " TVGH CD M KN YSA BWXR tvgh cd m kn ysa bwxr " \
## " " \
## " "
# DNA_COMP = " - " \
# " TVGH CD M KN YSA BWXR tvgh cd m kn ysa bwxr " \
# " " \
# " "
if six.PY2:
DNA_COMP = string.maketrans("ACGTacgt", "TGCAtgca")
......@@ -34,8 +34,10 @@ class Alignment( object ):
self.score = score
self.text_size = 0
self.attributes = attributes
if species_to_lengths == None: self.species_to_lengths = {}
else: self.species_to_lengths = species_to_lengths
if species_to_lengths is None:
self.species_to_lengths = {}
else:
self.species_to_lengths = species_to_lengths
self.components = []
def add_component(self, component):
......@@ -49,14 +51,15 @@ class Alignment( object ):
def get_score(self):
return self.__score
def set_score(self, score):
if type( score ) == str:
if isinstance(score, str):
try:
score = int(score)
except:
except ValueError:
try:
score = float(score)
except:
except ValueError:
pass
self.__score = score
score = property(fget=get_score, fset=set_score)
......@@ -80,26 +83,30 @@ class Alignment( object ):
chrom_to_length = self.species_to_lengths
else:
raise ValueError("no src_size (no length file for %s)" % species)
if type( chrom_to_length ) == int: # (if it's a single length)
if isinstance(chrom_to_length, int): # (if it's a single length)
return chrom_to_length
if type( chrom_to_length ) == type( "" ): # (if it's a file name)
if isinstance(chrom_to_length, str): # (if it's a file name)
chrom_to_length = read_lengths_file(chrom_to_length)
self.species_to_lengths[species] = chrom_to_length
if chrom not in chrom_to_length: raise ValueError("no src_size (%s has no length for %s)" % ( species, chrom ))
if chrom not in chrom_to_length:
raise ValueError("no src_size (%s has no length for %s)" % (species, chrom))
return chrom_to_length[chrom]
def get_component_by_src(self, src):
for c in self.components:
if c.src == src: return c
if c.src == src:
return c
return None
def get_components_by_src(self, src):
for c in self.components:
if c.src == src: yield c
if c.src == src:
yield c
def get_component_by_src_start(self, src):
for c in self.components:
if c.src.startswith( src ): return c
if c.src.startswith(src):
return c
return None
def slice(self, start, end):
......@@ -131,17 +138,17 @@ class Alignment( object ):
start and end are relative to the + strand, regardless of the component's strand.
"""
if type( component_index ) == type( 0 ):
if isinstance(component_index, int):
ref = self.components[component_index]
elif type( component_index ) == type( "" ):
elif isinstance(component_index, str):
ref = self.get_component_by_src(component_index)
elif type( component_index ) == Component:
elif isinstance(component_index, Component):
ref = component_index
else:
raise ValueError("can't figure out what to do")
start_col = ref.coord_to_col(start)
end_col = ref.coord_to_col(end)
if (ref.strand == '-'):
if ref.strand == '-':
(start_col, end_col) = (end_col, start_col)
return self.slice(start_col, end_col)
......@@ -173,22 +180,26 @@ class Alignment( object ):
while i < text_size:
all_gap = True
for seq in seqs:
if seq is None: continue
if seq[i] != '-': all_gap = False
if seq is None:
continue
if seq[i] != '-':
all_gap = False
if all_gap:
for seq in seqs:
if seq is None: continue
if seq is None:
continue
del seq[i]
text_size -= 1
else:
i += 1
for i in range(len(self.components)):
if seqs[i] is None: continue
if seqs[i] is None:
continue
self.components[i].text = ''.join(seqs[i])
self.text_size = text_size
def __eq__(self, other):
if other is None or type( other ) != type( self ):
if other is None or not isinstance(other, type(self)):
return False
if self.score != other.score:
return False
......@@ -211,6 +222,7 @@ class Alignment( object ):
new.add_component(deepcopy(component))
return new
class Component(object):
def __init__(self, src='', start=0, size=0, strand=None, src_size=None, text=''):
......@@ -235,16 +247,14 @@ class Component( object ):
def __str__(self):
if self.empty:
rval = "e %s %d %d %s %d %s" % ( self.src, self.start,
self.size, self.strand,
self.src_size, self.synteny_empty )
rval = "e %s %d %d %s %d %s" % (
self.src, self.start, self.size, self.strand, self.src_size, self.synteny_empty)
else:
rval = "s %s %d %d %s %d %s" % ( self.src, self.start,
self.size, self.strand,
self.src_size, self.text )
rval = "s %s %d %d %s %d %s" % (
self.src, self.start, self.size, self.strand, self.src_size, self.text)
if self.synteny_left and self.synteny_right:
rval += "\ni %s %s %d %s %d" % ( self.src,
self.synteny_left[0], self.synteny_left[1],
rval += "\ni %s %s %d %s %d" % (
self.src, self.synteny_left[0], self.synteny_left[1],
self.synteny_right[0], self.synteny_right[1])
return rval
......@@ -253,29 +263,36 @@ class Component( object ):
end = property(fget=get_end)
def get_src_size(self):
if self._src_size == None:
if self._alignment == None:
if self._src_size is None:
if self._alignment is None:
raise Exception("component has no src_size")
self._src_size = self._alignment().src_size(self.src)
return self._src_size
def set_src_size(self, src_size):
self._src_size = src_size
src_size = property(fget=get_src_size, fset=set_src_size)
def get_forward_strand_start(self):
if self.strand == '-': return self.src_size - self.end
else: return self.start
if self.strand == '-':
return self.src_size - self.end
else:
return self.start
forward_strand_start = property(fget=get_forward_strand_start)
def get_forward_strand_end(self):
if self.strand == '-': return self.src_size - self.start
else: return self.end
if self.strand == '-':
return self.src_size - self.start
else:
return self.end
forward_strand_end = property(fget=get_forward_strand_end)
def reverse_complement(self):
start = self.src_size - self.end
if self.strand == "+": strand = "-"
else: strand = "+"
if self.strand == "+":
strand = "-"
else:
strand = "+"
comp = [ch for ch in self.text.translate(DNA_COMP)]
comp.reverse()
text = "".join(comp)
......@@ -329,7 +346,7 @@ class Component( object ):
raise ValueError("Range error: %d not in %d-%d" % (pos, start, end))
if not self.index:
self.index = list()
if (self.strand == '-'):
if self.strand == '-':
# nota bene: for - strand self.index[x] maps to one column
# higher than is actually associated with the position; thus
# when slice_by_component() and slice_by_coord() flip the ends,
......@@ -346,13 +363,12 @@ class Component( object ):
x = None
try:
x = self.index[pos - start]
except:
except IndexError:
raise Exception("Error in index.")
return x
def __eq__(self, other):
if other is None or type( other ) != type( self ):
if other is None or not isinstance(other, type(self)):
return False
return (self.src == other.src
and self.start == other.start
......@@ -379,26 +395,48 @@ class Component( object ):
new.index = self.index
return new
def get_reader(format, infile, species_to_lengths=None):
import bx.align.maf, bx.align.axt, bx.align.lav
if format == "maf": return bx.align.maf.Reader( infile, species_to_lengths )
elif format == "axt": return bx.align.axt.Reader( infile, species_to_lengths )
elif format == "lav": return bx.align.lav.Reader( infile )
else: raise ValueError("Unknown alignment format %s" % format)
import bx.align.axt
import bx.align.lav
import bx.align.maf
if format == "maf":
return bx.align.maf.Reader(infile, species_to_lengths)
elif format == "axt":
return bx.align.axt.Reader(infile, species_to_lengths)
elif format == "lav":
return bx.align.lav.Reader(infile)
else:
raise ValueError("Unknown alignment format %s" % format)
def get_writer(format, outfile, attributes={}):
import bx.align.maf, bx.align.axt, bx.align.lav
if format == "maf": return bx.align.maf.Writer( outfile, attributes )
elif format == "axt": return bx.align.axt.Writer( outfile, attributes )
elif format == "lav": return bx.align.lav.Writer( outfile, attributes )
else: raise ValueError("Unknown alignment format %s" % format)
import bx.align.axt
import bx.align.lav
import bx.align.maf
if format == "maf":
return bx.align.maf.Writer(outfile, attributes)
elif format == "axt":
return bx.align.axt.Writer(outfile, attributes)
elif format == "lav":
return bx.align.lav.Writer(outfile, attributes)
else:
raise ValueError("Unknown alignment format %s" % format)
def get_indexed(format, filename, index_filename=None, keep_open=False, species_to_lengths=None):
import bx.align.maf, bx.align.axt, bx.align.lav
if format == "maf": return bx.align.maf.Indexed( filename, index_filename, keep_open, species_to_lengths )
elif format == "axt": return bx.align.axt.Indexed( filename, index_filename, keep_open, species_to_lengths )
elif format == "lav": raise Exception("LAV support for Indexed has not been implemented")
else: raise ValueError("Unknown alignment format %s" % format)
import bx.align.axt
import bx.align.lav
import bx.align.maf
if format == "maf":
return bx.align.maf.Indexed(filename, index_filename, keep_open, species_to_lengths)
elif format == "axt":
return bx.align.axt.Indexed(filename, index_filename, keep_open, species_to_lengths)
elif format == "lav":
raise Exception("LAV support for Indexed has not been implemented")
else:
raise ValueError("Unknown alignment format %s" % format)
def shuffle_columns(a):
"""Randomize the columns of an alignment"""
......@@ -407,22 +445,30 @@ def shuffle_columns( a ):
for c in a.components:
c.text = ''.join([c.text[i] for i in mask])
def src_split(src): # splits src into species,chrom
dot = src.rfind(".")
if dot == -1: return None,src
else: return src[:dot],src[dot+1:]
if dot == -1:
return None, src
else:
return src[:dot], src[dot+1:]
def src_merge(species, chrom, contig=None): # creates src (inverse of src_split)
if species == None: src = chrom
else: src = species + "." + chrom
if contig != None: src += "[%s]" % contig
if species is None:
src = chrom
else:
src = species + "." + chrom
if contig is not None:
src += "[%s]" % contig
return src
# ---- Read C extension if available ---------------------------------------
try:
from ._core import coord_to_col
except:
except ImportError:
def coord_to_col(start, text, pos):
col = 0
while start < pos:
......
......@@ -4,13 +4,18 @@
from __future__ import with_statement
import logging
import re
import os
import re
from collections import namedtuple
from six.moves import cPickle
from ._epo import rem_dash, fastLoadChain, bed_union, cummulative_intervals
from ._epo import ( # noqa: F401
bed_union,
cummulative_intervals,
fastLoadChain,
rem_dash
)
log = logging.getLogger(__name__)
......@@ -33,7 +38,7 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
:param line: header of a chain (in .chain format)
"""
assert type(line) == str, "this is a factory from string"
assert isinstance(line, str), "this is a factory from string"
line = line.rstrip().split()[1:] # the first component is the keyword "chain"
tup = [t[0](t[1]) for t in zip([int, str, int, str, int, int, str, int, str, int, int, str], line)]
......@@ -80,10 +85,14 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
# intervals are 0-base, halfo-open => lengths = coordinate difference
while A or B:
if a[1] < b[1]:
T.append(0); Q.append( A[0][0] - a[1] ); S.append( min(a[1], b[1]) - max(a[0], b[0]) )
T.append(0)
Q.append(A[0][0] - a[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
a = A.pop(0)
elif b[1] < a[1]:
Q.append(0); T.append( B[0][0] - b[1] ); S.append( min(a[1], b[1]) - max(a[0], b[0]) )
Q.append(0)
T.append(B[0][0] - b[1])
S.append(min(a[1], b[1]) - max(a[0], b[0]))
b = B.pop(0)
elif A and B:
assert 1 > 2, "there are dash columns"
......@@ -94,18 +103,18 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
tSize = trg_chrom_sizes[trg_comp.chrom]
qSize = qr_chrom_sizes[qr_comp.chrom]
## UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
## chain_start = epo_start - 1 and chain_end = epo_end
# UCSC coordinates are 0-based, half-open and e! coordinates are 1-base, closed
# chain_start = epo_start - 1 and chain_end = epo_end
if qr_comp.strand == '+':
chain = Chain(0,
trg_comp.chrom, tSize, "+",
chain = Chain(
0, trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_start_correction, qr_comp.end - qr_end_correction,
qr_comp.gabid)
else:
chain = Chain(0,
trg_comp.chrom, tSize, "+",
chain = Chain(
0, trg_comp.chrom, tSize, "+",
(trg_comp.start - 1) + tr_start_correction, trg_comp.end - tr_end_correction,
qr_comp.chrom, qSize, (qr_comp.strand == trg_comp.strand and '+' or '-'),
(qr_comp.start - 1) + qr_end_correction, qr_comp.end - qr_start_correction,
......@@ -113,13 +122,14 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
# strand correction. in UCSC coordinates this is: size - coord
if chain.qStrand == '-':
chain = chain._replace(qEnd = chain.qSize - chain.qStart,
chain = chain._replace(
qEnd=chain.qSize - chain.qStart,
qStart=chain.qSize - chain.qEnd)
assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (str(chain),
chain.tEnd - chain.tStart, sum(S) + sum(T))
assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (str(chain),
chain.qEnd - chain.qStart, sum(S) + sum(Q))
assert chain.tEnd - chain.tStart == sum(S) + sum(T), "[%s] %d != %d" % (
str(chain), chain.tEnd - chain.tStart, sum(S) + sum(T))
assert chain.qEnd - chain.qStart == sum(S) + sum(Q), "[%s] %d != %d" % (
str(chain), chain.qEnd - chain.qStart, sum(S) + sum(Q))
return chain, S, T, Q
def slice(self, who):
......@@ -175,14 +185,16 @@ class Chain( namedtuple('Chain', 'score tName tSize tStrand tStart tEnd qName qS
cPickle.dump(data, fd)
return data
class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar')):
"this format is how alignments are delivered from e!"
__slots__ = ()
cigar_pattern = re.compile("(\d*)([MD])")
cigar_pattern = re.compile(r"(\d*)([MD])")
def __repr__(self): return str(self)
def __repr__(self):
return str(self)
def __str__(self):
c = self.cigar[:5] + "..." + self.cigar[-5:]
......@@ -198,10 +210,9 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
chrom = cmp[2]
if not chrom.startswith("chr"):
chrom = "chr%s" % chrom
instance = tuple.__new__(cls,
(cmp[0], cmp[1],
chrom, int(cmp[3]), int(cmp[4]),
{'1' : '+', '-1' : '-'}[cmp[5]], cmp[6]))
instance = tuple.__new__(
cls,
(cmp[0], cmp[1], chrom, int(cmp[3]), int(cmp[4]), {'1': '+', '-1': '-'}[cmp[5]], cmp[6]))
span = instance.end - instance.start + 1
m_num = sum((t[1] == "M" and [t[0]] or [0])[0] for t in instance.cigar_iter(False))
if span != m_num:
......@@ -273,8 +284,6 @@ class EPOitem(namedtuple('Epo_item', 'species gabid chrom start end strand cigar
assert sum(t[0] for t in self.cigar_iter(False) if t[1] == "M") == sum(t[1]-t[0] for t in d)
d_sum = sum(t[1]-t[0] for t in d)
assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (self.start, self.end,
self.end-self.start+1, d_sum)
assert self.end - self.start + 1 == d_sum, "[ (%d, %d) = %d ] != %d" % (
self.start, self.end, self.end-self.start+1, d_sum)
return d[1:] # clip the (thr, thr) entry
"tests for bx.align.epo"
import unittest, logging, pdb
import pdb
import random
import unittest
import numpy as np
from bx.align._epo import cummulative_intervals, bed_union
from bx.align.epo import *
from bx.align._epo import (
bed_union,
cummulative_intervals,
)
from bx.align.epo import (
Chain,
EPOitem
)
class TestBed(unittest.TestCase):
def setUp(self):
......@@ -17,8 +26,7 @@ class TestBed( unittest.TestCase ):
S.append(random.randint(10, 50))
D.append(random.randint(10, 50))
D[-1] = 0
C = cummulative_intervals(np.array(S, dtype=np.int64),
np.array(D, dtype=np.int64))
C = cummulative_intervals(np.array(S, dtype=np.int64), np.array(D, dtype=np.int64))
for i in range(self.N):
assert C[i, 1] - C[i, 0] == S[i]
for i in range(1, self.N):
......@@ -60,38 +68,31 @@ class TestBed( unittest.TestCase ):
assert U[i, 1] > U[i, 0]
assert U[i, 0] > U[i-1, 1]
cigar_pairs = [
("GGACCTGGAGAGATCAG---------------------------GACTTCAACTGTGTG-------------TCTTAGACTGGG--------AGGGTGTTA",
"AGGCCAGGAGAGATCAGGTAAGTCTTAATTTAATAAAGAGATAGGACCTGAACTGTGTCTAACAATAGGTAATATTAGACTGGGGGAGAGAGAAGACTTTC"),
("TTT--------------------------------------------------------------------------------------------------------------------T",
"CTTGTACCAAGGACAGTACTGGCAGCCTAATTGCTAACACTTTGTGGTGGATTGGTCCACTCAATATTTGTTCCCACCTCTTTTCAGTCCAGTTCTATAAAGGACAGAAAGTTGAAAACT"),
("A-------------------------------------------------ACACTGGACACAGCACTAACACGATTACTTA",
"ACATTTCCCACACTCCCTTGCAGCTAGGTTTCTAGATATAATTTAGATTCCA----------------------------A"),
("TTTGGTCCTCTGGA------CGAGCAGCCAGTGCT---------------------------------------------------------------------------AAAAAAAA",
"T---CATTCTAGCAGGTGCTGCAGCAGCAGGTAGCCCTGGAGCCAACAGTTGTGGCTATGATTCTTGATCATCAGATTTGGCTCAAGTGATGTGTTCCTCTAGCATGCACTTGAGATA"),
("G-----------------------C----------------------------------------------------------------------------------------A",
"GGCCTGCACTGCCAGTAATTTTAACAAATTTTTAGGCACTGAATTCCCTGTATTAAATCTGTTTTCCTTAGCGTAAACAGATCTCTGTTAAATGAAACTAAACCCTGACTGATA"),
("TATT----------------------------------T",
"TCCTTCATTTTATTTCTCCCTTAAAATTTTTTTTATTACT"),
("TAAAAA--A------A------------------------------------------------------------TTTTTTTTTTT",
"T---AATTATTTTGCAGCAGGTCCTTGATAACATATCATCTATAAATATTTCAGCAAGAATCTCTAAAAGGCAAGAACCTCCTTCTT"),
("AAACAA---------------------------------------TT---T",
"AAACAATACCACTGCATCACTATCAAACCCAAAAAATAACAAAAATTGGGT"),
("TCTTAAC---TGCTGAGCCATCCCTCCAGCTCCTGTTTTATTTTTATTATGAAGTAATAATA--ATAG--TAATAATAATGATG",
"TACACTTAATTCTAAAACTTGTTATGAATCATCA----------TTGG--TTTTTTATTGTGAAGAACTAATATAATCAGA--G"),
("ATGATAATGGTATCCTAGCTCAACACCTG-GAGTTCACCCCAACAGTTAACTAA----GTTTGAGGAAGTGTTAACAAGCCTA---ACAAAGAGGACATGCCAATAGCTGACAGAGTCAC",
"A-------CCTCTGCTAGCTCAACTCCTGAGAATCAATTATATAAGCTAGGTCAGTGGTTTTGAGAAAGTATTAGTAGACATTTCTCCAAAGAATACATAAAAATGGCC-A--CAAGTAT")
]
def toCigar(species, id, s):
I = [(0, 0)]
L = [len(_) for _ in s.split("-")]
......@@ -110,7 +111,6 @@ def toCigar(species, id, s):
L = L[start+1:]
if len(L):
I.append((I[-1][1] + len(L), I[-1][1] + len(L)))
#print I[1:]
C = []
for i in range(1, len(I)):
dl = I[i][0] - I[i-1][1]
......@@ -127,9 +127,7 @@ def toCigar(species, id, s):
C.append(dc+mc)
MSUM = sum(i[1]-i[0] for i in I)
start = random.randint(50, 10000)
return "%s\t%d\t1\t%d\t%d\t%d\t%s" % (species, id, start, start+MSUM-1,
random.choice((-1,1)), "".join(C))
return "%s\t%d\t1\t%d\t%d\t%d\t%s" % (species, id, start, start+MSUM-1, random.choice((-1, 1)), "".join(C))
class TestEpo(unittest.TestCase):
......@@ -154,8 +152,6 @@ class TestEpo( unittest.TestCase ):
for (a, b) in self.epo_records:
ca, cb = cigar_pairs[int(a.gabid)]
#if a.strand == '-': ca = ca[::-1]
#if b.strand == '-': cb = cb[::-1]
ch(ca, a.cigar_iter(False))
ch(cb, b.cigar_iter(False))
......@@ -195,7 +191,8 @@ class TestEpo( unittest.TestCase ):
dash_cols = random.randint(0, 10)
tStart = random.randint(0, 1000)
qStart = random.randint(0, 1000)
epo_pair = (EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+12-1, "4M2D4M%dD4M" % (dash_cols+3))),
epo_pair = (
EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+12-1, "4M2D4M%dD4M" % (dash_cols+3))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+14-1, "7M%dD7M" % (dash_cols+3))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1": 500}, {"chr1": 800})
ti = epo_pair[0].intervals(False)
......@@ -218,10 +215,9 @@ class TestEpo( unittest.TestCase ):
tStart = random.randint(0, 1000)
qStart = random.randint(0, 1000)
epo_pair = (EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+tm-1,
"%dD%dM" % (dash_cols+1, tm))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+qm+1-1,
"M%dD%dM" % (dash_cols+tm-qm, qm))))
epo_pair = (
EPOitem._strfactory("homo_sapiens\t0\t1\t%d\t%d\t1\t%s" % (tStart, tStart+tm-1, "%dD%dM" % (dash_cols+1, tm))),
EPOitem._strfactory("mus_musculus\t0\t1\t%d\t%d\t1\t%s" % (qStart, qStart+qm+1-1, "M%dD%dM" % (dash_cols+tm-qm, qm))))
chain = Chain._make_from_epo(epo_pair[0], epo_pair[1], {"chr1": 500}, {"chr1": 800})
if chain[1][-1] != qm:
pdb.set_trace()
......
This diff is collapsed.
......@@ -3,12 +3,12 @@ Tests for `bx.align.lav`.
"""
import unittest
import sys
import bx.align as align
import bx.align.lav as lav
test_lav = "test_data/lav_tests/apple_orange.lav"
class lavTestCase(unittest.TestCase):
def testReader(self):
......@@ -32,6 +32,7 @@ class lavTestCase(unittest.TestCase):
reader.close()
def check_component(c, src, start, size, strand, src_size, text):
# ..print "\"%s\" == \"%s\"" % (c.src,src)
assert c.src == src, "c.src = %s (expected %s)" % (c.src, src)
......
......@@ -4,15 +4,19 @@ Support for the `MAF`_ multiple sequence alignment format used by `multiz`_.
.. _MAF: http://genome.ucsc.edu/FAQ/FAQformat.html#format5
.. _multiz: http://www.bx.psu.edu/miller_lab/
"""
import itertools
from io import TextIOWrapper
import os
from six import Iterator, StringIO, PY3
from six import (
Iterator,
PY3,
StringIO,
)
from bx import interval_index_file
from bx.align import *
from bx.misc.seekbzip2 import SeekableBzip2File
from bx.align import (
Alignment,
Component
)
MAF_INVERSE_STATUS = 'V'
MAF_INSERT_STATUS = 'I'
......@@ -24,10 +28,12 @@ MAF_MAYBE_NEW_STATUS = 'S'
MAF_MAYBE_NEW_NESTED_STATUS = 's'
MAF_MISSING_STATUS = 'M'
class MAFIndexedAccess(interval_index_file.AbstractIndexedAccess):
"""
Indexed access to a MAF file.
"""
def read_at_current_offset(self, file, **kwargs):
"""
Read the MAF block at the current position in `file` and return an
......@@ -41,28 +47,33 @@ class MAFIndexedAccess( interval_index_file.AbstractIndexedAccess ):
return TextIOWrapper(data, encoding="ascii")
return data
class MAFMultiIndexedAccess(interval_index_file.AbstractMultiIndexedAccess):
"""
Indexed access to multiple MAF files.
"""
indexed_access_class = MAFIndexedAccess
Indexed = MAFIndexedAccess
"""Deprecated: `MAFIndexedAccess` is also available under the name `Indexed`."""
MultiIndexed = MAFMultiIndexedAccess
"""Deprecated: `MAFMultiIndexedAccess` is also available under the name `MultiIndexed`."""
class Reader(Iterator):
"""
Iterate over all maf blocks in a file in order
"""
def __init__(self, file, **kwargs):
self.file = file
self.maf_kwargs = kwargs
# Read and verify maf header, store any attributes
fields = self.file.readline().split()
if fields[0] != '##maf': raise Exception("File does not have MAF header")
if fields[0] != '##maf':
raise Exception("File does not have MAF header")
self.attributes = parse_attributes(fields[1:])
def __next__(self):
......@@ -74,28 +85,36 @@ class Reader( Iterator ):
def close(self):
self.file.close()
class ReaderIter(Iterator):
"""
Adapts a `Reader` to the iterator protocol.
"""
def __init__(self, reader):
self.reader = reader
def __iter__(self):
return self
def __next__(self):
v = next(self.reader)
if not v: raise StopIteration
if not v:
raise StopIteration
return v
class Writer(object):
def __init__(self, file, attributes={}):
self.file = file
# Write header, Webb's maf code wants version first, we accomodate
if 'version' not in attributes: attributes['version'] = 1
if 'version' not in attributes:
attributes['version'] = 1
self.file.write("##maf version=%s" % attributes['version'])
for key in attributes:
if key == 'version': continue
if key == 'version':
continue
self.file.writelines(" %s=%s" % (key, attributes[key]))
self.file.write("\n")
......@@ -127,9 +146,11 @@ class Writer( object ):
# ---- Helper methods -------------------------------------------------------
def from_string(string, **kwargs):
return read_next_maf(StringIO(string), **kwargs)
def read_next_maf(file, species_to_lengths=None, parse_e_rows=False):
"""
Read the next MAF block from `file` and return as an `Alignment`
......@@ -139,9 +160,11 @@ def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
alignment = Alignment(species_to_lengths=species_to_lengths)
# Attributes line
line = readline(file, skip_blank=True)
if not line: return None
if not line:
return None
fields = line.split()
if fields[0] != 'a': raise Exception("Expected 'a ...' line")
if fields[0] != 'a':
raise Exception("Expected 'a ...' line")
alignment.attributes = parse_attributes(fields[1:])
if 'score' in alignment.attributes:
alignment.score = alignment.attributes['score']
......@@ -150,11 +173,13 @@ def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
alignment.score = 0
# Sequence lines
last_component = None
while 1:
while True:
line = readline(file)
# EOF or Blank line terminates alignment components
if not line or line.isspace(): break
if line.isspace(): break
if not line or line.isspace():
break
if line.isspace():
break
# Parse row
fields = line.split()
if fields[0] == 's':
......@@ -165,7 +190,8 @@ def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
component.size = int(fields[3])
component.strand = fields[4]
component.src_size = int(fields[5])
if len(fields) > 6: component.text = fields[6].strip()
if len(fields) > 6:
component.text = fields[6].strip()
# Add to set
alignment.add_component(component)
last_component = component
......@@ -200,15 +226,17 @@ def read_next_maf( file, species_to_lengths=None, parse_e_rows=False ):
return alignment
def readline(file, skip_blank=False):
"""Read a line from provided file, skipping any blank or comment lines"""
while 1:
while True:
line = file.readline()
#print "every line: %r" % line
if not line: return None
if not line:
return None
if line[0] != '#' and not (skip_blank and line.isspace()):
return line
def parse_attributes(fields):
"""Parse list of key=value strings into a dict"""
attributes = {}
......@@ -217,8 +245,10 @@ def parse_attributes( fields ):
attributes[pair[0]] = pair[1]
return attributes
def format_tabular(rows, align=None):
if len( rows ) == 0: return ""
if len(rows) == 0:
return ""
lengths = [len(col) for col in rows[0]]
for row in rows[1:]:
for i in range(0, len(row)):
......@@ -233,4 +263,3 @@ def format_tabular( rows, align=None ):
rval += " "
rval += "\n"
return rval
......@@ -3,8 +3,6 @@ Tests for `bx.align.maf`.
"""
from __future__ import print_function
import sys
import unittest
from six import StringIO
......@@ -59,6 +57,7 @@ s orange 19 61 - 100 AGGGATGCGTT--TCACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------G
"""
def test_reader():
reader = maf.Reader(StringIO(test_maf))
......@@ -84,6 +83,7 @@ def test_reader():
reader.close()
def test_writer():
val = StringIO()
......@@ -105,8 +105,7 @@ a score=7009
s human_hoxa 100 9 + 1000257 ACA-TTACT
s horse_hoxa 120 10 - 98892 ACAATTGCT
"""
""" # noqa: W291
def test_slice():
......@@ -151,6 +150,7 @@ def test_with_synteny():
check_component(rat, "rn3.chr4", 29161032, 1524, "-", 187371129, None)
assert rat.synteny_empty == maf.MAF_INSERT_STATUS
def test_write_with_synteny():
reader = maf.Reader(StringIO(test_maf_2), parse_e_rows=True)
a = next(reader)
......@@ -177,12 +177,13 @@ e echTel1.scaffold_212365 4641 1430 + 9822 I
e rn3.chr4 29161032 1524 - 187371129 I
e mm7.chr6 28091695 3290 - 149646834 I
"""
""" # noqa: W291
print(actual)
print("---")
print(expected)
assert actual == expected
def check_component(c, src, start, size, strand, src_size, text):
assert c.src == src
assert c.start == start
......
......@@ -3,16 +3,27 @@ Support for scoring alignments using arbitrary scoring matrices, arbitrary
alphabets, and affine gap penalties.
"""
from numpy import *
from numpy import (
float32,
int32,
ones,
zeros
)
class ScoringScheme(object):
# note that gap_open and gap_extend are penalties, which means you should make them positive
def __init__(self, gap_open, gap_extend, default=-100, alphabet1="ACGT", alphabet2=None, gap1="-", gap2=None, text1_range=128, text2_range=None, typecode=int32):
if (text2_range == None): text2_range = text1_range
if (alphabet2 == None): alphabet2 = alphabet1
if (gap2 == None): gap2 = gap1 # (scheme with gap1=gap2=None is legit)
if type(alphabet1) == str: alphabet1 = [ch for ch in alphabet1]
if type(alphabet2) == str: alphabet2 = [ch for ch in alphabet2]
if text2_range is None:
text2_range = text1_range
if alphabet2 is None:
alphabet2 = alphabet1
if gap2 is None:
gap2 = gap1 # (scheme with gap1=gap2=None is legit)
if isinstance(alphabet1, str):
alphabet1 = [ch for ch in alphabet1]
if isinstance(alphabet2, str):
alphabet2 = [ch for ch in alphabet2]
self.table = ones((text1_range, text2_range), typecode)
self.table *= default
self.gap_open = gap_open
......@@ -23,24 +34,33 @@ class ScoringScheme( object ):
self.alphabet2 = alphabet2
# private _set_score and _get_score allow subclasses to override them to
# implement a different underlying table object
def _set_score(self, a_b_pair, val):
(a, b) = a_b_pair
self.table[a, b] = val
def _get_score(self, a_b_pair):
(a, b) = a_b_pair
return self.table[a, b]
def set_score(self, a, b, val, foldcase1=False, foldcase2=False):
self._set_score((a, b), val)
if foldcase1:
aCh = chr(a)
if (aCh.isupper()): aa = ord(aCh.lower())
elif (aCh.islower()): aa = ord(aCh.upper())
else: foldcase1 = False
if (aCh.isupper()):
aa = ord(aCh.lower())
elif (aCh.islower()):
aa = ord(aCh.upper())
else:
foldcase1 = False
if foldcase2:
bCh = chr(b)
if (bCh.isupper()): bb = ord(bCh.lower())
elif (bCh.islower()): bb = ord(bCh.upper())
else: foldcase2 = False
if (bCh.isupper()):
bb = ord(bCh.lower())
elif (bCh.islower()):
bb = ord(bCh.upper())
else:
foldcase2 = False
if foldcase1 and foldcase2:
self._set_score((aa, b), val)
self._set_score((a, bb), val)
......@@ -49,10 +69,13 @@ class ScoringScheme( object ):
self._set_score((aa, b), val)
elif foldcase2:
self._set_score((a, bb), val)
def score_alignment(self, a):
return score_alignment(self, a)
def score_texts(self, text1, text2):
return score_texts(self, text1, text2)
def __str__(self):
isDna1 = "".join(self.alphabet1) == "ACGT"
isDna2 = "".join(self.alphabet2) == "ACGT"
......@@ -61,47 +84,59 @@ class ScoringScheme( object ):
for a in self.alphabet1:
for b in self.alphabet2:
score = self._get_score((ord(a), ord(b)))
if (type(score) == float): s = "%8.6f" % score
else: s = "%s" % score
if (isinstance(score, float)):
s = "%8.6f" % score
else:
s = "%s" % score
if (len(s)+1 > width):
width = len(s)+1
lines = []
line = []
if labelRows:
if isDna1: line.append(" ")
else: line.append(" ")
if isDna1:
line.append(" ")
else:
line.append(" ")
for b in self.alphabet2:
if isDna2: s = b
else: s = "%02X" % ord(b)
if isDna2:
s = b
else:
s = "%02X" % ord(b)
line.append("%*s" % (width, s))
lines.append(("".join(line))+"\n")
for a in self.alphabet1:
line = []
if labelRows:
if isDna1: line.append(a)
else: line.append("%02X" % ord(a))
if isDna1:
line.append(a)
else:
line.append("%02X" % ord(a))
for b in self.alphabet2:
score = self._get_score((ord(a), ord(b)))
if (type(score) == float): s = "%8.6f" % score
else: s = "%s" % score
if (isinstance(score, float)):
s = "%8.6f" % score
else:
s = "%s" % score
line.append("%*s" % (width, s))
lines.append(("".join(line))+"\n")
return "".join(lines)
def read_scoring_scheme(f, gap_open, gap_extend, gap1="-", gap2=None, **kwargs):
"""
Initialize scoring scheme from a file containint a blastz style text blob.
f can be either a file or the name of a file.
"""
close_it = False
if (type(f) == str):
f = file(f,"rt")
if (isinstance(f, str)):
f = open(f, "rt")
close_it = True
ss = build_scoring_scheme("".join([line for line in f]), gap_open, gap_extend, gap1=gap1, gap2=gap2, **kwargs)
if (close_it):
f.close()
return ss
def build_scoring_scheme(s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs):
"""
Initialize scoring scheme from a blastz style text blob, first line
......@@ -133,14 +168,14 @@ def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs
for i, line in enumerate(lines):
row_scores = line.split()
if len(row_scores) == len(symbols2): # blastz-style row
if symbols1 == None:
if symbols1 is None:
if len(lines) != len(symbols2):
raise bad_matrix
symbols1 = symbols2
elif (rows_have_syms):
raise bad_matrix
elif len(row_scores) == len(symbols2) + 1: # row starts with symbol
if symbols1 == None:
if symbols1 is None:
symbols1 = []
rows_have_syms = True
a_la_blastz = False
......@@ -169,16 +204,18 @@ def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs
foldcase2 = "".join(alphabet2) == "ACGT"
# create appropriately sized matrix
text1_range = text2_range = 128
if ord( max( alphabet1 ) ) >= 128: text1_range = 256
if ord( max( alphabet2 ) ) >= 128: text2_range = 256
if ord(max(alphabet1)) >= 128:
text1_range = 256
if ord(max(alphabet2)) >= 128:
text2_range = 256
typecode = int32
for i, row_scores in enumerate(rows):
for j, score in enumerate(map(int_or_float, row_scores)):
if type( score ) == float:
if isinstance(score, float):
typecode = float32
if type( gap_open ) == float:
if isinstance(gap_open, float):
typecode = float32
if type( gap_extend ) == float:
if isinstance(gap_extend, float):
typecode = float32
ss = ScoringScheme(gap_open, gap_extend, alphabet1=alphabet1, alphabet2=alphabet2, gap1=gap1, gap2=gap2, text1_range=text1_range, text2_range=text2_range, typecode=typecode, **kwargs)
# fill matrix
......@@ -195,15 +232,24 @@ def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs
ss.set_score(ord(alphabet1[i]), ord(alphabet2[j].lower()), score)
return ss
def int_or_float(s):
try: return int( s )
except: return float( s )
try:
return int(s)
except ValueError:
return float(s)
# convert possible two-char symbol to a single character
def sym_to_char(sym):
if len( sym ) == 1: return sym
elif len( sym ) != 2: raise ValueError
else: return chr(int(sym,base=16))
if len(sym) == 1:
return sym
elif len(sym) != 2:
raise ValueError
else:
return chr(int(sym, base=16))
def score_alignment(scoring_scheme, a):
score = 0
......@@ -213,6 +259,7 @@ def score_alignment( scoring_scheme, a ):
score += score_texts(scoring_scheme, a.components[i].text, a.components[j].text)
return score
def score_texts(scoring_scheme, text1, text2):
rval = 0
last_gap_a = last_gap_b = False
......@@ -242,6 +289,7 @@ def score_texts( scoring_scheme, text1, text2 ):
last_gap_a = last_gap_b = False
return rval
def accumulate_scores(scoring_scheme, text1, text2, skip_ref_gaps=False):
"""
Return cumulative scores for each position in alignment as a 1d array.
......@@ -286,6 +334,7 @@ def accumulate_scores( scoring_scheme, text1, text2, skip_ref_gaps=False ):
pos += 1
return rval
hox70 = build_scoring_scheme(""" A C G T
91 -114 -31 -123
-114 100 -125 -31
......
......@@ -3,7 +3,11 @@ Tests for `bx.align.score`.
"""
import unittest
from numpy import array, cumsum, allclose
from numpy import (
allclose,
array,
cumsum,
)
from six import StringIO
import bx.align.maf
......