disco.core – Disco Core Library

The disco.core module provides a high-level interface for communication with the Disco master. It provides functions for submitting new jobs, querying the status of the system, and getting results of jobs.

class disco.core.Disco(master=None, settings=None, proxy=None)

The Disco object provides an interface to the Disco master. It can be used to query the status of the system, or to submit a new job.

Parameters: master (url) – address of the Disco master, e.g. disco://localhost.

See also

Disco.new_job() and disco.job for more on creating jobs.

blacklist(node)

Blacklists node so that tasks are no longer run on it.

New in version 0.2.4.

clean(jobname)

Deletes job metadata.

Deprecated since version 0.4: Use Disco.purge() to delete job results; deleting only the job metadata is strongly discouraged.

Note

After the job has been cleaned, there is no way to obtain the result urls from the master. However, no data is actually deleted by Disco.clean(), in contrast to Disco.purge().

events(jobname, offset=0)

Returns an iterator over job events, ordered by time. It is safe to call this function while the job is running, so it provides an efficient way to monitor job events continuously. The iterator yields (offset, event) tuples.

Parameters: offset (int) – skip events that occurred before this offset.

New in version 0.2.3.

See also

DISCO_EVENTS for information on how to enable the console output of job events.
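
As a rough illustration of continuous monitoring, the sketch below drains the event iterator and remembers where to resume on the next call. FakeDisco and drain_events are hypothetical names introduced only for this example. The stub assumes that each yielded offset can be passed back to events() as the resume point; the real Disco.events() reads events from the master.

```python
class FakeDisco:
    """Hypothetical stand-in for disco.core.Disco, so the sketch runs offline."""

    def __init__(self, log):
        self.log = list(log)  # events in time order

    def events(self, jobname, offset=0):
        # Assumption: the yielded offset points just past the event,
        # so it can be reused as the `offset` argument of a later call.
        for i in range(offset, len(self.log)):
            yield i + 1, self.log[i]


def drain_events(disco, jobname, offset=0):
    """Collect events from `offset` on; return them with the resume offset."""
    seen = []
    for next_offset, event in disco.events(jobname, offset):
        seen.append(event)
        offset = next_offset
    return seen, offset


master = FakeDisco(["job started", "map done"])
first, resume_at = drain_events(master, "wordcount")

# A new event arrives while the job is still running.
master.log.append("reduce done")
second, resume_at = drain_events(master, "wordcount", resume_at)
```

Passing the last offset back in means the second call sees only the events that were not already processed.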

jobinfo(jobname)

Returns a dict containing information about the job.

joblist()

Returns a list of jobs and their statuses.

jobpack(jobname)

Returns the disco.job.JobPack submitted for the job.

kill(jobname)

Kills the job.

new_job(name, **jobargs)

Submits a new job request to the master using disco.job.Job.

nodeinfo()

Returns a dictionary describing the status of the nodes managed by this Disco master.

oob_get(jobname, key)

Returns an out-of-band value assigned to key for the job.

OOB data can be stored and retrieved for job tasks using disco.task.Task.get() and disco.task.Task.put().

oob_list(jobname)

Returns all out-of-band keys for the job.
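
The two OOB calls combine naturally: list the keys, then fetch each value. The following is a minimal sketch of that pattern. FakeDisco, oob_snapshot, and the sample keys and values are hypothetical; only the oob_list(jobname) and oob_get(jobname, key) call shapes come from this reference.

```python
class FakeDisco:
    """Hypothetical stand-in for disco.core.Disco; keeps OOB data in memory."""

    def __init__(self, oob):
        self._oob = oob  # {jobname: {key: value}}

    def oob_list(self, jobname):
        return list(self._oob.get(jobname, {}))

    def oob_get(self, jobname, key):
        return self._oob[jobname][key]


def oob_snapshot(disco, jobname):
    """Fetch every out-of-band key of a job into a plain dict."""
    return {key: disco.oob_get(jobname, key) for key in disco.oob_list(jobname)}


# Sample data is made up for illustration.
master = FakeDisco({"wordcount": {"lines_seen": "1024", "bad_records": "3"}})
snapshot = oob_snapshot(master, "wordcount")
```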

profile_stats(jobname, mode='', stream=sys.stdout)

Returns results of job profiling. The Job Dict must have had the profile flag enabled.

Parameters:
  • mode ('map' or 'reduce' or '') – restricts results to the map or reduce phase; the empty string (the default) applies no restriction.
  • stream (file-like object) – alternate output stream. See the pstats.Stats constructor.

The function returns a pstats.Stats object. For instance, you can print out results as follows:

job.profile_stats().sort_stats('cumulative').print_stats()

New in version 0.2.1.
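
Because the return value is an ordinary pstats.Stats object, the standard library's sorting and printing methods apply to it. The sketch below builds a Stats object locally with cProfile instead of fetching one from a Disco job, to show how the stream argument and sort_stats() interact. The function busy is a hypothetical workload.

```python
import cProfile
import io
import pstats


def busy(n):
    """Hypothetical workload to profile."""
    return sum(i * i for i in range(n))


# Collect a profile locally; a Disco job would collect this on the workers.
profiler = cProfile.Profile()
profiler.enable()
busy(10000)
profiler.disable()

# `stream` redirects the report away from stdout, as in the pstats.Stats constructor.
report = io.StringIO()
stats = pstats.Stats(profiler, stream=report)

# Sort by cumulative time and print at most the five most expensive entries.
stats.sort_stats('cumulative').print_stats(5)
text = report.getvalue()
```

Writing the report to an in-memory buffer makes it easy to log or post-process instead of printing it to the console.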

purge(jobname)

Deletes all metadata and data related to the job.

results(jobspec, timeout=2000)

Returns a list of results for a single job or for many concurrently running jobs, depending on the type of jobspec.

Parameters:
  • jobspec (disco.job.Job, string, or list) –

    If a job or job name is provided, return a tuple which looks like:

    status, results
    

    If a list is provided, return two lists: inactive jobs and active jobs. Both the lists contain elements of the following type:

    jobname, (status, results)
    

    where status is one of: 'unknown job', 'dead', 'active', or 'ready'.

  • timeout (int) – wait at most this many milliseconds for at least one of the jobs to finish.

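Using a list of jobs is a more efficient way to wait for multiple jobs to finish. Consider the following example, which prints results as soon as each job finishes; active initially holds the list of submitted jobs:

from disco.core import result_iterator

# active starts out as the list of submitted jobs (or job names)
while active:
  # split the jobs into those that have stopped running and those still active
  inactive, active = disco.results(active)
  for jobname, (status, results) in inactive:
    if status == 'ready':
      for k, v in result_iterator(results):
        print(k, v)
      disco.purge(jobname)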

Note how the list of active jobs, active, returned by Disco.results(), can be used as the input to this function as well.

wait(jobname, poll_interval=2, timeout=None, clean=False, show=None)

Block until the job has finished. Returns a list of the result urls.

Parameters:
  • poll_interval (int) – the number of seconds between job status requests.
  • timeout (int or None) – if specified, the number of seconds before returning or raising a disco.JobError.
  • clean (bool) –

    if True, call Disco.clean() when the job has finished.

    Deprecated since version 0.4.

  • show (bool or string) –

    enables console output of job events. The default is provided by DISCO_EVENTS.

    New in version 0.2.3.
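
To make the polling behaviour concrete, here is a minimal sketch of how a blocking wait could be layered on top of results(). It is not the library's implementation. FakeDisco, JobFailed (standing in for disco.JobError), wait_for, and the max_polls parameter are hypothetical, and max_polls replaces the seconds-based timeout so that the example stays deterministic. The result URL is a placeholder.

```python
class JobFailed(Exception):
    """Hypothetical stand-in for disco.JobError."""


class FakeDisco:
    """Hypothetical stand-in for disco.core.Disco; replays scripted statuses."""

    def __init__(self, script):
        self._script = list(script)
        self.timeouts = []  # records the timeout passed on each poll

    def results(self, jobname, timeout=2000):
        self.timeouts.append(timeout)
        return self._script.pop(0)


def wait_for(disco, jobname, poll_interval=2, max_polls=None):
    """Poll until the job is ready; raise if it dies or polls run out."""
    polls = 0
    while max_polls is None or polls < max_polls:
        # results() takes milliseconds, poll_interval is in seconds.
        status, results = disco.results(jobname, timeout=poll_interval * 1000)
        if status == 'ready':
            return results
        if status != 'active':
            raise JobFailed("job %s is %s" % (jobname, status))
        polls += 1
    raise JobFailed("gave up waiting for %s" % jobname)


master = FakeDisco([
    ('active', []),
    ('active', []),
    ('ready', ['disco://host/result-0']),  # placeholder URL
])
urls = wait_for(master, 'wordcount')
```

Any status other than 'active' or 'ready' ('dead' or 'unknown job') ends the loop with an error rather than polling forever.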

whitelist(node)

Whitelists node so that the master may submit tasks to it.

New in version 0.2.4.
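
Since blacklist() and whitelist() are inverses, they pair well in a context manager that takes a node out of rotation for the duration of some maintenance work. The sketch below shows one possible shape for this. FakeDisco, node_offline, and the node name are hypothetical; only the blacklist(node) and whitelist(node) calls come from this reference.

```python
from contextlib import contextmanager


class FakeDisco:
    """Hypothetical stand-in for disco.core.Disco; tracks blacklisted nodes."""

    def __init__(self):
        self.blacklisted = set()

    def blacklist(self, node):
        self.blacklisted.add(node)

    def whitelist(self, node):
        self.blacklisted.discard(node)


@contextmanager
def node_offline(disco, node):
    """Blacklist `node` for the duration of the block, then whitelist it again."""
    disco.blacklist(node)
    try:
        yield
    finally:
        # Runs even if the maintenance code raises.
        disco.whitelist(node)


master = FakeDisco()
with node_offline(master, "node07"):
    during = "node07" in master.blacklisted  # no new tasks are scheduled here
after = "node07" in master.blacklisted
```

The try/finally ensures the node is whitelisted again even when the maintenance step fails.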

disco.core.result_iterator(urls, reader=disco_input_stream, input_stream=(task_input_stream,), notifier=notifier, params=None, ddfs=None)

An iterator over records stored in either Disco or DDFS.

Parameters: