pyani.pyani_jobs module

Code to manage jobs for pyani.

To make scheduler handling more consistent behind the scenes, and to allow for a fairly hacky approach to scheduling on SGE, a job dependency graph is used.

Commands to be run are stored in Jobs. Each Job records its dependencies, so that the Job will not be executed until those dependencies have been executed.
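As a rough illustration of the idea above, the sketch below defines a stand-in Job class that mirrors the documented constructor and the add_dependency/remove_dependency methods. It is not the pyani implementation: the `dependencies` attribute name and the example commands are assumptions made for this example only.

```python
from typing import List, Optional


class Job:
    """Hypothetical stand-in for pyani.pyani_jobs.Job (illustration only)."""

    def __init__(self, name: str, command: str, queue: Optional[str] = None) -> None:
        self.name = name
        self.command = command
        self.queue = queue
        # Assumed attribute name: jobs that must finish before this one runs
        self.dependencies: List["Job"] = []

    def add_dependency(self, job: "Job") -> None:
        """Record that `job` must complete before this Job runs."""
        self.dependencies.append(job)

    def remove_dependency(self, job: "Job") -> None:
        """Drop `job` from this Job's dependency list."""
        self.dependencies.remove(job)


# Placeholder commands; any shell command string could be stored here
index_job = Job("index", "echo 'build index'")
align_job = Job("align", "echo 'run alignment'")

# align_job should not be executed until index_job has been executed
align_job.add_dependency(index_job)
```

Here `align_job.dependencies` holds `index_job`, so a scheduler walking the graph would know to run `index_job` first.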

When used in ANI analysis, the way jobs are used depends on the scheduler.

With multiprocessing, all root jobs are placed in a single pool; all first-level dependencies then go in a second (dependent) pool that is not run until the first has completed, and so on. This is not very efficient, but should behave equivalently to the original code, which handled asynchronous pools directly.
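The pooling scheme described above can be sketched as a grouping of jobs by dependency depth. This is a minimal sketch, not pyani's own code: the function name `pools_by_level` and the dict-of-names input are hypothetical, and it assumes that a "root" job is one with no dependencies of its own, with each later pool holding jobs whose dependencies all sit in earlier pools.

```python
from typing import Dict, FrozenSet, List


def pools_by_level(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Group job names into pools that can be run one after another.

    `dependencies` maps each job name to the names of the jobs it waits on.
    Pool 0 holds jobs with no dependencies; pool n holds jobs whose deepest
    dependency sits in pool n - 1.
    """
    depth: Dict[str, int] = {}

    def level(name: str, seen: FrozenSet[str] = frozenset()) -> int:
        if name in seen:
            raise ValueError(f"dependency cycle involving {name!r}")
        if name not in depth:
            deps = dependencies.get(name, [])
            # A job with no dependencies gets depth 0 (1 + default of -1)
            depth[name] = 1 + max(
                (level(dep, seen | {name}) for dep in deps), default=-1
            )
        return depth[name]

    for job_name in dependencies:
        level(job_name)

    pools: List[List[str]] = [[] for _ in range(max(depth.values(), default=-1) + 1)]
    for job_name, job_depth in depth.items():
        pools[job_depth].append(job_name)
    return pools


graph = {"a": [], "b": [], "c": ["a"], "d": ["c", "b"]}
pools = pools_by_level(graph)
```

Each inner list could then be handed to its own multiprocessing pool in turn. The inefficiency mentioned above is visible here: `d` waits for every job in the earlier pools, not only for `c` and `b`.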

With SGE, the dependencies can be managed independently, and effectively interleaved by the scheduler with no need for pools.
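One way a scheduler can interleave dependent jobs is through held submissions. The sketch below builds a `qsub` argument list using the standard SGE options `-N` (job name), `-q` (queue), `-b y` (treat the command as a binary rather than a script) and `-hold_jid` (hold until the named jobs finish). The helper `build_qsub_args` is hypothetical, and this page does not state that pyani submits jobs this way, so treat it as an illustration of the mechanism only.

```python
import shlex
from typing import List, Optional, Sequence


def build_qsub_args(
    name: str,
    command: str,
    dependency_names: Sequence[str] = (),
    queue: Optional[str] = None,
) -> List[str]:
    """Return a qsub argument list that holds the job on its dependencies."""
    args = ["qsub", "-N", name, "-b", "y"]
    if queue:
        args += ["-q", queue]
    if dependency_names:
        # SGE releases the job only after all named jobs have finished
        args += ["-hold_jid", ",".join(dependency_names)]
    # The command to run follows the qsub options
    return args + shlex.split(command)


qsub_args = build_qsub_args("align", "echo aligning", dependency_names=["index"])
```

The resulting list could be passed to `subprocess.run` on a host where `qsub` is available; nothing is submitted by the sketch itself.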

This code is essentially a frozen and cut-down version of pysge (https://github.com/widdowquinn/pysge).

class pyani.pyani_jobs.Job(name: str, command: str, queue: Optional[str] = None)[source]

Bases: object

Individual job to be run, with list of dependencies.

add_dependency(job) → None[source]

Add passed job to the dependency list for this Job.

Parameters:job – Job to be added to the Job’s dependency list

This Job should not execute until all jobs in its dependency list have completed.

remove_dependency(job) → None[source]

Remove passed job from this Job’s dependency list.

Parameters:job – Job to be removed from the Job’s dependency list

wait(interval: float = 0.01) → None[source]

Wait until the job finishes, polling SGE for its status.

Parameters:interval – float, number of seconds to wait before polling SGE

class pyani.pyani_jobs.JobGroup(name: str, command: str, queue: Optional[str] = None, arguments: Optional[Dict[str, List[Any]]] = None)[source]

Bases: object

Class that stores a group of jobs, permitting parameter sweeps.
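To make the idea of a parameter sweep concrete, the sketch below expands an `arguments`-style dictionary (the `Dict[str, List[Any]]` shape in the signature above) into one parameter set per combination. The helper `expand_arguments` is hypothetical, and the use of a full Cartesian product is an assumption; this page does not say how JobGroup combines its argument lists.

```python
from itertools import product
from typing import Any, Dict, List


def expand_arguments(arguments: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one {name: value} dict per combination of argument values."""
    names = list(arguments)
    value_lists = [arguments[name] for name in names]
    # product() yields every combination, varying the last argument fastest
    return [dict(zip(names, values)) for values in product(*value_lists)]


sweep = expand_arguments({"kmer": [11, 15], "mode": ["fast", "slow"]})
```

Under this assumption, two values for each of two arguments give four parameter sets, each of which would correspond to one run of the group's command.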

add_dependency(job) → None[source]

Add the passed job to the dependency list for this JobGroup.

Parameters:job – Job, job to be added to the JobGroup’s dependency list

This JobGroup should not execute until all jobs in its dependency list have completed.

generate_script() → None[source]

Create the SGE script that will run the jobs in the JobGroup.

remove_dependency(job) → None[source]

Remove passed job from this JobGroup’s dependency list.

Parameters:job – Job, job to be removed from the JobGroup’s dependency list

wait(interval: float = 0.01) → None[source]

Wait for a defined period, then poll SGE for job status.

Parameters:interval – float, seconds to wait before polling SGE
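The wait-then-poll behaviour described for both wait() methods can be sketched as a simple loop. This is an illustration under stated assumptions, not pyani's implementation: the function `wait_until_finished`, the injected `is_finished` callable (standing in for whatever SGE status query is used) and the `max_polls` safety limit are all hypothetical.

```python
import time
from typing import Callable


def wait_until_finished(
    is_finished: Callable[[], bool],
    interval: float = 0.01,
    max_polls: int = 1000,
) -> int:
    """Sleep for `interval` seconds, then poll; repeat until finished.

    Returns the number of polls made. Raises TimeoutError if the job has
    not finished after `max_polls` polls.
    """
    for poll_count in range(1, max_polls + 1):
        time.sleep(interval)  # wait first, then ask for status
        if is_finished():
            return poll_count
    raise TimeoutError(f"job not finished after {max_polls} polls")


# Simulated status check that reports completion on the third poll
remaining = [False, False, True]
polls_made = wait_until_finished(lambda: remaining.pop(0), interval=0.001)
```

Passing the status check in as a callable keeps the loop testable without a cluster; on a real system it would wrap a query to the scheduler.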