disco.worker – Python Worker Interface¶
In Disco, workers do the brunt of the data processing work. When a disco.job.Job is created, it gets passed a Worker instance, which is responsible for defining the fields used by the disco.job.JobPack. In most cases, you don’t need to define your own Worker subclass in order to run a job. The Worker classes defined in disco take care of the details of creating the fields necessary for the disco.job.JobPack, and, when executed on the nodes, handle the implementation of The Disco Worker Protocol.
There is perhaps a subtle, but important, distinction between a worker and a Worker. The former refers to any binary that gets executed on the nodes, specified by jobdict.worker. The latter is a Python class, which handles details of submitting the job on the client side, as well as controlling the execution of user-defined code on the nodes. A Worker can be subclassed trivially to create a new worker, without having to worry about fulfilling many of the requirements for a well-behaving worker. In short, a Worker provides Python library support for a Disco worker. Those wishing to write a worker in a language besides Python may make use of the Worker class for submitting jobs to the master, but generally need to handle The Disco Worker Protocol in the language used for the worker executable.
- (node) master unpacks the job home
- (node) worker requests the disco.task.Task from the master
- (node) worker runs the task and reports the output to the master
- class disco.worker.Input(input, task=None, **kwds)¶
An iterable over one or more Worker inputs, which can gracefully handle corrupted replicas or otherwise failed inputs.
Parameters: open (function) – a function used to open input files, with the following signature:

def open(url):
    ...
    return file
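For illustration, here is a minimal sketch of a custom open callable following this signature. The raw:// handling is a simplified assumption for the example, not Disco's actual implementation:

```python
import io

def open_input(url):
    # Hypothetical open() callable for Input: takes a URL and returns a
    # file-like object. The raw:// branch is an illustrative assumption
    # that treats the rest of the URL as the payload itself.
    if url.startswith("raw://"):
        return io.BytesIO(url[len("raw://"):].encode())
    return io.open(url, "rb")

print(open_input("raw://hello").read())  # b'hello'
```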
- class disco.worker.MergedInput(input, task=None, **kwds)¶
Produces an iterator over the minimal head elements of the inputs.
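The merge behavior can be sketched with heapq.merge, assuming each input yields its records in sorted order (a simplified stand-in, not MergedInput itself):

```python
import heapq

# Three sorted inputs; the merge repeatedly takes the minimal head element.
inputs = [[1, 4, 9], [2, 3, 10], [5, 6]]
merged = list(heapq.merge(*inputs))
print(merged)  # [1, 2, 3, 4, 5, 6, 9, 10]
```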
- class disco.worker.Output(path_type_label, open=None)¶
A container for outputs from workers.
Parameters: open (function) – a function used to open new output files, with the following signature:

def open(url):
    ...
    return file
- path – The path to the underlying output file.
- type – The type of output.
- label – The label for the output (or None).
- file – The underlying output file handle.
- class disco.worker.ParallelInput(input, task=None, **kwds)¶
Produces an iterator over the unordered records in a set of inputs.
Usually requires the full set of inputs (i.e. it will block when inputs are streaming).
- class disco.worker.Params(**kwargs)¶
Classic parameter container for tasks.
This object provides a way to contain custom parameters, or state, in your tasks.
You can specify any number of key, value pairs to the Params. The pairs will be available to task functions through the params argument. Each task receives its own copy of the initial params object.
key must be a valid Python identifier. value can be any Python object.
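A minimal stand-in illustrating the described semantics (a sketch of the behavior, not disco's actual Params implementation):

```python
class Params:
    # Sketch: store keyword arguments as attributes, so task functions
    # can read them back through the params argument.
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

params = Params(alpha=0.5, stopwords={"a", "the"})
print(params.alpha)               # 0.5
print("the" in params.stopwords)  # True
```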
- class disco.worker.SerialInput(input, task=None, **kwds)¶
Produces an iterator over the records in a list of sequential inputs.
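Sequential iteration can be sketched with itertools.chain (a simplified stand-in for SerialInput):

```python
import itertools

# Inputs are consumed one after another, in order.
inputs = [[1, 2], [3], [4, 5]]
records = list(itertools.chain.from_iterable(inputs))
print(records)  # [1, 2, 3, 4, 5]
```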
- class disco.worker.Worker(**kwargs)¶
The base worker tries to guess which modules are needed automatically, for all of the job functions specified below, if the required_modules parameter is not specified. It sends any local dependencies (i.e. modules not included in the Python standard library) to nodes by default.
If guessing fails, or you have other requirements, see disco.worker.modutil for options.
The Worker base class defines the following parameters:
- save_results (bool) – whether or not to save the output to the Disco Distributed Filesystem.
- save_info (string) – the information about saving into a DFS.
- profile (bool) – determines whether run() will be profiled.
- required_files (list of paths or dict) –
additional files that are required by the worker. Either a list of paths to files to include, or a dictionary which contains items of the form (filename, filecontents).
Changed in version 0.4: The worker includes required_files in jobzip(), so they are available relative to the working directory of the worker.
- required_modules (see How to specify required modules) – required modules to send with the worker.
- getitem(key, job, jobargs, default=None)¶
- input(task, merged=False, **kwds)¶
Returns: an Input to iterate over the inputs from the master.
- jobdict(job, **jobargs)¶
Builds the job dict from the worker's parameters.
The Job Dict dict.
- jobhome(job, **jobargs)¶
Returns: the job home (serialized).
Calls jobzip() to create the disco.fileutils.DiscoZipFile.
- jobzip(job, **jobargs)¶
A hook provided by the Worker for creating the job home zip. The base implementation creates a minimal zip file containing the Disco standard library, and any user-specified required files and modules.
Returns: a disco.fileutils.DiscoZipFile.
- classmethod main()¶
The main method used to bootstrap the Worker when it is being executed.
It is enough for the module to define:
if __name__ == '__main__':
    Worker.main()
It is critical that subclasses check if they are executing in the __main__ module, before running main(), as the worker module is also generally imported on the client side.
- output(task, label=None, **kwds)¶
Returns: the previously opened Output for label, or, if necessary, a newly opened one.
disco.worker.modutil – Parse and find module dependencies¶
New in version 0.2.3.
This module provides helper functions to be used with the required_modules parameter in Worker. These functions are needed when your job functions depend on external Python modules and the default value for required_modules does not suffice.
By default, Disco tries to find out which modules are required by job functions automatically. If the found modules are not included in the Python standard library or another package that is installed system-wide, Disco sends them to the nodes so they can be used by the Disco worker process.
Sometimes Disco may fail to detect all required modules. In this case, you can override the default value either by providing a list of requirements manually, or by generating the list semi-automatically using the functions in this module.
How to specify required modules¶
The required_modules parameter accepts a list of module definitions. A module definition may be either a module name, e.g. "PIL.Image", or a tuple that specifies both the module name and its path, e.g. ("mymodule", "lib/mymodule.py"). In the former case, the disco.worker.classic.worker.Worker only imports the module, assuming that it has been previously installed to the node. In the latter case, Disco sends the module file to nodes before importing it, and no pre-installation is required.
For instance, the following is a valid list for required_modules:
required_modules = ["math", "random", ("mymodule", "lib/mymodule.py")]
This expression imports the standard modules math and random and sends a custom module lib/mymodule.py to nodes before importing it.
Note that Disco sends only the files that can be found in your PYTHONPATH. It is assumed that files outside PYTHONPATH belong either to the Python standard library or to another package that is installed system-wide. Make sure that all modules that require automatic distribution can be found in your PYTHONPATH.
Automatic distribution works only for individual modules and not for packages nor modules that require a specific directory hierarchy. You need to install packages and modules with special requirements manually to your nodes.
Typical use cases¶
(default) If you want to find and send all required modules used by your job functions recursively (i.e. modules that depend on other modules are also included), you don’t need to specify required_modules at all. This is equivalent to:

required_modules = modutil.find_modules(job_functions)

where job_functions is a list of all job functions: map, map_init, combiner etc.
If you want to find and import all required modules, but not send them, or you want to disable recursive analysis, use find_modules() explicitly with the send_modules and recursive parameters.
If you want to send a known set of modules (possibly recursively) but you don’t know their paths, use locate_modules().
If you want to send a known set of modules, provide a list of (module name, module path) tuples.
If you just want to import specific modules, or sub-modules in a pre-installed package (e.g. PIL.Image), provide a list of module names.
Any combinations of the above are allowed. For instance:
required_modules = find_modules([fun_map]) + [("mymodule", "/tmp/mymodule.py"), "time", "random"]
is a valid expression.
- exception disco.worker.modutil.ModUtilImportError(error, function)¶
Error raised when a module can’t be found by disco.worker.modutil.
- disco.worker.modutil.find_modules(functions, send_modules=True, job_path=None, recurse=True, exclude=())¶
Tries to guess and locate modules that are used by functions. Returns a list of required modules as specified in How to specify required modules.
- functions – The functions to search for required modules.
- send_modules – If True, a (module name, module path) tuple is returned for each required local module. If False, only the module name is returned and detected modules are not sent to nodes; this implies recurse=False.
- recurse – If True, this function includes all modules that are required by functions or any other included modules. In other words, it tries to ensure that all module files required by the job are included. If False, only modules that are directly used by functions are included.
- disco.worker.modutil.locate_modules(modules, recurse=True, include_sys=False)¶
Finds module files corresponding to the module names specified in the list modules.
- modules – The modules to search for other required modules.
- recurse – If True, recursively search for local modules that are used in modules.
A module is local if it can be found in your PYTHONPATH. For modules that can be found under system-wide default paths (e.g. /usr/lib/python), just the module name is returned without the corresponding path, so system-wide modules are not distributed to nodes unnecessarily.
This function is used by find_modules() to locate modules used by the specified functions.
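The local/system distinction can be sketched with importlib (a toy approximation, not locate_modules itself; the stdlib-path check is an assumption made for the example):

```python
import importlib.util
import sysconfig

def locate(module_name):
    # Toy sketch: return just the name for system-wide modules, and a
    # (name, path) tuple for local modules that would need to be sent.
    spec = importlib.util.find_spec(module_name)
    if spec is None or spec.origin in (None, "built-in", "frozen"):
        return module_name
    if spec.origin.startswith(sysconfig.get_paths()["stdlib"]):
        return module_name
    return (module_name, spec.origin)

print(locate("math"))  # system-wide, so just the name: 'math'
```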
- disco.worker.modutil.parse_function(function)¶
Tries to guess which modules are used by function. Returns a list of module names.
This function is used by find_modules() to parse modules used by a function. You can use it to check that all modules used by your functions are detected correctly.
The current heuristic requires that modules are accessed using the dot notation directly, e.g. random.uniform(1, 10). For instance, required modules are not detected correctly in the following snippet:
a = random
a.uniform(1, 10)
Also, modules used in generator expressions, like here:
return ((k, base64.encodestring(v)) for k, v in d.iteritems())
are not detected correctly.
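The dot-notation heuristic can be illustrated with a toy parser built on the ast module (a sketch of the idea, not disco's actual analysis):

```python
import ast

def guess_modules(source):
    # Toy heuristic: collect names used as the base of a dotted attribute
    # access, e.g. the 'random' in random.uniform(1, 10).
    names = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name):
            names.add(node.value.id)
    return sorted(names)

print(guess_modules("import random\nx = random.uniform(1, 10)"))
# ['random']

# The aliased access from the snippet above is misdetected:
print(guess_modules("import random\na = random\nx = a.uniform(1, 10)"))
# ['a']
```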