Source code for pyani.pyani_tools

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (c) The James Hutton Institute 2013-2019
# (c) The University of Strathclyde 2019-2020
# Author: Leighton Pritchard
#
# Contact:
# leighton.pritchard@strath.ac.uk
#
# Leighton Pritchard,
# Strathclyde Institute of Pharmaceutical and Biomedical Sciences
# The University of Strathclyde
# 161 Cathedral Street
# Glasgow
# G4 0RE
# Scotland,
# UK
#
# The MIT License
#
# Copyright (c) 2013-2019 The James Hutton Institute
# (c) The University of Strathclyde 2019-2020
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Code to support pyani."""

import shutil

from logging import Logger
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Tuple,
)

import pandas as pd  # type: ignore

from Bio import SeqIO  # type: ignore

from pyani import pyani_config


class MatrixData(NamedTuple):
    """Convenience struct for matrix data returned by ORM.

    Bundles a single results matrix with the name it is known by and a
    dictionary of arguments that travel alongside it.

    Fields, in positional order:

    - ``name``: identifying name for the matrix
    - ``data``: the matrix itself, as a pandas dataframe
    - ``graphic_args``: dictionary of graphics-related arguments; the
      expected keys are defined by the code that consumes this struct
    """

    # Identifying name for the matrix
    name: str
    # The matrix values
    data: pd.DataFrame
    # Arguments accompanying the matrix (schema defined by the consumer)
    graphic_args: Dict
class Dependencies(NamedTuple):
    """Convenience struct for third-party dependency presence.

    Each field holds a string describing where the corresponding tool was
    found, or None when it is not available.

    Fields, in positional order:

    - ``blast``: location of the BLAST+ tool, or None
    - ``legacy_blast``: location of the legacy BLAST tool, or None
    - ``mummer``: location of the MUMmer tool, or None
    """

    # BLAST+ executable location (None if absent)
    blast: Optional[str]
    # Legacy BLAST executable location (None if absent)
    legacy_blast: Optional[str]
    # MUMmer executable location (None if absent)
    mummer: Optional[str]
# Class to hold ANI dataframe results
class ANIResults:
    """Holds ANI dataframe results.

    Four square dataframes, indexed and columned by the same labels, are
    maintained: alignment lengths, similarity errors, percentage identity
    and alignment coverage. A fifth (Hadamard) matrix is derived on demand
    from identity and coverage.
    """

    def __init__(self, labels: List[str], mode: str) -> None:
        """Initialise with four empty, labelled dataframes.

        :param labels:  row/column labels shared by all result dataframes
        :param mode:  name of the ANI method; used by the ``data`` property
            to select output filestems, where it must be one of "ANIm",
            "ANIb" or "ANIblastall"
        """
        # Alignment lengths start out unset (NaN) until values are added
        self.alignment_lengths = pd.DataFrame(index=labels, columns=labels, dtype=float)
        # Similarity errors default to zero
        self.similarity_errors = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(0)
        # Identity and coverage default to 1.0
        self.percentage_identity = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(1.0)
        self.alignment_coverage = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(1.0)
        # Flag initialised to False; it is not modified within this class
        self.zero_error = False
        self.mode = mode

    def add_tot_length(
        self, qname: str, sname: str, value: float, sym: bool = True
    ) -> None:
        """Add a total length value to self.alignment_lengths.

        :param qname:  label of the query (row)
        :param sname:  label of the subject (column)
        :param value:  total alignment length to record
        :param sym:  if True, also record the value at (sname, qname)
        """
        self.alignment_lengths.loc[qname, sname] = value
        if sym:
            self.alignment_lengths.loc[sname, qname] = value

    def add_sim_errors(
        self, qname: str, sname: str, value: float, sym: bool = True
    ) -> None:
        """Add a similarity error value to self.similarity_errors.

        :param qname:  label of the query (row)
        :param sname:  label of the subject (column)
        :param value:  similarity error value to record
        :param sym:  if True, also record the value at (sname, qname)
        """
        self.similarity_errors.loc[qname, sname] = value
        if sym:
            self.similarity_errors.loc[sname, qname] = value

    def add_pid(self, qname: str, sname: str, value: float, sym: bool = True) -> None:
        """Add a percentage identity value to self.percentage_identity.

        :param qname:  label of the query (row)
        :param sname:  label of the subject (column)
        :param value:  percentage identity value to record
        :param sym:  if True, also record the value at (sname, qname)
        """
        self.percentage_identity.loc[qname, sname] = value
        if sym:
            self.percentage_identity.loc[sname, qname] = value

    def add_coverage(
        self, qname: str, sname: str, qcover: float, scover: Optional[float] = None
    ) -> None:
        """Add percentage coverage values to self.alignment_coverage.

        :param qname:  label of the query (row)
        :param sname:  label of the subject (column)
        :param qcover:  coverage recorded at (qname, sname)
        :param scover:  coverage recorded at (sname, qname); when None, the
            reciprocal cell is left untouched
        """
        self.alignment_coverage.loc[qname, sname] = qcover
        # Compare against None explicitly: a subject coverage of 0.0 is a
        # valid value and must not be discarded by a truthiness test
        if scover is not None:
            self.alignment_coverage.loc[sname, qname] = scover

    @property
    def hadamard(self) -> pd.DataFrame:
        """Return Hadamard matrix (identity * coverage).

        The product is taken element-wise between the percentage identity
        and alignment coverage dataframes.
        """
        return self.percentage_identity * self.alignment_coverage

    @property
    def data(self) -> Iterator[Tuple[Any, str]]:
        """Return iterator of (dataframe, filestem) tuples.

        Dataframes are yielded in the order: alignment lengths, percentage
        identity, alignment coverage, similarity errors, Hadamard. Each is
        paired positionally with a filestem from the pyani_config
        collection selected by self.mode.

        :raises KeyError:  if self.mode is not "ANIm", "ANIb" or
            "ANIblastall"
        """
        # NOTE(review): assumes each *_FILESTEMS collection lists its stems
        # in the same order as the dataframes below - confirm in pyani_config
        stemdict = {
            "ANIm": pyani_config.ANIM_FILESTEMS,
            "ANIb": pyani_config.ANIB_FILESTEMS,
            "ANIblastall": pyani_config.ANIBLASTALL_FILESTEMS,
        }
        return zip(
            (
                self.alignment_lengths,
                self.percentage_identity,
                self.alignment_coverage,
                self.similarity_errors,
                self.hadamard,
            ),
            stemdict[self.mode],
        )
class BLASTfunctions(NamedTuple):
    """Convenience structure to hold BLAST functions.

    Fields, in positional order:

    - ``db_func``: callable used to obtain the database-formatting
      command and database name
    - ``blastn_func``: callable used to obtain the BLASTN command
    """

    # Callable producing database build information
    db_func: Callable
    # Callable producing the BLASTN command
    blastn_func: Callable
class BLASTexes(NamedTuple):
    """Convenience structure to hold BLAST executables.

    Fields, in positional order:

    - ``format_exe``: path to the database-formatting executable
    - ``blast_exe``: path to the BLAST search executable
    """

    # Path to the database-formatting executable
    format_exe: Path
    # Path to the BLAST search executable
    blast_exe: Path
# Class to hold/build BLAST commands
class BLASTcmds:
    """Class for construction of BLASTN and database formatting commands.

    Wraps a pair of command-building functions and a pair of executables,
    and applies them to input files using a fixed output directory.
    """

    def __init__(
        self, funcs: BLASTfunctions, exes: BLASTexes, prefix: str, outdir: Path
    ) -> None:
        """Instantiate class.

        :param funcs:  BLASTfunctions, containing functions for this BLAST
            analysis
        :param exes:  BLASTexes, containing executables for this BLAST
            analysis
        :param prefix:  str, prefix for outputs from this BLAST analysis
        :param outdir:  Path to output directory for this BLAST analysis
        """
        self.funcs, self.exes = funcs, exes
        self.prefix, self.outdir = prefix, outdir

    def build_db_cmd(self, fname: Path) -> str:
        """Return database format/build command.

        :param fname:  Path to the sequence file the database is built from

        The command is the first element of the value returned by the
        configured database function.
        """
        db_info = self.funcs.db_func(fname, self.outdir, self.exes.format_exe)
        return db_info[0]

    def get_db_name(self, fname: Path) -> str:
        """Return database filename.

        :param fname:  Path to the sequence file the database is built from

        The name is the second element of the value returned by the
        configured database function.
        """
        db_info = self.funcs.db_func(fname, self.outdir, self.exes.format_exe)
        return db_info[1]

    def build_blast_cmd(self, fname: Path, dbname: Path):
        """Return BLASTN command.

        :param fname:  Path to query file
        :param dbname:  Path to database

        The result of the configured BLASTN function is returned unchanged.
        """
        blast_exe = self.exes.blast_exe
        return self.funcs.blastn_func(fname, dbname, self.outdir, blast_exe)
# UTILITY FUNCTIONS
# =================
# Read sequence annotations in from file
def get_labels(filename: Optional[Path], logger: Optional[Logger] = None) -> Dict:
    r"""Return dictionary of alternative sequence labels.

    :param filename:  path to file containing tab-separated table of labels,
        or None to skip reading
    :param logger:  logging object; when provided, progress and malformed
        lines are reported through it
    :return:  dictionary mapping each <key> to its <label>; empty when
        filename is None or no valid lines are found

    Input files should be formatted as <hash>\t<key>\t<label>, one
    record per line. Leading and trailing whitespace is stripped from each
    line before splitting. Lines that do not split into exactly three
    tab-separated fields are skipped (with warnings, if a logger was
    given). If a key occurs more than once, the last label wins.
    """
    labeldict = {}  # type: Dict[str, str]
    if filename is None:
        return labeldict

    if logger:
        logger.info("Reading labels from %s", filename)
    with open(filename, "r") as ifh:
        # Iterate the file lazily rather than loading every line up front;
        # enumerate supplies the 1-based line number used in warnings
        for count, line in enumerate(ifh, start=1):
            stripped = line.strip()
            try:
                _, key, label = stripped.split("\t")
            except ValueError:
                if logger:
                    logger.warning("Problem with class file: %s", filename)
                    logger.warning("line %d: %s", count, stripped)
                    logger.warning("(skipping line)")
                continue
            labeldict[key] = label
    return labeldict
# Return the total length of sequences in a passed FASTA file
def get_genome_length(filename: Path) -> int:
    """Return total length of all sequences in a FASTA file.

    :param filename:  path to FASTA file

    The file is parsed record by record and the lengths of all records
    are added together; a file with no records gives a total of zero.
    """
    total_length = 0
    with open(filename, "r") as ifh:
        for record in SeqIO.parse(ifh, "fasta"):
            total_length += len(record)
    return total_length
# Helper function to label results matrices from Run objects
def label_results_matrix(matrix: pd.DataFrame, labels: Dict) -> pd.DataFrame:
    """Return results matrix dataframe with labels.

    :param matrix:  results dataframe deriving from Run object
    :param labels:  dictionary of genome labels
        labels must be keyed by index/col values from matrix

    Applies the labels from the dictionary to the dataframe in matrix, and
    returns the result. The passed dataframe is relabelled in place, and
    the same object is returned.
    """

    def _labelled(genome_id) -> str:
        """Return <label>:<genome_id>, or Genome_id:<genome_id> if unlabelled."""
        # The dictionary uses string keys, so look up the stringified ID
        prefix = labels.get(str(genome_id), "Genome_id")
        return f"{prefix}:{genome_id}"

    matrix.columns = list(map(_labelled, matrix.columns))
    matrix.index = list(map(_labelled, matrix.index))
    return matrix
# Helper function that establishes whether third-party dependencies are present
# (each call looks the executables up afresh; the result is not cached)
def has_dependencies() -> Dependencies:
    """Return NamedTuple indicating if third-party dependencies are available.

    Each field holds the result of shutil.which() for one executable:
    "blastn" for blast, "blastall" for legacy_blast, and "nucmer" for
    mummer. A field is None when the executable cannot be found.
    """
    blast = shutil.which("blastn")
    legacy_blast = shutil.which("blastall")
    mummer = shutil.which("nucmer")
    return Dependencies(blast=blast, legacy_blast=legacy_blast, mummer=mummer)
# Escape a string with terminal formatting codes
def termcolor(
    logstr: str, color: Optional[str] = None, bold: Optional[bool] = False
) -> str:
    """Return the passed logstr, wrapped in terminal colouring.

    :param logstr:  the string to be wrapped
    :param color:  case-insensitive colour name (BLACK, RED, GREEN, YELLOW,
        BLUE, MAGENTA, CYAN or WHITE); any other value leaves the string
        uncoloured
    :param bold:  the string is given a bold prefix only when this is
        exactly True

    A coloured or bold string always ends with the terminal reset code.
    """
    # Position in this tuple is the offset added to 30 in the colour code
    colour_names = (
        "BLACK",
        "RED",
        "GREEN",
        "YELLOW",
        "BLUE",
        "MAGENTA",
        "CYAN",
        "WHITE",
    )
    offsets = {name: offset for offset, name in enumerate(colour_names)}
    reset = "\033[0m"

    # Colour the string
    if isinstance(color, str):
        offset = offsets.get(color.upper())
        if offset is not None:
            logstr = f"\033[1;{30 + offset}m{logstr}{reset}"

    # Nothing more to do unless bold was requested explicitly
    if bold is not True:
        return logstr

    # Make the string bold, terminating with a reset if not already present
    logstr = f"\033[1m{logstr}"
    return logstr if logstr.endswith(reset) else logstr + reset