disco.worker – Python Worker Interface

In Disco, workers do the brunt of the data processing work. When a disco.job.Job is created, it gets passed a Worker instance, which is responsible for defining the fields used by the disco.job.JobPack. In most cases, you don’t need to define your own Worker subclass in order to run a job. The Worker classes defined in disco take care of the details of creating the fields necessary for the disco.job.JobPack, and, when executed on the nodes, handle the implementation of The Disco Worker Protocol.

There is a subtle but important distinction between a worker and a Worker. The former refers to any binary that gets executed on the nodes, specified by jobdict.worker. The latter is a Python class, which handles the details of submitting the job on the client side, as well as controlling the execution of user-defined code on the nodes. A Worker can be subclassed trivially to create a new worker, without having to worry about fulfilling many of the requirements for a well-behaving worker. In short, a Worker provides Python library support for a Disco worker. Those wishing to write a worker in a language besides Python may make use of the Worker class for submitting jobs to the master, but will generally need to handle The Disco Worker Protocol in the language used for the worker executable.

The Classic Worker is a subclass of Worker, which implements the classic Disco mapreduce interface.

The following steps illustrate the sequence of events for running a job using a standard Worker:

  1. (client) instantiate a disco.job.Job
    1. if a worker is supplied, use that worker
    2. otherwise, create a worker using disco.job.Job.Worker (the default is disco.worker.classic.worker.Worker)
  2. (client) call disco.job.Job.run()
    1. create a disco.job.JobPack using: Worker.jobdict(), Worker.jobenvs(), Worker.jobhome(), disco.task.jobdata()
    2. submit the disco.job.JobPack to the master
  3. (node) master unpacks the job home
  4. (node) master executes the jobdict.worker with the current working directory set to the job home and environment variables set from Job Environment Variables
  5. (node) worker requests the disco.task.Task from the master
  6. (node) worker runs the task and reports the output to the master
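The client-side half of the steps above (assembling the jobpack fields) can be sketched as follows. Note that MiniWorker and make_jobpack are hypothetical stand-ins used for illustration only; the real disco.worker.Worker and disco.job.JobPack APIs differ.

```python
# Hypothetical sketch of the client-side submission flow described above.
# MiniWorker stands in for disco.worker.Worker; not the real Disco API.

class MiniWorker(dict):
    def jobdict(self, job, **jobargs):
        # step 2.1: assemble the job dict (cf. Worker.jobdict())
        return {'worker': self.get('bin', 'lib/worker'),
                'prefix': jobargs.get('name', job['name']),
                'input': jobargs.get('input', [])}

    def jobenvs(self, job, **jobargs):
        # cf. Worker.jobenvs(): environment variables for the worker
        return {'PYTHONPATH': 'lib'}

    def jobhome(self, job, **jobargs):
        # a real Worker serializes a zip of the job home here
        return b'<zipped job home>'

def make_jobpack(worker, job, **jobargs):
    # step 2.1: combine the four fields into a jobpack-like dict
    return {'jobdict': worker.jobdict(job, **jobargs),
            'jobenvs': worker.jobenvs(job, **jobargs),
            'jobhome': worker.jobhome(job, **jobargs),
            'jobdata': b''}

job = {'name': 'wordcount'}
pack = make_jobpack(MiniWorker(bin='lib/worker'), job,
                    input=['http://example.com/data'])
```

In step 2.2, the real client posts the serialized jobpack to the master, which then carries out steps 3-6 on the nodes.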

class disco.worker.Input(input, task=None, **kwds)

An iterable over one or more Worker inputs, which can gracefully handle corrupted replicas or otherwise failed inputs.

Parameters:
  • open (function) –

    a function with the following signature:

    def open(url):
        ...
        return file

    used to open input files.
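A custom open function can be passed to handle input urls yourself. The sketch below assumes Input calls it once per url and iterates over the returned file object; open_local is an illustrative name, not part of the Disco API.

```python
import io

# A sketch of a custom `open` suitable for passing to Input (assumption:
# it is called once per input url and must return a readable file object).
def open_local(url):
    # strip an optional file:// scheme and open the path for reading
    path = url[len('file://'):] if url.startswith('file://') else url
    return io.open(path, 'rb')
```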

class disco.worker.MergedInput(input, task=None, **kwds)

Produces an iterator over the minimal head elements of the inputs.
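The "minimal head elements" behavior is analogous to a heap merge of sorted streams, which repeatedly yields the smallest element at the head of any input. The following is an illustrative analogy using the standard library, not the actual MergedInput implementation:

```python
import heapq

# Analogy for MergedInput: merge sorted inputs by repeatedly taking
# the minimal head element across all of them.
inputs = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
merged = list(heapq.merge(*inputs))
```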

class disco.worker.Output(path_type_partition, open=None)

A container for outputs from workers.

Parameters:
  • open (function) –

    a function with the following signature:

    def open(url):
        ...
        return file

    used to open new output files.

path

The path to the underlying output file.

type

The type of output.

partition

The partition label for the output (or None).

file

The underlying output file handle.

class disco.worker.ParallelInput(input, task=None, **kwds)

Produces an iterator over the unordered records in a set of inputs.

Usually requires the full set of inputs (i.e., will block when streaming).

class disco.worker.SerialInput(input, task=None, **kwds)

Produces an iterator over the records in a list of sequential inputs.
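The behavior of iterating sequential inputs end to end is analogous to chaining iterators. An illustrative analogy using the standard library, not the actual SerialInput implementation:

```python
from itertools import chain

# Analogy for SerialInput: yield the records of each input in turn,
# exhausting one input before moving on to the next.
inputs = [[('a', 1), ('b', 2)], [('c', 3)]]
records = list(chain.from_iterable(inputs))
```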

class disco.worker.Worker(**kwargs)

A Worker is a dict subclass, with special methods defined for serializing itself, and possibly reinstantiating itself on the nodes where tasks are run.

The Worker base class defines the following parameters:

Parameters:
bin

The path to the worker binary, relative to the job home. Used to set jobdict.worker in jobdict().

defaults()
Returns:dict of default values for the Worker.
getitem(key, job, jobargs, default=None)

Resolves key in the following order:

  1. jobargs (parameters passed in during disco.job.Job.run())
  2. job (attributes of the disco.job.Job)
  3. self (items in the Worker dict itself)
  4. default
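The resolution order can be sketched as a simple lookup chain. This is an illustrative sketch of the documented order, not the actual getitem() implementation; resolve and JobStub are hypothetical names.

```python
# Sketch of the resolution order used by Worker.getitem():
# jobargs first, then job attributes, then the Worker dict, then default.
def resolve(key, job, jobargs, worker, default=None):
    if key in jobargs:
        return jobargs[key]
    if hasattr(job, key):
        return getattr(job, key)
    if key in worker:
        return worker[key]
    return default

class JobStub:
    name = 'wordcount'       # an attribute of the job

worker = {'map': 'worker-map'}  # an item in the Worker dict itself
value = resolve('name', JobStub(), {'name': 'override'}, worker)
```

Here value is 'override', since jobargs take precedence over the job attribute of the same name.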

input(task, merged=False, **kwds)
Parameters:
  • task (disco.task.Task) – the task for which to retrieve input.
  • merged (bool) – if specified, returns a MergedInput.
  • kwds (dict) – additional keyword arguments for the Input.
Returns:

an Input to iterate over the inputs from the master.

jobdict(job, **jobargs)

Creates The Job Dict for the Worker.

Makes use of the following parameters, in addition to those defined by the Worker itself:

Parameters:
  • input (list of urls or list of list of urls) –

    used to set jobdict.input. Disco natively handles the following url schemes:

    • http://... - any HTTP address
    • file://... or no scheme - a local file.
      The file must exist on all nodes where the tasks are run. Due to these restrictions, this form has only limited use.
    • tag://... - a tag stored in Disco Distributed Filesystem
    • raw://... - pseudo-address: use the address itself as data.
    • dir://... - used by Disco internally.
    • disco://... - used by Disco internally.

    See also

    disco.schemes.

  • name (string) – directly sets jobdict.prefix.
  • owner (string) – directly sets jobdict.owner. If not specified, uses DISCO_JOB_OWNER.
  • scheduler (dict) – directly sets jobdict.scheduler.

Uses getitem() to resolve the values of parameters.

Returns:the job dict.
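Among the url schemes listed above, raw:// is a pseudo-address: the address itself is the data. A worker can recover the data by simply stripping the scheme, as in this sketch (raw_data is an illustrative helper, not Disco's actual scheme-handling code):

```python
# Illustrative handling of the raw:// pseudo-scheme: the address
# itself is used as the data, so only the scheme prefix is removed.
def raw_data(url):
    prefix = 'raw://'
    return url[len(prefix):] if url.startswith(prefix) else url

data = raw_data('raw://hello, world')
```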
jobenvs(job, **jobargs)
Returns:Job Environment Variables dict.
jobhome(job, **jobargs)
Returns:the job home (serialized).

Calls jobzip() to create the disco.fileutils.DiscoZipFile.

jobzip(job, **jobargs)

A hook provided by the Worker for creating the job home zip.

Returns:a disco.fileutils.DiscoZipFile.
classmethod main()

The main method used to bootstrap the Worker when it is being executed.

It is enough for the module to define:

if __name__ == '__main__':
    Worker.main()

Note

It is critical that subclasses check if they are executing in the __main__ module, before running main(), as the worker module is also generally imported on the client side.

output(task, partition=None, **kwds)
Parameters:
  • task (disco.task.Task) – the task for which to create output.
  • partition (string or None) – the label of the output partition to get.
  • kwds (dict) – additional keyword arguments for the Output.
Returns:

the previously opened Output for partition, or if necessary, a newly opened one.
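The reuse of previously opened outputs can be sketched as a cache keyed by task and partition label. This is a hypothetical illustration of the documented behavior; outputs and get_output are not part of the Disco API.

```python
# Sketch of output reuse: open a new output only when no output for
# this (task, partition) pair has been opened before.
outputs = {}

def get_output(task, partition=None):
    key = (task, partition)
    if key not in outputs:
        # stand-in for opening a new Output for this partition
        outputs[key] = {'path': '%s-%s' % (task, partition), 'file': []}
    return outputs[key]

first = get_output('map-0', partition='0')
again = get_output('map-0', partition='0')
```

Both calls return the same object, so records for a partition accumulate in a single output file.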

run(task, job, **jobargs)

Called to do the actual work of processing the disco.task.Task. This method runs in the Disco cluster, on a server that is executing one of the tasks in a job submitted by a client.
