# -*- coding: utf-8 -*-
# (c) The University of Strathclyde 2021–Present
# Author: Bailey Harrington
#
# Contact: bailey.harrington@strath.ac.uk
#
# Bailey Harrington,
# Strathclyde Institute for Pharmacy and Biomedical Sciences,
# Cathedral Street,
# Glasgow,
# G4 0RE,
# Scotland,
# UK
#
# The MIT License
#
# Copyright (c) 2021–Present University of Strathclyde
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
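
"""Provides the fastani subcommand for pyani."""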

import datetime
import json  # is this needed? [LP: for this, probably not]
import logging
import os

from argparse import (
    Namespace,
)  # unsure what this does [LP: makes it easier to mock CLI stuff; also mypy hints]
from itertools import (
    permutations,
    combinations,
)  # may be specific to anib [LP: yes; ANIb is asymmetrical, but fastANI will take care of what permutations does if you pass a file list; I'd expect you'd only need this when passing jobs out via SLURM/SGE]
from pathlib import Path, PosixPath
from typing import List, Tuple, NamedTuple
from Bio import SeqIO  # unsure what this does [LP: bioinformatics file format IO]
from tqdm import tqdm  # unsure what this does [LP: progress bars]

from pyani import (
    fastani,
    pyani_jobs,
    run_sge,
    pyani_config,
    run_multiprocessing as run_mp,
)
from pyani.pyani_files import collect_existing_output
from pyani.pyani_orm import (
    Comparison,
    PyaniException,
    PyaniORMException,
    add_run,
    add_run_genomes,
    filter_existing_comparisons,
    get_session,
    update_comparison_matrices,
)
from pyani.pyani_tools import termcolor


# Convenience struct describing a pairwise comparison job for the SQLAlchemy
# implementation
class ComparisonJob(NamedTuple):

    """Pairwise comparison job for the SQLAlchemy implementation."""

    query: str
    ref: str
    fastcmd: str
    outfile: Path
    fragLen: int
    kmerSize: int
    minFraction: float
    job: pyani_jobs.Job
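

# A sketch of how a ComparisonJob might be populated (hypothetical values; in
# practice query and ref are Genome ORM objects, not strings):
#
#     ComparisonJob(
#         query=genome_a,
#         ref=genome_b,
#         fastcmd="fastANI -q a.fna -r b.fna -o a_vs_b.fastani ...",
#         outfile=Path("a_vs_b.fastani"),
#         fragLen=3000,
#         kmerSize=16,
#         minFraction=0.2,
#         job=None,  # job is None when recovering existing output
#     )
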
# Convenience struct describing an analysis run
class RunData(NamedTuple):

    """Convenience struct describing an analysis run."""

    method: str
    name: str
    date: datetime.datetime
    cmdline: str


class ComparisonResult(NamedTuple):

    """Convenience struct for a single fastANI comparison result."""

    qid: float
    rid: float
    aln_length: int
    sim_errs: int
    pid: float
    qlen: int
    rlen: int
    qcov: float
    rcov: float
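

# fastANI writes one line per genome pair, in the format:
#
#     <query path> <ref path> <ANI> <matched fragments> <total query fragments>
#
# As a worked example (assuming a 3000 bp fragment length, fastANI's default),
# a line such as
#
#     a.fna b.fna 97.66 1400 1500
#
# is converted in update_comparison_results() below to:
#
#     aln_length = 1400 * 3000          = 4,200,000 bp
#     sim_errs   = (1500 - 1400) * 3000 =   300,000 bp
#     qcov       = 4,200,000 / query genome length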


def subcmd_fastani(args: Namespace) -> None:
    """Perform fastANI on all genome files in an input directory.

    :param args: Namespace, command-line arguments

    Finds ANI by the fastANI method, as described in Jain et al (2018)
    Nature Communications 9, 5114. doi:10.1038/s41467-018-07641-9.

    All FASTA format files (selected by suffix) in the input directory are
    compared against each other, pairwise, using fastANI (whose path must be
    provided). For each pairwise comparison, the fastANI .fastani file output
    is parsed to obtain an alignment length and similarity error count for
    the two organisms, as represented by sequences in the FASTA files. These
    are processed to calculate aligned sequence lengths, average nucleotide
    identity (ANI) percentages, coverage (aligned percentage of whole
    genome), and similarity error count for each pairwise comparison.

    The calculated values are deposited in the SQLite3 database being used
    for the analysis.

    For each pairwise comparison, the fastANI output is stored in the output
    directory for long enough to extract summary information, but for each
    run the output is gzip compressed. Once all runs are complete, the
    outputs for each comparison are concatenated into a single gzip archive.
    """
    logger = logging.getLogger(__name__)

    # announce that we're starting
    logger.info(termcolor("Running fastANI analysis", "red"))

    # Get current fastANI version
    logger.info(termcolor("fastANI executable: %s", "cyan"), args.fastani_exe)
    fastani_version = fastani.get_version(args.fastani_exe)
    logger.info(termcolor("fastANI version: %s", "cyan"), fastani_version)

    # Use provided name, or make new one for this analysis
    start_time = datetime.datetime.now()
    name = args.name or "_".join(["FastANI", start_time.isoformat()])
    logger.info(termcolor("Analysis name: %s", "cyan"), name)

    # Connect to existing database (which may be "clean", or have old analyses)
    logger.debug("Connecting to database %s", args.dbpath)
    try:
        session = get_session(args.dbpath)
    except Exception:  # is there a less generic option?
        logger.error(
            "Could not connect to database %s; exiting", args.dbpath, exc_info=True
        )
        raise SystemExit(1)

    # Add information about this run to the database
    logger.debug("Adding run info to database %s...", args.dbpath)
    try:
        run, run_id = add_run(
            session,
            method="FastANI",
            cmdline=args.cmdline,
            date=start_time,
            status="started",
            name=name,
        )
    except PyaniORMException:
        logger.error("Could not add run to the database (exiting)", exc_info=True)
        raise SystemExit(1)
    logger.debug("\t...added run ID: %s to the database", run_id)

    # Identify input files for comparison, and populate the database
    logger.debug("Adding genomes for run %s to database...", run_id)
    try:
        genome_ids = add_run_genomes(
            session, run, args.indir, args.classes, args.labels
        )
    except PyaniORMException:
        logger.error(
            "Could not add genomes to database for run %d (exiting)",
            run_id,
            exc_info=True,
        )
        raise SystemExit(1)
    logger.debug("\t...added genome IDs: %s", genome_ids)

    # Generate command-lines for fastANI analysis
    logger.info("Generating fastANI command-lines")
    fastanidir = args.outdir / pyani_config.ALIGNDIR["fastANI"]
    logger.debug("fastANI output will be written temporarily to %s", fastanidir)

    # Create output directories. We create the main parent directory
    # (args.outdir), but also a subdirectory for the fastANI output
    logger.debug("Creating output directory %s", fastanidir)
    try:
        fastanidir.mkdir(exist_ok=True, parents=True)
    except IOError:
        logger.error(
            "Could not create output directory %s (exiting)",
            fastanidir,
            exc_info=True,
        )
        raise SystemExit(1)

    # Get list of genomes for this analysis from the database
    logger.info("Compiling genomes for comparison")
    genomes = run.genomes.all()
    logger.debug("\tCollected %d genomes for this run", len(genomes))

    # Generate all pair permutations of genome IDs as a list of
    # (Genome, Genome) tuples
    logger.info(
        "Compiling pairwise comparisons (this can take time for large datasets)..."
    )
    comparisons = list(permutations(tqdm(genomes, disable=args.disable_tqdm), 2))
    logger.info(
        "\t...total pairwise comparisons to be performed: %d", len(comparisons)
    )

    # Check for existing comparisons; if one has already been done (for the same
    # software package, version, and setting) we add the comparison to this run,
    # but remove it from the list of comparisons to be performed
    logger.info("Checking database for existing comparison data...")
    comparisons_to_run = filter_existing_comparisons(
        session,
        run,
        comparisons,
        "fastANI",
        fastani_version,
        fragsize=args.fragLen,
        maxmatch=False,
        kmersize=args.kmerSize,
        minmatch=args.minFraction,
    )
    logger.info(
        "\t...after check, still need to run %d comparisons",
        len(comparisons_to_run),
    )  # this is indicating 0 comparisons; need to diagnose

    # If there are no comparisons to run, update the Run matrices and exit
    # from this function
    if not comparisons_to_run:
        logger.info(
            termcolor(
                "All comparison results present in database (skipping comparisons)",
                "magenta",
            )
        )
        logger.info("Updating summary matrices with existing results")
        update_comparison_matrices(session, run)
        return

    # If we are in recovery mode, we are salvaging output from a previous
    # run, and do not necessarily need to rerun all the jobs. In this case,
    # we prepare a list of output files we want to recover from the results
    # in the output directory.
    if args.recovery:
        logger.warning("Entering recovery mode...")
        logger.debug(
            "\tIn this mode, existing comparison output from %s is reused",
            args.outdir,
        )
        existingfiles = collect_existing_output(args.outdir, "fastani", args)
        logger.debug(
            "\tIdentified %d existing output files for reuse", len(existingfiles)
        )
    else:
        existingfiles = None  # in ANIm this was an empty list; in ANIb, None
        logger.debug("\tIdentified no existing output files")

    # Create list of fastANI jobs for each comparison still to be performed
    logger.info("Creating fastANI jobs...")
    joblist = generate_joblist(comparisons_to_run, existingfiles, args)
    logger.debug("...created %d fastANI jobs", len(joblist))

    # Pass jobs to the appropriate scheduler
    logger.debug("Passing %d jobs to %s...", len(joblist), args.scheduler)
    run_fastani_jobs(joblist, args)
    logger.info("...jobs complete")

    # Process output and add results to database
    # This requires us to drop out of threading/multiprocessing: Python's SQLite3
    # interface doesn't allow sharing connections and cursors
    logger.info("Adding comparison results to database...")
    update_comparison_results(joblist, run, session, fastani_version, args)
    update_comparison_matrices(session, run)
    logger.info("...database updated.")
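

# A minimal sketch of the Namespace attributes subcmd_fastani() consumes, e.g.
# when mocking the CLI in tests (all paths and values are hypothetical):
#
#     args = Namespace(
#         indir=Path("genomes"),
#         outdir=Path("output"),
#         dbpath=Path(".pyani/pyanidb"),
#         name=None,
#         cmdline="pyani fastani ...",
#         classes=None,
#         labels=None,
#         recovery=False,
#         disable_tqdm=False,
#         fastani_exe=Path("fastANI"),
#         fragLen=3000,
#         kmerSize=16,
#         minFraction=0.2,
#         scheduler="multiprocessing",
#         workers=None,
#         jobprefix="PYANI",
#         sgegroupsize=10000,
#         sgeargs=None,
#     )
#     subcmd_fastani(args)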


def generate_joblist(
    comparisons: List,  # in ANIm: List[Tuple]
    existingfiles: List,  # in ANIm: List[Path]
    args: Namespace,
) -> List[ComparisonJob]:
    """Return list of ComparisonJobs.

    :param comparisons: list of (Genome, Genome) tuples for which comparisons are needed
    :param existingfiles: list of pre-existing fastANI outputs
    :param args: Namespace, command-line arguments
    """
    logger = logging.getLogger(__name__)

    joblist = []  # will hold ComparisonJob structs
    for idx, (query, ref) in enumerate(tqdm(comparisons, disable=args.disable_tqdm)):
        # ¶ need to make a thing that produces this
        fastcmd = fastani.construct_fastani_cmdline(
            query.path,
            ref.path,
            args.outdir,
            args.fastani_exe,
            args.fragLen,
            args.kmerSize,
            args.minFraction,
        )
        logger.debug("Commands to run:\n\t%s", fastcmd)
        # Token 6 of the split command is assumed to be the output file path
        outfile = fastcmd.split()[6]  # ¶ should this be hard-coded???
        # ¶ There is likely no need to test for different output files
        # ¶ The only exception might be something with --matrix
        outfname = Path(outfile)  # ¶ unsure about this suffix
        logger.debug("Expected output file for db: %s", outfname)

        # If we're in recovery mode, we don't want to repeat a computational
        # comparison that already exists, so we check whether the ultimate
        # output is in the set of existing files and, if not, we add the jobs
        # TODO: something faster than a list search (dict or set?)
        # The comparisons collection always gets updated, so that results are
        # added to the database whether they come from recovery mode or are run
        # in this call of the script.
        if args.recovery and outfname.name in existingfiles:
            logger.debug("Recovering output from %s, not building job", outfname)
            # Need to track the expected output, but set the job itself to None:
            joblist.append(
                ComparisonJob(
                    query,
                    ref,
                    fastcmd,
                    outfname,
                    args.fragLen,
                    args.kmerSize,
                    args.minFraction,
                    None,
                )
            )
        else:
            logger.debug("Building job")
            # Build job
            fastjob = pyani_jobs.Job(f"{args.jobprefix}_{idx:06d}", fastcmd)
            joblist.append(
                ComparisonJob(
                    query,
                    ref,
                    fastcmd,
                    outfname,
                    args.fragLen,
                    args.kmerSize,
                    args.minFraction,
                    fastjob,
                )
            )
    return joblist
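

# For reference, the command produced by fastani.construct_fastani_cmdline()
# is assumed to have this shape (hypothetical paths; the flags are fastANI's
# own CLI options):
#
#     fastANI -q genomes/a.fna -r genomes/b.fna -o output/fastani/a_vs_b.fastani \
#         --fragLen 3000 -k 16 --minFraction 0.2
#
# This layout is why generate_joblist() takes fastcmd.split()[6] as the
# output file path.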


def run_fastani_jobs(joblist: List[ComparisonJob], args: Namespace) -> None:
    """Pass fastANI jobs to the scheduler.

    :param joblist: list of ComparisonJob namedtuples
    :param args: command-line arguments for the run
    """
    logger = logging.getLogger(__name__)
    logger.debug("Scheduler: %s", args.scheduler)

    # ¶ Possibly tie this to the --threads option of fastANI?
    if args.scheduler == "multiprocessing":
        logger.info("Running jobs with multiprocessing")
        if not args.workers:
            logger.debug("(using maximum number of worker threads)")
        else:
            logger.debug("(using %d worker threads, if available)", args.workers)
        cumval = run_mp.run_dependency_graph(
            [_.job for _ in joblist], workers=args.workers
        )
        # ¶ Determine if this is needed
        if cumval > 0:
            logger.error(
                "At least one fastANI comparison failed. Please investigate (exiting)"
            )
            raise PyaniException("Multiprocessing run failed in fastANI")
        logger.info("Multiprocessing run completed without error")
    elif args.scheduler.lower() == "sge":
        logger.info("Running jobs with SGE")
        logger.debug("Setting jobarray group size to %d", args.sgegroupsize)
        logger.debug("Joblist contains %d jobs", len(joblist))
        run_sge.run_dependency_graph(
            [_.job for _ in joblist],
            jgprefix=args.jobprefix,
            sgegroupsize=args.sgegroupsize,
            sgeargs=args.sgeargs,
        )
    # elif args.scheduler.lower() == "slurm":
    #     logger.info("Running jobs with Slurm")
    #     logger.debug("Setting jobarray group size to...")
    #     logger.debug("Joblist contains %d jobs", len(joblist))
    else:
        logger.error(termcolor("Scheduler %s not recognised", "red"), args.scheduler)
        raise SystemExit(1)


def update_comparison_results(
    joblist: List[ComparisonJob], run, session, fastani_version: str, args: Namespace
) -> None:
    """Update the Comparison table with the completed result set.

    :param joblist: list of ComparisonJob namedtuples
    :param run: Run ORM object for the current fastANI run
    :param session: active pyanidb session via ORM
    :param fastani_version: version of fastANI used for the comparison
    :param args: command-line arguments for this run

    The Comparison table stores individual comparison results, one per row.
    """
    logger = logging.getLogger(__name__)

    # Add individual results to Comparison table
    for job in tqdm(joblist, disable=args.disable_tqdm):
        logger.debug("\t%s vs %s", job.query.description, job.ref.description)
        # ¶ fastANI allows many runs to share the same outfile; might have to alter this
        # ¶ May also need to use something other than this Comparison object
        # ¶ Or add new columns?
        try:
            contents = fastani.parse_fastani_file(job.outfile)
        except fastani.PyaniFastANIException:
            # No usable output; record an empty comparison result
            contents = fastani.ComparisonResult(job.query, job.ref, 0, 0, 0)
        # if len(contents) > 1:
        #     raise ValueError(
        #         "fastANI output file %s has more than one line", job.outfile
        #     )
        logger.debug("Parsed fastANI file contents: %s", contents)
        query, ref, ani, matches, num_frags = contents
        ani = float(ani)  # should be in the range 0–1
        aln_length = matches * job.fragLen  # should be in bp units
        sim_errs = (
            int(num_frags) * job.fragLen - matches * job.fragLen
        )  # should be in bp units
        qcov = (
            float(matches) * job.fragLen / job.query.length
        )  # should be in the range 0–1
        # try:
        #     pid = (1 - sim_errs) / int(aln_length)
        # except ZeroDivisionError:  # aln_length was zero (no alignment)
        #     pid = 0
        run.comparisons.append(
            Comparison(
                query=job.query,
                subject=job.ref,
                aln_length=int(aln_length),
                sim_errs=int(sim_errs),
                identity=ani,
                cov_query=qcov,
                cov_subject=None,
                program="fastANI",
                version=fastani_version,
                fragsize=job.fragLen,
                maxmatch=False,
                kmersize=job.kmerSize,
                minmatch=job.minFraction,
            )
        )

    # Populate db
    logger.debug("Committing results to database")
    session.commit()