pyani.run_sge module
Code to run a set of command-line jobs using SGE/Grid Engine.

For parallelisation on a multi-node system, we use custom code to submit jobs.
pyani.run_sge.build_and_submit_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None

    Submit the passed iterable of Job objects to SGE.

    Parameters:
        - root_dir – root directory for SGE and job output
        - jobs – list of Job objects, describing each job to be submitted
        - sgeargs – str, additional arguments to qsub

    This places SGE's output in the passed root directory.
pyani.run_sge.build_directories(root_dir: pathlib.Path) → None

    Construct the subdirectories output, stderr, stdout, and jobs.

    Parameters:
        - root_dir – path to the top-level directory in which to create the subdirectories

    The subdirectories are created in the passed root directory, and have the following roles:

        jobs    Stores the scripts for each job
        stderr  Stores the stderr output from SGE
        stdout  Stores the stdout output from SGE
        output  Stores output (if the scripts place the output here)
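The directory layout described above can be sketched with pathlib. This is an illustrative stand-in, not the pyani implementation; the helper name make_sge_dirs is hypothetical:

```python
# Illustrative sketch (not the pyani implementation): create the four
# subdirectories that build_directories is documented to produce.
from pathlib import Path
import tempfile

def make_sge_dirs(root_dir: Path) -> None:
    # "output", "stderr", "stdout", and "jobs" under the root directory
    for subdir in ("output", "stderr", "stdout", "jobs"):
        (root_dir / subdir).mkdir(parents=True, exist_ok=True)

root = Path(tempfile.mkdtemp())
make_sge_dirs(root)
print(sorted(p.name for p in root.iterdir()))  # → ['jobs', 'output', 'stderr', 'stdout']
```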
pyani.run_sge.build_job_scripts(root_dir: pathlib.Path, jobs: List[T]) → None

    Construct a script for each passed Job in the jobs iterable.

    Parameters:
        - root_dir – path to the output directory
        - jobs – list of Job objects
pyani.run_sge.build_joblist(jobgraph) → List[T]

    Return a list of jobs from a passed jobgraph.

    Parameters:
        - jobgraph – list of Job objects, which may have dependencies
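A minimal sketch of how a flat job list might be collected from a jobgraph, assuming a simplified stand-in Job class with a dependencies attribute (pyani's own Job class lives in pyani.pyani_jobs):

```python
# Illustrative sketch (assumed behaviour): walk each root job's dependency
# links to collect every job in the graph into a flat list.
class Job:
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def build_joblist(jobgraph):
    jobset = set()
    stack = list(jobgraph)
    while stack:
        job = stack.pop()
        if job not in jobset:
            jobset.add(job)
            stack.extend(job.dependencies)  # dependencies are also jobs
    return list(jobset)

nucmer = Job("nucmer_01")
deltafilter = Job("deltafilter_01", dependencies=[nucmer])
names = sorted(j.name for j in build_joblist([deltafilter]))
print(names)  # → ['deltafilter_01', 'nucmer_01']
```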
pyani.run_sge.compile_jobgroups_from_joblist(joblist: List[T], jgprefix: str, sgegroupsize: int) → List[T]

    Return a list of jobgroups, rather than a list of jobs.

    Parameters:
        - joblist – list of Job objects
        - jgprefix – str, prefix for each SGE jobgroup
        - sgegroupsize – int, number of jobs in each SGE jobgroup
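A sketch of the grouping step, using plain dicts as hypothetical stand-ins for pyani's jobgroup objects; the chunking-by-size and prefixed naming follow the parameter descriptions above:

```python
# Illustrative sketch (assumed behaviour): batch a flat list of job command
# strings into named groups of at most sgegroupsize commands each.
def compile_jobgroups(joblist, jgprefix, sgegroupsize):
    groups = []
    for idx in range(0, len(joblist), sgegroupsize):
        chunk = joblist[idx:idx + sgegroupsize]
        # Each group carries a prefixed name and its slice of the job list
        groups.append({"name": f"{jgprefix}_{idx // sgegroupsize + 1}",
                       "jobs": chunk})
    return groups

groups = compile_jobgroups([f"cmd{i}" for i in range(5)], "ANIm_SGE_JG", 2)
print([(g["name"], len(g["jobs"])) for g in groups])
# → [('ANIm_SGE_JG_1', 2), ('ANIm_SGE_JG_2', 2), ('ANIm_SGE_JG_3', 1)]
```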
pyani.run_sge.extract_submittable_jobs(waiting: List[T]) → List[T]

    Obtain the list of jobs that can be submitted from the pending list.

    Parameters:
        - waiting – list of Job objects
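One plausible reading of "able to be submitted" is that a waiting job becomes submittable once it has no outstanding dependencies; a minimal sketch under that assumption, with a simplified stand-in Job class:

```python
# Illustrative sketch (assumed behaviour): a waiting job is submittable
# once its dependency list is empty.
class Job:
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def extract_submittable_jobs(waiting):
    # Jobs with no pending dependencies can go to the scheduler now.
    return [job for job in waiting if not job.dependencies]

a = Job("nucmer")
b = Job("deltafilter", dependencies=[a])
submittable = [j.name for j in extract_submittable_jobs([a, b])]
print(submittable)  # → ['nucmer']
```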
pyani.run_sge.populate_jobset(job: pyani.pyani_jobs.Job, jobset: Set[T], depth: int) → Set[T]

    Create a set of jobs reflecting the dependency tree.

    Parameters:
        - job – Job object at the current position in the dependency tree
        - jobset – set of Job objects collected so far
        - depth – int, current depth in the dependency tree

    The set contains jobs at different depths of the dependency tree, retaining dependencies as strings, not Jobs.
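The recursive collection can be sketched as follows. This is an assumed reconstruction with a simplified Job stand-in; for brevity it keeps dependencies as Job objects rather than strings, unlike the behaviour noted above:

```python
# Illustrative sketch (assumed behaviour): recursively add a job and all
# of its dependencies to a set, walking the dependency tree depth-first.
class Job:
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def populate_jobset(job, jobset, depth):
    jobset.add(job)
    for dep in job.dependencies:
        populate_jobset(dep, jobset, depth + 1)
    return jobset

leaf = Job("nucmer")
root = Job("deltafilter", dependencies=[leaf])
jobset = populate_jobset(root, set(), depth=1)
print(sorted(j.name for j in jobset))  # → ['deltafilter', 'nucmer']
```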
pyani.run_sge.run_dependency_graph(jobgraph, jgprefix: str = 'ANIm_SGE_JG', sgegroupsize: int = 10000, sgeargs: Optional[str] = None) → None

    Create and run SGE scripts for jobs based on the passed jobgraph.

    Parameters:
        - jobgraph – list of jobs, which may have dependencies
        - jgprefix – prefix for the submitted jobs, in the scheduler
        - sgegroupsize – maximum size for an array job submission
        - sgeargs – additional arguments to qsub

    The strategy is to loop over each job in the dependency graph. Because we expect a single main delta-filter (wrapped) job with a single nucmer dependency for each analysis, we can split the dependency graph into two lists of corresponding jobs and run the nucmer jobs before the delta-filter jobs.
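The two-phase split described above can be sketched as follows; this is an assumed reconstruction with a simplified Job stand-in and a hypothetical helper name, not pyani's actual code:

```python
# Illustrative sketch (assumed behaviour): each graph entry is a
# delta-filter job with a single nucmer dependency. Split the graph into
# two lists so the nucmer phase can run before the delta-filter phase.
class Job:
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def split_dependency_graph(jobgraph):
    dfjobs = list(jobgraph)
    # Collect each delta-filter job's nucmer dependency into phase one
    nucmerjobs = [dep for job in dfjobs for dep in job.dependencies]
    return nucmerjobs, dfjobs

graph = [Job("df_01", [Job("nucmer_01")]), Job("df_02", [Job("nucmer_02")])]
phase1, phase2 = split_dependency_graph(graph)
print([j.name for j in phase1], [j.name for j in phase2])
# → ['nucmer_01', 'nucmer_02'] ['df_01', 'df_02']
```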
pyani.run_sge.split_seq(iterable: Iterable[T_co], size: int) → Generator[T_co, T_contra, V_co]

    Split a passed iterable into chunks of a given size.

    Parameters:
        - iterable – iterable to be split into chunks
        - size – int, number of items to return in each chunk
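The chunking behaviour can be reproduced with itertools.islice; a minimal sketch, not pyani's own implementation:

```python
# Illustrative sketch: lazily yield successive chunks of at most `size`
# items from any iterable, stopping when the iterable is exhausted.
from itertools import islice

def split_seq(iterable, size):
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

chunks = list(split_seq(range(7), 3))
print(chunks)  # → [[0, 1, 2], [3, 4, 5], [6]]
```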
pyani.run_sge.submit_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None

    Submit the passed jobs to the SGE server, with the passed directory as root.

    Parameters:
        - root_dir – path to output directory
        - jobs – list of Job objects
        - sgeargs – str, additional arguments for qsub
pyani.run_sge.submit_safe_jobs(root_dir: pathlib.Path, jobs: Iterable[T_co], sgeargs: Optional[str] = None) → None

    Submit the passed list of jobs to the SGE server, with the passed directory as root for output.

    Parameters:
        - root_dir – path to output directory
        - jobs – iterable of Job objects
        - sgeargs – str, additional arguments for qsub