disco.worker – Python Worker Interface

In Disco, workers do the brunt of the data processing work. When a disco.job.Job is created, it gets passed a Worker instance, which is responsible for defining the fields used by the disco.job.JobPack. In most cases, you don’t need to define your own Worker subclass in order to run a job. The Worker classes defined in disco will take care of the details of creating the fields necessary for the disco.job.JobPack, and when executed on the nodes, will handle the implementation of the The Disco Worker Protocol.

There is perhaps a subtle, but important, distinction between a worker and a Worker. The former refers to any binary that gets executed on the nodes, specified by jobdict.worker. The latter is a Python class, which handles details of submitting the job on the client side, as well as controlling the execution of user-defined code on the nodes. A Worker can be subclassed trivially to create a new worker, without having to worry about fulfilling many of the requirements for a well-behaving worker. In short, a Worker provides Python library support for a Disco worker. Those wishing to write a worker in a language besides Python may make use of the Worker class for submitting jobs to the master, but generally need to handle the The Disco Worker Protocol in the language used for the worker executable.

The Classic Worker is a subclass of Worker, which implements the classic Disco mapreduce interface.

The following steps illustrate the sequence of events for running a job using a standard Worker:

  1. (client) instantiate a disco.job.Job
    1. if a worker is supplied, use that worker
    2. otherwise, create a worker using disco.job.Job.Worker (the default is disco.worker.classic.worker.Worker)
  2. (client) call disco.job.Job.run()
    1. create a disco.job.JobPack using: Worker.jobdict(), Worker.jobenvs(), Worker.jobhome(), disco.task.jobdata()
    2. submit the disco.job.JobPack to the master
  3. (node) master unpacks the job home

  4. (node) master executes the jobdict.worker with current working directory set to the job home and environment variables set from Job Environment Variables

  5. (node) worker requests the disco.task.Task from the master

  6. (node) worker runs the task and reports the output to the master

class disco.worker.Input(input, task=None, **kwds)

An iterable over one or more Worker inputs, which can gracefully handle corrupted replicas or otherwise failed inputs.

Parameters:open (function) –

a function with the following signature:

def open(url):
    return file

used to open input files.

class disco.worker.MergedInput(input, task=None, **kwds)

Produces an iterator over the minimal head elements of the inputs.

class disco.worker.Output(path_type_label, open=None)

A container for outputs from workers.

Parameters:open (function) –

a function with the following signature:

def open(url):
    return file

used to open new output files.


The path to the underlying output file.


The type of output.


The label for the output (or None).


The underlying output file handle.

class disco.worker.ParallelInput(input, task=None, **kwds)

Produces an iterator over the unordered records in a set of inputs.

Usually require the full set of inputs (i.e. will block with streaming).

class disco.worker.Params(**kwargs)

Classic parameter container for tasks.

This object provides a way to contain custom parameters, or state, in your tasks.

You can specify any number of key, value pairs to the Params. The pairs will be available to task functions through the params argument. Each task receives its own copy of the initial params object.

key must be a valid Python identifier. value can be any Python object.

class disco.worker.SerialInput(input, task=None, **kwds)

Produces an iterator over the records in a list of sequential inputs.

class disco.worker.Worker(**kwargs)

A Worker is a dict subclass, with special methods defined for serializing itself, and possibly reinstantiating itself on the nodes where tasks are run.


The base worker tries to guess which modules are needed automatically, for all of the job functions specified below, if the required_modules parameter is not specified. It sends any local dependencies (i.e. modules not included in the Python standard library) to nodes by default.

If guessing fails, or you have other requirements, see disco.worker.modutil for options.

The Worker base class defines the following parameters:

  • save_results (bool) – whether or not to save the output to Disco Distributed Filesystem.
  • save_info (string) – the information about saving into a DFS.
  • profile (bool) – determines whether run() will be profiled.
  • required_files (list of paths or dict) –

    additional files that are required by the worker. Either a list of paths to files to include, or a dictionary which contains items of the form (filename, filecontents).

    Changed in version 0.4: The worker includes required_files in jobzip(), so they are available relative to the working directory of the worker.

  • required_modules (see How to specify required modules) – required modules to send with the worker.

The path to the worker binary, relative to the job home. Used to set jobdict.worker in jobdict().

Returns:dict of default values for the Worker.
getitem(key, job, jobargs, default=None)
Resolves key in the following order:
  1. jobargs (parameters passed in during disco.job.Job.run())
  2. job (attributes of the disco.job.Job)
  3. self (items in the Worker dict itself)
  4. default
input(task, merged=False, **kwds)
  • task (disco.task.Task) – the task for which to retrieve input.
  • merged (bool) – if specified, returns a MergedInput.
  • kwds (dict) – additional keyword arguments for the Input.

an Input to iterate over the inputs from the master.

jobdict(job, **jobargs)

Creates a basic The Job Dict for the Worker.

Makes use of the following parameters:


The Job Dict dict.

jobenvs(job, **jobargs)
Returns:Job Environment Variables dict.
jobhome(job, **jobargs)
Returns:the job home (serialized).

Calls jobzip() to create the disco.fileutils.DiscoZipFile.

jobzip(job, **jobargs)

A hook provided by the Worker for creating the job home zip. The base implementation creates a minimal zip file containing the Disco standard library, and any user-specified required files and modules.

Returns:a disco.fileutils.DiscoZipFile.
classmethod main()

The main method used to bootstrap the Worker when it is being executed.

It is enough for the module to define:

if __name__ == '__main__':


It is critical that subclasses check if they are executing in the __main__ module, before running main(), as the worker module is also generally imported on the client side.

output(task, label=None, **kwds)
  • task (disco.task.Task) – the task for which to create output.
  • label (int or None) – the label of the output partition to get.
  • kwds (dict) – additional keyword arguments for the Output.

the previously opened Output for label, or if necessary, a newly opened one.

run(task, job, **jobargs)

Called to do the actual work of processing the disco.task.Task. This method runs in the Disco cluster, on a server that is executing one of the tasks in a job submitted by a client.

disco.worker.modutil – Parse and find module dependencies

New in version 0.2.3.

This module provides helper functions to be used with the required_modules parameter in Worker. These functions are needed when your job functions depend on external Python modules and the default value for required_modules does not suffice.

By default, Disco tries to find out which modules are required by job functions automatically. If the found modules are not included in the Python standard library or other package that is installed system-wide, it sends them to nodes so they can be used by the Disco worker process.

Sometimes Disco may fail to detect all required modules. In this case, you can override the default value either by providing a list of requirements manually, or by generating the list semi-automatically using the functions in this module.

How to specify required modules

The required_modules parameter accepts a list of module definitions. A module definition may be either a module name, e.g. "PIL.Image", or a tuple that specifies both the module name and its path, e.g. ("mymodule", "lib/mymodule.py"). In the former case, the disco.worker.classic.worker.Worker only imports the module, assuming that it has been previously installed to the node. In the latter case, Disco sends the module file to nodes before importing it, and no pre-installation is required.

For instance, the following is a valid list for required_modules:

required_modules = ["math", "random", ("mymodule", "lib/mymodule.py")]

This expression imports the standard modules math and random and sends a custom module lib/mymodule.py to nodes before importing it.

Note that Disco sends only the files that can be found in your PYTHONPATH. It is assumed that files outside PYTHONPATH belong either to the Python standard library or to another package that is installed system-wide. Make sure that all modules that require automatic distribution can be found in your PYTHONPATH.

Automatic distribution works only for individual modules and not for packages nor modules that require a specific directory hierarchy. You need to install packages and modules with special requirements manually to your nodes.

Typical use cases

The following list describes some typical use cases for required_modules. The list helps you decide when to use the find_modules() and locate_modules() functions.

  • (default) If you want to find and send all required modules used by your job functions recursively (i.e. also modules that depend on other modules are included), you don’t need to specify required_modules at all. This equals to:

    required_modules = modutil.find_modules(job_functions)

    where job_functions is a list of all job functions: map, map_init, combiner etc.

  • If you want to find and import all required modules, but not send them, or you want to disable recursive analysis, use find_modules() explicitly with the send_modules and recursive parameters.

  • If you want to send a known set of modules (possible recursively) but you don’t know their paths, use locate_modules().

  • If you want to send a known set of modules. provide a list of (module name, module path) tuples.

  • If you just want to import specific modules, or sub-modules in a pre-installed package (e.g. PIL.Image), provide a list of module names.

Any combinations of the above are allowed. For instance:

required_modules = find_modules([fun_map]) + [("mymodule", "/tmp/mymodule.py"), "time", "random"]

is a valid expression.


exception disco.worker.modutil.ModUtilImportError(error, function)

Error raised when a module can’t be found by disco.worker.modutil.

disco.worker.modutil.find_modules(functions, send_modules=True, job_path=None, recurse=True, exclude=())

Tries to guess and locate modules that are used by functions. Returns a list of required modules as specified in How to specify required modules.

  • functions – The functions to search for required modules.
  • send_modules – If True, a (module name, module path) tuple is returned for each required local module. If False, only the module name is returned and detected modules are not sent to nodes; this implies recurse=False.
  • recurse – If True, this function includes all modules that are required by functions or any other included modules. In other words, it tries to ensure that all module files required by the job are included. If False, only modules that are directly used by functions are included.
disco.worker.modutil.locate_modules(modules, recurse=True, include_sys=False)

Finds module files corresponding to the module names specified in the list modules.

  • modules – The modules to search for other required modules.
  • recurse – If True, recursively search for local modules that are used in modules.

A module is local if it can be found in your PYTHONPATH. For modules that can be found under system-wide default paths (e.g. /usr/lib/python), just the module name is returned without the corresponding path, so system-wide modules are not distributed to nodes unnecessarily.

This function is used by find_modules() to locate modules used by the specified functions.


Tries to guess which modules are used by function. Returns a list of module names.

This function is used by find_modules() to parse modules used by a function. You can use it to check that all modules used by your functions are detected correctly.

The current heuristic requires that modules are accessed using the dot notation directly, e.g. random.uniform(1, 10). For instance, required modules are not detected correctly in the following snippet:

a = random
a.uniform(1, 10)

Also, modules used in generator expressions, like here:

return ((k, base64.encodestring(v)) for k, v in d.iteritems())

are not detected correctly.

disco.worker.task_io – I/O Utility functions for Disco tasks

class disco.worker.task_io.InputStream

A file-like object returned by the map_input_stream or reduce_input_stream chain of input_stream() functions. Used either to read bytes from the input source or to iterate through input entries.


Reads at most num_bytes from the input source, or until EOF if num_bytes is not specified.

class disco.worker.task_io.OutputStream

A file-like object returned by the map_output_stream or reduce_output_stream chain of output_stream() functions. Used to encode key, value pairs add write them to the underlying file object.

add(key, value)

Adds a key, value pair to the output stream.


Close the output stream.


The path on the local filesystem (used only for saving output to DDFS).


Deprecated since version 0.3.

Writes data to the underlying file object.

disco.worker.task_io.chain_reader(stream, size, url, ignore_corrupt=False)

Input stream for Disco’s internal compression format.

disco.worker.task_io.disco_input_stream(stream, size, url, ignore_corrupt=False)

Input stream for Disco’s internal compression format.

disco.worker.task_io.disco_output_stream(stream, partition, url, params)

Output stream for Disco’s internal compression format.

disco.worker.task_io.gzip_line_reader(fd, size, url, params)

Yields as many lines from the gzipped fd as possible, prints exception if fails.

disco.worker.task_io.gzip_reader(fd, size, url, params)

Wraps the input in a gzip.GzipFile object.

disco.worker.task_io.input_stream(stream, size, url, params)
  • streamInputStream object
  • size – size of the input (may be None)
  • url – url of the input

Returns a triplet (InputStream, size, url) that is passed to the next input_stream function in the chain. The last disco.func.InputStream object returned by the chain is used to iterate through input entries.

Using an input_stream() allows you to customize how input urls are opened.

Input streams are used for specifying the map_input_stream, map_reader, reduce_input_stream, and reduce_reader parameters for the disco.worker.classic.worker.Worker.

disco.worker.task_io.old_netstr_reader(fd, size, fname, head='')

Reader for Disco’s default/internal key-value format.

Reads output of a map / reduce job as the input for a new job. Specify this function as your map_reader() to use the output of a previous job as input to another job.

disco.worker.task_io.output_stream(stream, partition, url, params)
  • streamOutputStream object
  • partition – partition id
  • url – url of the input

Returns a pair (OutputStream, url) that is passed to the next output_stream function in the chain. The OutputStream.add() method of the last OutputStream object returned by the chain is used to output entries from map or reduce.

Using an output_stream() allows you to customize where and how output is stored. The default should almost always be used.

disco.worker.task_io.re_reader(item_re_str, fd, size, fname, output_tail=False, read_buffer_size=8192)

A map reader that uses an arbitrary regular expression to parse the input stream.

Parameters:item_re_str – regular expression for matching input items

The reader works as follows:

  1. X bytes is read from fd and appended to an internal buffer buf.
  2. m = regexp.match(buf) is executed.
  3. If buf produces a match, m.groups() is yielded, which contains an input entry for the map function. Step 2. is executed for the remaining part of buf. If no match is made, go to step 1.
  4. If fd is exhausted before size bytes have been read, and size tests True, a disco.error.DataError is raised.
  5. When fd is exhausted but buf contains unmatched bytes, two modes are available: If output_tail=True, the remaining buf is yielded as is. Otherwise, a message is sent that warns about trailing bytes. The remaining buf is discarded.

Note that re_reader() fails if the input streams contains unmatched bytes between matched entries. Make sure that your item_re_str is constructed so that it covers all bytes in the input stream.

re_reader() provides an easy way to construct parsers for textual input streams. For instance, the following reader produces full HTML documents as input entries:

def html_reader(fd, size, fname):
    for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
        yield x[0]
disco.worker.task_io.task_input_stream(stream, size, url, params)

An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, file is used. The resulting input stream is then used.

disco.worker.task_io.task_output_stream(stream, partition, url, params)

An output_stream() which returns a handle to a task output. The handle ensures that if a task fails, partially written data is ignored.