#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (c) The James Hutton Institute 2013-2019
# (c) The University of Strathclyde 2019-2020
# Author: Leighton Pritchard
#
# Contact:
# leighton.pritchard@strath.ac.uk
#
# Leighton Pritchard,
# Strathclyde Institute of Pharmaceutical and Biomedical Sciences
# The University of Strathclyde
# 161 Cathedral Street
# Glasgow
# G4 0RE
# Scotland,
# UK
#
# The MIT License
#
# Copyright (c) 2013-2019 The James Hutton Institute
# (c) The University of Strathclyde 2019-2020
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Code to support pyani."""
import shutil
from logging import Logger
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Tuple,
)
import pandas as pd # type: ignore
from Bio import SeqIO # type: ignore
from pyani import pyani_config
class MatrixData(NamedTuple):
    """Convenience struct for matrix data returned by ORM.

    Fields are positional, in the order ``name``, ``data``, ``graphic_args``.
    """

    # Name identifying the matrix
    name: str
    # The matrix values themselves, held as a dataframe
    data: pd.DataFrame
    # Dictionary of arguments accompanying the matrix
    # NOTE(review): presumably plotting options - confirm against callers
    graphic_args: Dict
class Dependencies(NamedTuple):
    """Convenience struct for third-party dependency presence.

    Each field holds a string describing the dependency when it is
    available, or None when it is not.
    """

    # BLAST+ dependency, or None if absent
    blast: Optional[str]
    # Legacy BLAST dependency, or None if absent
    legacy_blast: Optional[str]
    # MUMmer dependency, or None if absent
    mummer: Optional[str]
# Class to hold ANI dataframe results
class ANIResults:
    """Holds ANI dataframe results.

    Four square dataframes sharing the same row/column labels are kept:
    alignment lengths, similarity errors, percentage identity and
    alignment coverage. A fifth (Hadamard) matrix is derived on demand.
    """

    def __init__(self, labels: List[str], mode: str) -> None:
        """Initialise with four empty, labelled dataframes.

        :param labels: row and column labels shared by all dataframes
        :param mode: analysis mode; used by ``data`` to choose output
            filestems ("ANIm", "ANIb" or "ANIblastall")
        """
        # Alignment lengths are left unfilled (NaN) until values are added
        self.alignment_lengths = pd.DataFrame(index=labels, columns=labels, dtype=float)
        # Similarity errors default to zero
        self.similarity_errors = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(0)
        # Identity and coverage default to 1.0
        self.percentage_identity = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(1.0)
        self.alignment_coverage = pd.DataFrame(
            index=labels, columns=labels, dtype=float
        ).fillna(1.0)
        self.zero_error = False
        self.mode = mode

    def add_tot_length(
        self, qname: str, sname: str, value: float, sym: bool = True
    ) -> None:
        """Add a total length value to self.alignment_lengths.

        :param qname: label of the query (row)
        :param sname: label of the subject (column)
        :param value: total alignment length to store
        :param sym: if True, also store the value at (sname, qname)
        """
        self.alignment_lengths.loc[qname, sname] = value
        if sym:
            self.alignment_lengths.loc[sname, qname] = value

    def add_sim_errors(
        self, qname: str, sname: str, value: float, sym: bool = True
    ) -> None:
        """Add a similarity error value to self.similarity_errors.

        :param qname: label of the query (row)
        :param sname: label of the subject (column)
        :param value: similarity error count to store
        :param sym: if True, also store the value at (sname, qname)
        """
        self.similarity_errors.loc[qname, sname] = value
        if sym:
            self.similarity_errors.loc[sname, qname] = value

    def add_pid(self, qname: str, sname: str, value: float, sym: bool = True) -> None:
        """Add a percentage identity value to self.percentage_identity.

        :param qname: label of the query (row)
        :param sname: label of the subject (column)
        :param value: percentage identity to store
        :param sym: if True, also store the value at (sname, qname)
        """
        self.percentage_identity.loc[qname, sname] = value
        if sym:
            self.percentage_identity.loc[sname, qname] = value

    def add_coverage(
        self, qname: str, sname: str, qcover: float, scover: Optional[float] = None
    ) -> None:
        """Add percentage coverage values to self.alignment_coverage.

        :param qname: label of the query (row)
        :param sname: label of the subject (column)
        :param qcover: coverage stored at (qname, sname)
        :param scover: coverage stored at (sname, qname); nothing is
            written to that cell when this is None
        """
        self.alignment_coverage.loc[qname, sname] = qcover
        # Compare against None explicitly: a coverage of 0.0 is a real value
        # and must overwrite the 1.0 default rather than be skipped as falsy.
        if scover is not None:
            self.alignment_coverage.loc[sname, qname] = scover

    @property
    def hadamard(self) -> pd.DataFrame:
        """Return Hadamard matrix (identity * coverage).

        This is the element-wise product of the percentage identity and
        alignment coverage dataframes, recomputed on each access.
        """
        return self.percentage_identity * self.alignment_coverage

    @property
    def data(self) -> Iterator[Tuple[Any, str]]:
        """Return iterator of (dataframe, filestem) tuples.

        Dataframes are paired, in order (alignment lengths, percentage
        identity, alignment coverage, similarity errors, Hadamard), with
        the filestems configured in pyani_config for the current mode.

        :raises KeyError: if self.mode is not a recognised analysis mode
        """
        stemdict = {
            "ANIm": pyani_config.ANIM_FILESTEMS,
            "ANIb": pyani_config.ANIB_FILESTEMS,
            "ANIblastall": pyani_config.ANIBLASTALL_FILESTEMS,
        }
        return zip(
            (
                self.alignment_lengths,
                self.percentage_identity,
                self.alignment_coverage,
                self.similarity_errors,
                self.hadamard,
            ),
            stemdict[self.mode],
        )
class BLASTfunctions(NamedTuple):
    """Convenience structure to hold BLAST functions."""

    # Callable used to build the database formatting command
    db_func: Callable
    # Callable used to build the BLASTN command
    blastn_func: Callable
class BLASTexes(NamedTuple):
    """Convenience structure to hold BLAST executables."""

    # Path to the database formatting executable
    format_exe: Path
    # Path to the BLAST search executable
    blast_exe: Path
# Class to hold/build BLAST commands
class BLASTcmds:
    """Class for construction of BLASTN and database formatting commands."""

    def __init__(
        self, funcs: "BLASTfunctions", exes: "BLASTexes", prefix: str, outdir: Path
    ) -> None:
        """Instantiate class.

        :param funcs: BLASTfunctions, containing functions for this BLAST analysis
        :param exes: BLASTexes, containing executables for this BLAST analysis
        :param prefix: str, prefix for outputs from this BLAST analysis
        :param outdir: Path to output directory for this BLAST analysis
        """
        self.funcs = funcs
        self.exes = exes
        self.prefix = prefix
        self.outdir = outdir

    def build_db_cmd(self, fname: Path) -> str:
        """Return database format/build command.

        This is the first element of the value returned by ``funcs.db_func``.

        :param fname: Path to the sequence file to build a database from
        """
        return self.funcs.db_func(fname, self.outdir, self.exes.format_exe)[0]

    def get_db_name(self, fname: Path) -> str:
        """Return database filename.

        This is the second element of the value returned by ``funcs.db_func``.

        :param fname: Path to the sequence file to build a database from
        """
        return self.funcs.db_func(fname, self.outdir, self.exes.format_exe)[1]

    def build_blast_cmd(self, fname: Path, dbname: Path):
        """Return BLASTN command.

        The value is whatever ``funcs.blastn_func`` returns for the query,
        database, output directory and BLAST executable.

        :param fname: Path to query file
        :param dbname: Path to database
        """
        return self.funcs.blastn_func(fname, dbname, self.outdir, self.exes.blast_exe)
# UTILITY FUNCTIONS
# =================
# Read sequence annotations in from file
def get_labels(
    filename: Optional[Path], logger: Optional[Logger] = None
) -> Dict[str, str]:
    r"""Return dictionary of alternative sequence labels.

    :param filename: path to file containing tab-separated table of labels,
        or None (in which case an empty dictionary is returned)
    :param logger: logging object; when given, progress and malformed
        lines are reported through it

    Input files should be formatted as <hash>\t<key>\t<label>, one triple
    per line. The returned dictionary maps each key to its label; the hash
    field is ignored. Lines that do not split into exactly three
    tab-separated fields are skipped.
    """
    labeldict: Dict[str, str] = {}
    if filename is None:
        return labeldict

    if logger:
        logger.info("Reading labels from %s", filename)
    with open(filename, "r") as ifh:
        # Iterate lazily rather than loading the whole file; count from 1
        # so that warnings report human-readable line numbers
        for count, line in enumerate(ifh, 1):
            try:
                _, key, label = line.strip().split("\t")
            except ValueError:
                # Wrong number of fields: report (if possible) and move on
                if logger:
                    logger.warning("Problem with class file: %s", filename)
                    logger.warning("line %d: %s", count, line.strip())
                    logger.warning("(skipping line)")
                continue
            labeldict[key] = label
    return labeldict
# Return the total length of sequences in a passed FASTA file
def get_genome_length(filename: Path) -> int:
    """Return total length of all sequences in a FASTA file.

    :param filename: path to FASTA file

    The file is parsed with Biopython's SeqIO and the lengths of all
    records are summed; a file containing no records gives 0.
    """
    with open(filename, "r") as ifh:
        # Generator expression: no need to materialise a list of lengths
        return sum(len(record) for record in SeqIO.parse(ifh, "fasta"))
# Helper function to label results matrices from Run objects
def label_results_matrix(matrix: pd.DataFrame, labels: Dict) -> pd.DataFrame:
    """Return results matrix dataframe with labels.

    :param matrix: results dataframe deriving from Run object
    :param labels: dictionary of genome labels
        labels must be keyed by index/col values from matrix

    Applies the labels from the dictionary to the dataframe in
    matrix, and returns the result. The passed dataframe is relabelled
    in place, so the returned object is the same dataframe.
    """

    def label(gen_id: Any) -> str:
        """Return <label>:<genome_id>, or Genome_id:<genome_id> if unlabelled."""
        # The dictionary uses string keys, so the ID is stringified for lookup
        return f"{labels.get(str(gen_id), 'Genome_id')}:{gen_id}"

    matrix.columns = [label(_) for _ in matrix.columns]
    matrix.index = [label(_) for _ in matrix.index]
    return matrix
# Helper function that establishes whether dependencies are present
# The PATH is searched afresh on every call; no result is cached
def has_dependencies() -> Dependencies:
    """Return NamedTuple indicating if third-party dependencies are available.

    Each field holds the result of ``shutil.which`` for the corresponding
    executable: the path found on PATH, or None when it is not found.
    """
    # Keyword arguments make the executable-to-field mapping explicit
    return Dependencies(
        blast=shutil.which("blastn"),
        legacy_blast=shutil.which("blastall"),
        mummer=shutil.which("nucmer"),
    )
# Escape a string with terminal formatting codes
def termcolor(
    logstr: str, color: Optional[str] = None, bold: Optional[bool] = False
) -> str:
    """Return the passed logstr, wrapped in terminal colouring.

    :param logstr: the string to decorate
    :param color: colour name, matched case-insensitively against BLACK,
        RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN and WHITE; any other value
        (including None) leaves the string uncoloured
    :param bold: bold formatting is applied only when this is exactly True

    Any formatting applied is terminated with the terminal reset code.
    """
    # Offsets added to 30 to give the terminal foreground colour code
    termcolors = {
        "BLACK": 0,
        "RED": 1,
        "GREEN": 2,
        "YELLOW": 3,
        "BLUE": 4,
        "MAGENTA": 5,
        "CYAN": 6,
        "WHITE": 7,
    }
    reset = "\033[0m"

    # Colour the string
    if isinstance(color, str):
        offset = termcolors.get(color.upper())
        if offset is not None:
            logstr = f"\033[1;{30 + offset}m{logstr}{reset}"

    # Make the string bold
    if bold is True:
        logstr = f"\033[1m{logstr}"
        # Avoid doubling the reset code when colouring already appended it
        if not logstr.endswith(reset):
            logstr += reset
    return logstr