Source code for pyani.pyani_orm

# -*- coding: utf-8 -*-
# (c) The James Hutton Institute 2018-2019
# (c) The University of Strathclyde 2019-2020
# Author: Leighton Pritchard
#
# Contact:
# leighton.pritchard@strath.ac.uk
#
# Leighton Pritchard,
# Strathclyde Institute of Pharmacy and Biomedical Sciences
# University of Strathclyde
# 161 Cathedral Street
# Glasgow
# Scotland,
# G4 0RE
# UK
#
# The MIT License
#
# Copyright (c) 2018-2019 The James Hutton Institute
# Copyright (c) 2019-2020 The University of Strathclyde
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Module providing useful functions for manipulating pyani's SQLite3 db.

This SQLAlchemy-based ORM replaces the previous SQL-based module
"""

import logging

from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple

import numpy as np  # type: ignore
import pandas as pd  # type: ignore

from sqlalchemy import and_  # type: ignore
import sqlalchemy
from sqlalchemy import UniqueConstraint, create_engine, Table
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Float, Boolean
from sqlalchemy.ext.declarative import declarative_base  # type: ignore
from sqlalchemy.orm import relationship, sessionmaker  # type: ignore

from pyani import PyaniException
from pyani.pyani_files import (
    get_fasta_and_hash_paths,
    load_classes_labels,
    read_fasta_description,
    read_hash_string,
)
from pyani.pyani_tools import get_genome_length


[docs]class PyaniORMException(PyaniException): """Exception raised when ORM or database interaction fails."""
# Using the declarative system # We follow Flask-like naming conventions, so override some of the pyline errors # Mypy doesn't like dynamic base classes, see https://github.com/python/mypy/issues/2477 Base = declarative_base() # type: Any Session = sessionmaker() # pylint: disable=C0103 # Linker table between genomes and runs tables rungenome = Table( # pylint: disable=C0103 "runs_genomes", Base.metadata, Column("genome_id", Integer, ForeignKey("genomes.genome_id")), Column("run_id", Integer, ForeignKey("runs.run_id")), ) # Linker table between comparisons and runs tables runcomparison = Table( # pylint: disable=C0103 "runs_comparisons", Base.metadata, Column("comparison_id", Integer, ForeignKey("comparisons.comparison_id")), Column("run_id", Integer, ForeignKey("runs.run_id")), ) # Convenience struct for labels and classes
[docs]class LabelTuple(NamedTuple): """Label and Class for each file.""" label: str class_label: str
[docs]class Label(Base): """Describes relationship between genome, run and genome label. Each genome and run combination can be assigned a single label """ __tablename__ = "labels" label_id = Column(Integer, primary_key=True) genome_id = Column(Integer, ForeignKey("genomes.genome_id")) run_id = Column(Integer, ForeignKey("runs.run_id")) label = Column(String) class_label = Column(String) genome = relationship("Genome", back_populates="labels") run = relationship("Run", back_populates="labels") def __str__(self) -> str: """Return string representation of Label table row.""" return str( "Genome ID: {}, Run ID: {}, Label ID: {}, Label: {}, Class: {}".format( self.genome_id, self.run_id, self.label_id, self.label, self.class_label ) ) def __repr__(self) -> str: """Return string representation of Label table object.""" return "<Label(key=({}, {}, {}))>".format( self.label_id, self.run_id, self.genome_id )
[docs]class BlastDB(Base): """Describes relationship between genome, run, source BLAST database and query fragments. Each genome and run combination can be assigned a single BLAST database for the comparisons - fragpath path to fragmented genome (query in ANIb) - dbpath path to source genome database (subject in ANIb) - fragsizes JSONified dict of fragment sizes - dbcmd command used to generate database """ __tablename__ = "blastdbs" blastdb_id = Column(Integer, primary_key=True) genome_id = Column(Integer, ForeignKey("genomes.genome_id")) run_id = Column(Integer, ForeignKey("runs.run_id")) fragpath = Column(String) dbpath = Column(String) fragsizes = Column(String) dbcmd = Column(String) genome = relationship("Genome", back_populates="blastdbs") run = relationship("Run", back_populates="blastdbs") def __str__(self) -> str: """Return string representation of BlastDB table row.""" return str( "BlastDB: {}, Run ID: {}, Label ID: {}, Label: {}, Class: {}".format( self.genome_id, self.run_id, self.label_id, self.label, self.class_label ) ) def __repr__(self) -> str: """Return string representation of BlastDB table object.""" return "<BlastDB(key=({}, {}, {}))>".format( self.label_id, self.run_id, self.genome_id )
[docs]class Genome(Base): """Describes an input genome for a pyani run. - genome_id primary key - genome_hash MD5 hash of input genome file (in ``path``) - path path to FASTA genome file - length length of genome (total bases) - description genome description """ __tablename__ = "genomes" __table_args__ = (UniqueConstraint("genome_hash"),) genome_id = Column(Integer, primary_key=True) genome_hash = Column(String) path = Column(String) length = Column(Integer) description = Column(String) labels = relationship("Label", back_populates="genome", lazy="dynamic") blastdbs = relationship("BlastDB", back_populates="genome", lazy="dynamic") runs = relationship( "Run", secondary=rungenome, back_populates="genomes", lazy="dynamic" ) query_comparisons = relationship( "Comparison", back_populates="query", primaryjoin="Genome.genome_id == Comparison.query_id", ) subject_comparisons = relationship( "Comparison", back_populates="subject", primaryjoin="Genome.genome_id == Comparison.subject_id", ) def __str__(self) -> str: """Return string representation of Genome table row.""" return str("Genome {}: {}".format(self.genome_id, self.description)) def __repr__(self) -> str: """Return string representation of Genome table object.""" return "<Genome(id='{}',desc='{}')>".format(self.genome_id, self.description)
[docs]class Run(Base): """Describes a single pyani run.""" __tablename__ = "runs" run_id = Column(Integer, primary_key=True) method = Column(String) cmdline = Column(String) date = Column(DateTime) status = Column(String) name = Column(String) df_identity = Column(String) # JSON-encoded Pandas dataframe df_coverage = Column(String) # JSON-encoded Pandas dataframe df_alnlength = Column(String) # JSON-encoded Pandas dataframe df_simerrors = Column(String) # JSON-encoded Pandas dataframe df_hadamard = Column(String) # JSON-encoded Pandas dataframe genomes = relationship( "Genome", secondary=rungenome, back_populates="runs", lazy="dynamic" ) comparisons = relationship( "Comparison", secondary=runcomparison, back_populates="runs", lazy="dynamic" ) labels = relationship("Label", back_populates="run", lazy="dynamic") blastdbs = relationship("BlastDB", back_populates="run", lazy="dynamic") def __str__(self) -> str: """Return string representation of Run table row.""" return str("Run {}: {} ({})".format(self.run_id, self.name, self.date)) def __repr__(self) -> str: """Return string representation of Run table object.""" return "<Run(run_id={})>".format(self.run_id)
[docs]class Comparison(Base): """Describes a single pairwise comparison between two genomes.""" __tablename__ = "comparisons" __table_args__ = ( UniqueConstraint( "query_id", "subject_id", "program", "version", "fragsize", "maxmatch", "kmersize", "minmatch", ), ) comparison_id = Column(Integer, primary_key=True) query_id = Column(Integer, ForeignKey("genomes.genome_id"), nullable=False) subject_id = Column(Integer, ForeignKey("genomes.genome_id"), nullable=False) aln_length = Column(Integer) # in fastANI this is matchedfrags * fragLength sim_errs = Column(Integer) # in fastANI this is allfrags - matchedfrags identity = Column(Float) cov_query = Column(Float) # in fastANI this is matchedfrags/allfrags cov_subject = Column(Float) # in fastANI this is Null program = Column(String) version = Column(String) fragsize = Column(Integer) # in fastANI this is fragLength maxmatch = Column(Boolean) # in fastANi this is Null kmersize = Column(Integer) minmatch = Column(Float) query = relationship( "Genome", foreign_keys=[query_id], back_populates="query_comparisons" ) subject = relationship( "Genome", foreign_keys=[subject_id], back_populates="subject_comparisons" ) runs = relationship( "Run", secondary=runcomparison, back_populates="comparisons", lazy="dynamic" ) def __str__(self) -> str: """Return string representation of Comparison table row.""" return str( "Query: {}, Subject: {}, %%ID={}, ({} {}), FragSize: {}, MaxMatch: {}, KmerSize: {}, MinMatch: {}".format( self.query_id, self.subject_id, self.identity, self.program, self.version, self.fragsize, self.maxmatch, self.kmersize, self.minmatch, ) ) def __repr__(self) -> str: """Return string representation of Comparison table object.""" return "<Comparison(comparison_id={})>".format(self.comparison_id)
[docs]def create_db(dbpath: Path) -> None: """Create an empty pyani SQLite3 database at the passed path. :param dbpath: path to pyani database """ engine = create_engine("sqlite:///{}".format(dbpath), echo=False) Base.metadata.create_all(engine)
[docs]def get_session(dbpath: Path) -> Any: """Connect to an existing pyani SQLite3 database and return a session. :param dbpath: path to pyani database """ engine = create_engine("sqlite:///{}".format(dbpath), echo=False) Session.configure(bind=engine) return Session()
[docs]def get_comparison_dict(session: Any) -> Dict[Tuple, Any]: """Return a dictionary of comparisons in the session database. :param session: live SQLAlchemy session of pyani database Returns Comparison objects, keyed by (_.query_id, _.subject_id, _.program, _.version, _.fragsize, _.maxmatch) tuple """ return { ( _.query_id, _.subject_id, _.program, _.version, _.fragsize, _.maxmatch, _.kmersize, _.minmatch, ): _ for _ in session.query(Comparison).all() }
[docs]def get_matrix_labels_for_run(session: Any, run_id: int) -> Dict: """Return dictionary of genome labels, keyed by row/column ID. :param session: live SQLAlchemy session :param run_id: the Run.run_id value for matrices The labels should be valid for identity, coverage and other complete matrix results accessed via the .df_* attributes of a run. Labels are returned keyed by the string of the genome ID, for compatibility with matplotlib. """ results = ( session.query(Genome.genome_id, Label.label) .join(rungenome, Run) .join( Label, and_(Genome.genome_id == Label.genome_id, Run.run_id == Label.run_id) ) .filter(Run.run_id == run_id) .all() ) return {str(_.genome_id): _.label for _ in results}
[docs]def get_matrix_classes_for_run(session: Any, run_id: int) -> Dict[str, List]: """Return dictionary of genome classes, keyed by row/column ID. :param session: live SQLAlchemy session :param run_id: the Run.run_id value for matrices The class labels should be valid for identity, coverage and other complete matrix results accessed via the .df_* attributes of a run Labels are returned keyed by the string of the genome ID, for compatibility with matplotlib. """ results = ( session.query(Genome.genome_id, Label.class_label) .join(rungenome, Run) .join( Label, and_(Genome.genome_id == Label.genome_id, Run.run_id == Label.run_id) ) .filter(Run.run_id == run_id) .all() ) return {str(_.genome_id): _.class_label for _ in results}
[docs]def filter_existing_comparisons( session, run, comparisons, program, version, fragsize: Optional[int] = None, maxmatch: Optional[bool] = False, kmersize: Optional[int] = None, minmatch: Optional[float] = None, ) -> List: """Filter list of (Genome, Genome) comparisons for those not in the session db. :param session: live SQLAlchemy session of pyani database :param run: Run object describing parent pyani run :param comparisons: list of (Genome, Genome) query vs subject comparisons :param program: program used for comparison :param version: version of program for comparison :param fragsize: fragment size for BLAST databases :param maxmatch: maxmatch used with nucmer comparison When passed a list of (Genome, Genome) comparisons as comparisons, check whether the comparison exists in the database and, if so, associate it with the passed run. If not, then add the (Genome, Genome) pair to a list for returning as the comparisons that still need to be run. """ logger = logging.getLogger(__name__) existing_comparisons = get_comparison_dict(session) logger.debug("Existing comparisons\n%s", existing_comparisons) comparisons_to_run = [] logger.debug( ( "Checking for existing comparisons, with unique constraints \n" "\tprogram: %s\n" "\tversion: %s\n" "\tfragsize: %s\n" "\tmaxmatch: %s\n" "\tkmersize: %s\n" "\tminmatch: %s\n" ), program, version, fragsize, maxmatch, kmersize, minmatch, ) for (qgenome, sgenome) in comparisons: logger.debug( "Checking for existing comparison: %s (%s) vs %s (%s)", qgenome, qgenome.genome_id, sgenome, sgenome.genome_id, ) try: # Associate run with existing comparisons run.comparisons.append( existing_comparisons[ ( qgenome.genome_id, sgenome.genome_id, program, version, fragsize, maxmatch, kmersize, minmatch, ) ] ) session.commit() except KeyError: comparisons_to_run.append((qgenome, sgenome)) return comparisons_to_run
[docs]def add_run(session, method, cmdline, date, status, name): """Create a new Run and add it to the session. :param session: live SQLAlchemy session of pyani database :param method: string describing analysis run type :param cmdline: string describing pyani command-line for run :param date: datetime object describing analysis start time :param status: string describing status of analysis :param name: string - name given to the analysis run Creates a new Run object with the passed parameters, and returns it. """ try: run = Run(method=method, cmdline=cmdline, date=date, status=status, name=name) except Exception: raise PyaniORMException( f"Could not create {method} run with command line: {cmdline}" ) try: session.add(run) session.commit() except Exception: raise PyaniORMException(f"Could not add run {run} to the database") return run, run.run_id
[docs]def add_run_genomes( session, run, indir: Path, classpath: Path, labelpath: Path, **kwargs ) -> List: """Add genomes for a run to the database. :param session: live SQLAlchemy session of pyani database :param run: Run object describing the parent pyani run :param indir: path to the directory containing genomes :param classpath: path to the file containing class information for each genome :param labelpath: path to the file containing class information for each genome This function expects a single directory (indir) containing all FASTA files for a run, and optional paths to plain text files that contain information on class and label strings for each genome. If the genome already exists in the database, then a Genome object is recovered from the database. Otherwise, a new Genome object is created. All Genome objects will be associated with the passed Run object. The session changes are committed once all genomes and labels are added to the database without error, as a single transaction. """ # Get list of genome files and paths to class and labels files infiles = get_fasta_and_hash_paths(indir) # paired FASTA/hash files class_data = {} # type: Dict[str,str] label_data = {} # type: Dict[str,str] all_keys = [] # type: List[str] if classpath: class_data = load_classes_labels(classpath) all_keys += list(class_data.keys()) if labelpath: label_data = load_classes_labels(labelpath) all_keys += list(label_data.keys()) # Make dictionary of labels and/or classes new_keys = set(all_keys) label_dict = {} # type: Dict for key in new_keys: label_dict[key] = LabelTuple(label_data[key] or "", class_data[key] or "") # Get hash and sequence description for each FASTA/hash pair, and add # to current session database genome_ids = [] for fastafile, hashfile in infiles: try: inhash, _ = read_hash_string(hashfile) indesc = read_fasta_description(fastafile) except Exception: raise PyaniORMException("Could not read genome files for database import") abspath = fastafile.absolute() genome_len = get_genome_length(abspath) # If the genome is not already in the database, add it as a Genome object genome = session.query(Genome).filter(Genome.genome_hash == inhash).first() if not isinstance(genome, Genome): try: genome = Genome( genome_hash=inhash, path=str(abspath), length=genome_len, description=indesc, ) session.add(genome) except Exception: raise PyaniORMException(f"Could not add genome {genome} to database") # Associate this genome with the current run try: genome.runs.append(run) except Exception: raise PyaniORMException( f"Could not associate genome {genome} with run {run}" ) # If there's an associated class or label for the genome, add it if inhash in label_dict: try: session.add( Label( genome=genome, run=run, label=label_dict[inhash].label, class_label=label_dict[inhash].class_label, ) ) except Exception: raise PyaniORMException( f"Could not add labels for {genome} to database." ) genome_ids.append(genome.genome_id) try: session.commit() except Exception: raise PyaniORMException("Could not commit new genomes in database.") return genome_ids
[docs]def update_comparison_matrices(session, run) -> None: """Update the Run table with summary matrices for the analysis. :param session: active pyanidb session via ORM :param run: Run ORM object for the current ANIm run """ # Create dataframes for storing in the Run table # Rows and columns are the (ordered) list of genome IDs genome_ids = sorted([_.genome_id for _ in run.genomes.all()]) df_identity = pd.DataFrame(index=genome_ids, columns=genome_ids, dtype=float) df_coverage = pd.DataFrame(index=genome_ids, columns=genome_ids, dtype=float) df_alnlength = pd.DataFrame(index=genome_ids, columns=genome_ids, dtype=float) df_simerrors = pd.DataFrame(index=genome_ids, columns=genome_ids, dtype=float) df_hadamard = pd.DataFrame(index=genome_ids, columns=genome_ids, dtype=float) # Set appropriate diagonals for each matrix np.fill_diagonal(df_identity.values, 1.0) np.fill_diagonal(df_coverage.values, 1.0) np.fill_diagonal(df_simerrors.values, 1.0) np.fill_diagonal(df_hadamard.values, 1.0) for genome in run.genomes.all(): df_alnlength.loc[genome.genome_id, genome.genome_id] = genome.length # Loop over all comparisons for the run and fill in result matrices for cmp in run.comparisons.all(): qid, sid = cmp.query_id, cmp.subject_id df_identity.loc[qid, sid] = cmp.identity df_coverage.loc[qid, sid] = cmp.cov_query df_alnlength.loc[qid, sid] = cmp.aln_length df_simerrors.loc[qid, sid] = cmp.sim_errs df_hadamard.loc[qid, sid] = cmp.identity * cmp.cov_query if cmp.program in ["nucmer"]: df_hadamard.loc[sid, qid] = cmp.identity * cmp.cov_subject df_simerrors.loc[sid, qid] = cmp.sim_errs df_alnlength.loc[sid, qid] = cmp.aln_length df_coverage.loc[sid, qid] = cmp.cov_subject df_identity.loc[sid, qid] = cmp.identity # Add matrices to the database run.df_identity = df_identity.to_json() run.df_coverage = df_coverage.to_json() run.df_alnlength = df_alnlength.to_json() run.df_simerrors = df_simerrors.to_json() run.df_hadamard = df_hadamard.to_json() session.commit()