pyani.run_sge module

Code to run a set of command-line jobs using SGE/Grid Engine.

For parallelisation on multi-node systems, we use custom code to submit jobs.

pyani.run_sge.build_and_submit_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None[source]

Submit passed iterable of Job objects to SGE.

Parameters:
  • root_dir – root directory for SGE and job output
  • jobs – list of Job objects, describing each job to be submitted
  • sgeargs – str, additional arguments to qsub

This places SGE's output under the passed root directory.

pyani.run_sge.build_directories(root_dir: pathlib.Path) → None[source]

Construct the subdirectories output, stderr, stdout, and jobs.

Parameters:root_dir – path of root directory in which to place output

Subdirectories are created in the passed root directory. These subdirectories have the following roles:

  • jobs – stores the scripts for each job
  • stderr – stores the stderr output from SGE
  • stdout – stores the stdout output from SGE
  • output – stores output (if the scripts place their output here)
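
The layout can be sketched as follows. This is a minimal illustrative re-implementation (the function name make_sge_dirs is hypothetical), not pyani's own code:

```python
from pathlib import Path
import tempfile

def make_sge_dirs(root_dir: Path) -> None:
    """Create the four standard SGE subdirectories under root_dir."""
    for subdir in ("output", "stderr", "stdout", "jobs"):
        # parents/exist_ok make the call safe to repeat
        (root_dir / subdir).mkdir(parents=True, exist_ok=True)

root = Path(tempfile.mkdtemp())
make_sge_dirs(root)
print(sorted(p.name for p in root.iterdir()))
# prints: ['jobs', 'output', 'stderr', 'stdout']
```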
pyani.run_sge.build_job_scripts(root_dir: pathlib.Path, jobs: List[T]) → None[source]

Construct script for each passed Job in the jobs iterable.

Parameters:
  • root_dir – Path to output directory
  • jobs – iterable of Job objects for which scripts are constructed
pyani.run_sge.build_joblist(jobgraph) → List[T][source]

Return a list of jobs, from a passed jobgraph.

Parameters:jobgraph – graph of Job objects, which may have dependencies
pyani.run_sge.compile_jobgroups_from_joblist(joblist: List[T], jgprefix: str, sgegroupsize: int) → List[T][source]

Return a list of jobgroups compiled from the passed joblist, rather than a list of individual jobs.

Parameters:
  • joblist – list of Job objects to compile into jobgroups
  • jgprefix – str, prefix for SGE jobgroup
  • sgegroupsize – int, number of jobs in each SGE jobgroup
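
The grouping behaviour can be sketched as below. The function body, the plain tuples, and the job strings are illustrative stand-ins for pyani's Job/JobGroup objects, not the module's actual code:

```python
def compile_jobgroups(joblist, jgprefix, sgegroupsize):
    """Chunk a flat job list into named jobgroups of at most sgegroupsize jobs."""
    groups = []
    for idx in range(0, len(joblist), sgegroupsize):
        chunk = joblist[idx : idx + sgegroupsize]
        # Each group gets a sequentially numbered name based on the prefix
        groups.append((f"{jgprefix}_{idx // sgegroupsize}", chunk))
    return groups

jobs = [f"job{i}" for i in range(7)]
groups = compile_jobgroups(jobs, "ANIm_SGE_JG", 3)
print([name for name, _ in groups])
# prints: ['ANIm_SGE_JG_0', 'ANIm_SGE_JG_1', 'ANIm_SGE_JG_2']
```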
pyani.run_sge.extract_submittable_jobs(waiting: List[T]) → List[T][source]

Return the subset of jobs from the pending list that can be submitted.

Parameters:waiting – list of Job objects
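
A sketch of the filtering idea, assuming a job is submittable once all of its dependencies have been submitted. JobStub and the extract_submittable helper are hypothetical stand-ins for pyani.pyani_jobs.Job and this function:

```python
from dataclasses import dataclass, field

@dataclass
class JobStub:
    """Hypothetical stand-in for pyani.pyani_jobs.Job."""
    name: str
    dependencies: list = field(default_factory=list)
    submitted: bool = False

def extract_submittable(waiting):
    # A job is submittable when every dependency has already been submitted
    return [job for job in waiting
            if all(dep.submitted for dep in job.dependencies)]

nucmer = JobStub("nucmer_0")
deltafilter = JobStub("deltafilter_0", dependencies=[nucmer])
print([j.name for j in extract_submittable([nucmer, deltafilter])])
# prints: ['nucmer_0']
```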
pyani.run_sge.populate_jobset(job: pyani.pyani_jobs.Job, jobset: Set[T], depth: int) → Set[T][source]

Create set of jobs reflecting dependency tree.

Parameters:
  • job – Job object at the current node of the dependency tree
  • jobset – set of jobs collected so far
  • depth – int, current depth in the dependency tree

The set contains jobs at different depths of the dependency tree, retaining dependencies as strings, not Jobs.
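A sketch of the recursive walk, collecting job names from every depth of the tree. JobNode is a hypothetical stand-in for pyani's Job class, and this body simplifies the real function (which retains dependencies as strings):

```python
class JobNode:
    """Hypothetical stand-in for a pyani Job with dependencies."""
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def populate_jobset(job, jobset, depth):
    # Add this job's name, then recurse into its dependencies one level deeper
    jobset.add(job.name)
    for dep in job.dependencies:
        populate_jobset(dep, jobset, depth + 1)
    return jobset

root = JobNode("filter", [JobNode("nucmer_a"), JobNode("nucmer_b")])
print(sorted(populate_jobset(root, set(), depth=0)))
# prints: ['filter', 'nucmer_a', 'nucmer_b']
```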

pyani.run_sge.run_dependency_graph(jobgraph, jgprefix: str = 'ANIm_SGE_JG', sgegroupsize: int = 10000, sgeargs: Optional[str] = None) → None[source]

Create and run SGE scripts for jobs, based on the passed jobgraph.

Parameters:
  • jobgraph – list of jobs, which may have dependencies.
  • jgprefix – a prefix for the submitted jobs, in the scheduler
  • sgegroupsize – the maximum size for an array job submission
  • sgeargs – additional arguments to qsub

The strategy is to loop over each job in the dependency graph. Because we expect a single main delta-filter (wrapped) job with a single nucmer dependency for each analysis, we can split the dependency graph into two corresponding lists of jobs, and run the nucmer jobs before the delta-filter jobs.
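
The two-phase split can be sketched as below. The tuples of (name, dependency-or-None) are illustrative stand-ins for pyani Job objects, and the partition logic is a simplification of the real function:

```python
# Partition the graph into dependency jobs (nucmer, no dependency)
# and dependent jobs (delta-filter); run the first list before the second.
jobgraph = [
    ("delta-filter_0", "nucmer_0"),
    ("nucmer_0", None),
    ("delta-filter_1", "nucmer_1"),
    ("nucmer_1", None),
]
first_phase = [name for name, dep in jobgraph if dep is None]
second_phase = [name for name, dep in jobgraph if dep is not None]
print(first_phase, second_phase)
# prints: ['nucmer_0', 'nucmer_1'] ['delta-filter_0', 'delta-filter_1']
```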

pyani.run_sge.split_seq(iterable: Iterable[T_co], size: int) → Generator[T_co, T_contra, V_co][source]

Split a passed iterable into chunks of a given size.

Parameters:
  • iterable – iterable to be split into chunks
  • size – int, number of items to return in each chunk
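
A behavioural sketch of the chunking (the real implementation may differ):

```python
def split_seq_sketch(iterable, size):
    """Yield successive chunks of at most `size` items from the iterable."""
    items = list(iterable)
    for idx in range(0, len(items), size):
        yield items[idx : idx + size]

print(list(split_seq_sketch(range(7), 3)))
# prints: [[0, 1, 2], [3, 4, 5], [6]]
```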
pyani.run_sge.submit_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None[source]

Submit passed jobs to SGE server with passed directory as root.

Parameters:
  • root_dir – path to output directory
  • jobs – list of Job objects
  • sgeargs – str, additional arguments for qsub
pyani.run_sge.submit_safe_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None[source]

Submit passed list of jobs to the SGE server, with the passed directory as root for output.

Parameters:
  • root_dir – path to output directory
  • jobs – iterable of Job objects
  • sgeargs – str, additional arguments for qsub