disco.worker – Python Worker Interface
In Disco, workers do the brunt of the data processing work. When a disco.job.Job is created, it gets passed a Worker instance, which is responsible for defining the fields used by the disco.job.JobPack. In most cases, you don't need to define your own Worker subclass in order to run a job. The Worker classes defined in disco will take care of the details of creating the fields necessary for the disco.job.JobPack, and when executed on the nodes, will handle the implementation of The Disco Worker Protocol.
There is a subtle, but important, distinction between a worker and a Worker. The former refers to any binary that gets executed on the nodes, specified by jobdict.worker. The latter is a Python class, which handles details of submitting the job on the client side, as well as controlling the execution of user-defined code on the nodes. A Worker can be subclassed trivially to create a new worker, without having to worry about fulfilling many of the requirements for a well-behaving worker. In short, a Worker provides Python library support for a Disco worker. Those wishing to write a worker in a language besides Python may make use of the Worker class for submitting jobs to the master, but generally need to handle The Disco Worker Protocol in the language used for the worker executable.
The Classic Worker is a subclass of Worker, which implements the classic Disco mapreduce interface.
The following steps illustrate the sequence of events for running a job using a standard Worker (a client-side sketch follows the list):
1. (client) instantiate a disco.job.Job
   - if a worker is supplied, use that worker
   - otherwise, create a worker using disco.job.Job.Worker (the default is disco.worker.classic.worker.Worker)
2. (client) call disco.job.Job.run()
   - create a disco.job.JobPack using: Worker.jobdict(), Worker.jobenvs(), Worker.jobhome(), disco.task.jobdata()
   - submit the disco.job.JobPack to the master
3. (node) master unpacks the job home
4. (node) master executes the jobdict.worker with current working directory set to the job home and environment variables set from Job Environment Variables
5. (node) worker requests the disco.task.Task from the master
6. (node) worker runs the task and reports the output to the master
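As a concrete illustration of the client-side part of this sequence, the sketch below submits a classic word-count job through the default classic worker. It assumes the disco.core.Job and result_iterator helpers and a running master; the input URL is a placeholder.

    from disco.core import Job, result_iterator

    def map(line, params):
        # emit (word, 1) for every word on the input line
        for word in line.split():
            yield word, 1

    def reduce(iter, params):
        # sum the counts for each word
        from disco.util import kvgroup
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)

    if __name__ == '__main__':
        # "http://example.org/words.txt" is a placeholder input URL
        job = Job().run(input=["http://example.org/words.txt"],
                        map=map,
                        reduce=reduce)
        for word, count in result_iterator(job.wait(show=True)):
            print(word, count)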
- class disco.worker.Input(input, task=None, **kwds)
  An iterable over one or more Worker inputs, which can gracefully handle corrupted replicas or otherwise failed inputs.
  Parameters:
  - open (function) – a function used to open input files, with the following signature:
    def open(url):
        ...
        return file
- class disco.worker.MergedInput(input, task=None, **kwds)
  Produces an iterator over the minimal head elements of the inputs.
- class disco.worker.Output(path_type_label, open=None)
  A container for outputs from workers.
  Parameters:
  - open (function) – a function used to open new output files, with the following signature:
    def open(url):
        ...
        return file
  - path – the path to the underlying output file.
  - type – the type of output.
  - label – the label for the output (or None).
  - file – the underlying output file handle.
- class disco.worker.ParallelInput(input, task=None, **kwds)
  Produces an iterator over the unordered records in a set of inputs.
  Usually requires the full set of inputs (i.e. will block with streaming).
- class disco.worker.Params(**kwargs)
  Classic parameter container for tasks.
  This object provides a way to contain custom parameters, or state, in your tasks.
  You can specify any number of key, value pairs to the Params. The pairs will be available to task functions through the params argument. Each task receives its own copy of the initial params object. key must be a valid Python identifier; value can be any Python object.
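  As a sketch of how such parameters reach task functions, assuming the classic disco.core.Job interface; the input tag and the stopwords parameter are illustrative:

    from disco.core import Job
    from disco.worker import Params

    def map(line, params):
        # params.stopwords comes from the Params object passed at submission time
        for word in line.split():
            if word not in params.stopwords:
                yield word, 1

    # "tag://data:words" is a placeholder input
    job = Job().run(input=["tag://data:words"],
                    map=map,
                    params=Params(stopwords=set(["the", "a", "an"])))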
- class disco.worker.SerialInput(input, task=None, **kwds)
  Produces an iterator over the records in a list of sequential inputs.
- class disco.worker.Worker(**kwargs)
  A Worker is a dict subclass, with special methods defined for serializing itself, and possibly reinstantiating itself on the nodes where tasks are run.
  Note: The base worker tries to guess which modules are needed automatically, for all of the job functions specified below, if the required_modules parameter is not specified. It sends any local dependencies (i.e. modules not included in the Python standard library) to nodes by default. If guessing fails, or you have other requirements, see disco.worker.modutil for options.
  The Worker base class defines the following parameters:
  Parameters:
  - save_results (bool) – whether or not to save the output to the Disco Distributed Filesystem.
  - save_info (string) – the information about saving into a DFS.
  - profile (bool) – determines whether run() will be profiled.
  - required_files (list of paths or dict) – additional files that are required by the worker. Either a list of paths to files to include, or a dictionary which contains items of the form (filename, filecontents).
    Changed in version 0.4: The worker includes required_files in jobzip(), so they are available relative to the working directory of the worker.
  - required_modules (see How to specify required modules) – required modules to send with the worker.
  - bin
    The path to the worker binary, relative to the job home. Used to set jobdict.worker in jobdict().
  - getitem(key, job, jobargs, default=None)
    Resolves key in the following order:
    1. jobargs (parameters passed in during disco.job.Job.run())
    2. job (attributes of the disco.job.Job)
    3. self (items in the Worker dict itself)
    4. default
  - input(task, merged=False, **kwds)
    Parameters:
    - task (disco.task.Task) – the task for which to retrieve input.
    - merged (bool) – if specified, returns a MergedInput.
    - kwds (dict) – additional keyword arguments for the Input.
    Returns: an Input to iterate over the inputs from the master.
  - jobdict(job, **jobargs)
    Creates a basic The Job Dict for the Worker. Makes use of the following parameters:
    Parameters:
    - name (string) – directly sets jobdict.prefix.
    - owner (string) – directly sets jobdict.owner. If not specified, uses DISCO_JOB_OWNER.
    Returns: The Job Dict dict.
  - jobenvs(job, **jobargs)
    Returns: Job Environment Variables dict.
  - jobhome(job, **jobargs)
    Returns: the job home (serialized).
    Calls jobzip() to create the disco.fileutils.DiscoZipFile.
  - jobzip(job, **jobargs)
    A hook provided by the Worker for creating the job home zip. The base implementation creates a minimal zip file containing the Disco standard library, and any user-specified required files and modules.
    Returns: a disco.fileutils.DiscoZipFile.
  - classmethod main()
    The main method used to bootstrap the Worker when it is being executed.
    It is enough for the module to define:
    if __name__ == '__main__':
        Worker.main()
    Note: It is critical that subclasses check if they are executing in the __main__ module before running main(), as the worker module is also generally imported on the client side.
  - output(task, label=None, **kwds)
    Parameters:
    - task (disco.task.Task) – the task for which to create output.
    - label (int or None) – the label of the output partition to get.
    - kwds (dict) – additional keyword arguments for the Output.
    Returns: the previously opened Output for label, or if necessary, a newly opened one.
  - run(task, job, **jobargs)
    Called to do the actual work of processing the disco.task.Task. This method runs in the Disco cluster, on a server that is executing one of the tasks in a job submitted by a client.
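Since a Worker can be subclassed trivially, the following sketch shows the general shape of such a subclass. The class name and the body of run() are purely illustrative, not part of any Disco worker; they only show how the documented input(), output() and main() hooks fit together.

    from disco.worker import Worker

    class EchoWorker(Worker):
        def run(self, task, job, **jobargs):
            # Illustrative body: copy every input record to the task output
            # unchanged; a real worker would apply user-defined logic here.
            out = self.output(task)
            for record in self.input(task):
                out.file.write(record)

    if __name__ == '__main__':
        # Only bootstrap when executed on a node; the module is also imported
        # on the client side (see the note on main() above).
        EchoWorker.main()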
disco.worker.modutil – Parse and find module dependencies
New in version 0.2.3.
This module provides helper functions to be used with the required_modules parameter in Worker. These functions are needed when your job functions depend on external Python modules and the default value for required_modules does not suffice.
By default, Disco tries to find out which modules are required by job functions automatically. If the found modules are not included in the Python standard library or another package that is installed system-wide, it sends them to the nodes so they can be used by the Disco worker process.
Sometimes Disco may fail to detect all required modules. In this case, you can override the default value either by providing a list of requirements manually, or by generating the list semi-automatically using the functions in this module.
How to specify required modules
The required_modules parameter accepts a list of module definitions. A module definition may be either a module name, e.g. "PIL.Image", or a tuple that specifies both the module name and its path, e.g. ("mymodule", "lib/mymodule.py"). In the former case, the disco.worker.classic.worker.Worker only imports the module, assuming that it has been previously installed to the node. In the latter case, Disco sends the module file to nodes before importing it, and no pre-installation is required.
For instance, the following is a valid list for required_modules:
required_modules = ["math", "random", ("mymodule", "lib/mymodule.py")]
This expression imports the standard modules math and random and sends a custom module lib/mymodule.py to nodes before importing it.
Note that Disco sends only the files that can be found in your PYTHONPATH. It is assumed that files outside PYTHONPATH belong either to the Python standard library or to another package that is installed system-wide. Make sure that all modules that require automatic distribution can be found in your PYTHONPATH.
Automatic distribution works only for individual modules and not for packages nor modules that require a specific directory hierarchy. You need to install packages and modules with special requirements manually to your nodes.
Typical use cases
The following list describes some typical use cases for required_modules. The list helps you decide when to use the find_modules() and locate_modules() functions.
- (default) If you want to find and send all required modules used by your job functions recursively (i.e. modules that depend on other modules are also included), you don't need to specify required_modules at all. This is equivalent to:
  required_modules = modutil.find_modules(job_functions)
  where job_functions is a list of all job functions: map, map_init, combiner, etc.
- If you want to find and import all required modules, but not send them, or you want to disable recursive analysis, use find_modules() explicitly with the send_modules and recursive parameters.
- If you want to send a known set of modules (possibly recursively) but you don't know their paths, use locate_modules().
- If you want to send a known set of modules and you know their paths, provide a list of (module name, module path) tuples.
- If you just want to import specific modules, or sub-modules in a pre-installed package (e.g. PIL.Image), provide a list of module names.
Any combinations of the above are allowed. For instance:
required_modules = find_modules([fun_map]) + [("mymodule", "/tmp/mymodule.py"), "time", "random"]
is a valid expression.
Functions
- exception disco.worker.modutil.ModUtilImportError(error, function)
  Error raised when a module can't be found by disco.worker.modutil.
- disco.worker.modutil.find_modules(functions, send_modules=True, job_path=None, recurse=True, exclude=())
  Tries to guess and locate modules that are used by functions. Returns a list of required modules as specified in How to specify required modules.
  Parameters:
  - functions – the functions to search for required modules.
  - send_modules – if True, a (module name, module path) tuple is returned for each required local module. If False, only the module name is returned and detected modules are not sent to nodes; this implies recurse=False.
  - recurse – if True, this function includes all modules that are required by functions or any other included modules. In other words, it tries to ensure that all module files required by the job are included. If False, only modules that are directly used by functions are included.
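  A minimal sketch of calling it directly when you only want module names and no recursive analysis; fun_map is an illustrative job function that uses only standard-library modules:

    from disco.worker import modutil

    def fun_map(e, params):
        # 'random' is accessed with dot notation, so the heuristic can detect it
        return [(e, random.uniform(1, 10))]

    # Report only module names; nothing is shipped to the nodes and recursion is off
    required_modules = modutil.find_modules([fun_map],
                                            send_modules=False,
                                            recurse=False)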
- disco.worker.modutil.locate_modules(modules, recurse=True, include_sys=False)
  Finds module files corresponding to the module names specified in the list modules.
  Parameters:
  - modules – the modules to search for other required modules.
  - recurse – if True, recursively search for local modules that are used in modules.
  A module is local if it can be found in your PYTHONPATH. For modules that can be found under system-wide default paths (e.g. /usr/lib/python), just the module name is returned without the corresponding path, so system-wide modules are not distributed to nodes unnecessarily.
  This function is used by find_modules() to locate modules used by the specified functions.
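  A minimal sketch of the call, using standard-library module names for illustration; local modules on your PYTHONPATH would come back as (name, path) tuples instead of bare names:

    from disco.worker import modutil

    # System-wide modules such as 'math' and 'random' are returned as bare names
    required_modules = modutil.locate_modules(["math", "random"])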
- disco.worker.modutil.parse_function(function)
  Tries to guess which modules are used by function. Returns a list of module names.
  This function is used by find_modules() to parse modules used by a function. You can use it to check that all modules used by your functions are detected correctly.
  The current heuristic requires that modules are accessed using the dot notation directly, e.g. random.uniform(1, 10). For instance, required modules are not detected correctly in the following snippet:
    a = random
    a.uniform(1, 10)
Also, modules used in generator expressions, like here:
return ((k, base64.encodestring(v)) for k, v in d.iteritems())
are not detected correctly.
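  A quick sketch of using it to sanity-check detection; fun_map is an illustrative job function:

    from disco.worker import modutil

    def fun_map(e, params):
        return [(e, random.uniform(1, 10))]   # dot notation, so 'random' is detected

    print(modutil.parse_function(fun_map))    # expected to include 'random'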
disco.worker.task_io – I/O Utility functions for Disco tasks
- class disco.worker.task_io.InputStream
  A file-like object returned by the map_input_stream or reduce_input_stream chain of input_stream() functions. Used either to read bytes from the input source or to iterate through input entries.
  - read(num_bytes=None)
    Reads at most num_bytes from the input source, or until EOF if num_bytes is not specified.
- class disco.worker.task_io.OutputStream
  A file-like object returned by the map_output_stream or reduce_output_stream chain of output_stream() functions. Used to encode key, value pairs and write them to the underlying file object.
  - add(key, value)
    Adds a key, value pair to the output stream.
  - close()
    Closes the output stream.
  - path
    The path on the local filesystem (used only for saving output to DDFS).
  - write(data)
    Deprecated since version 0.3.
    Writes data to the underlying file object.
- disco.worker.task_io.chain_reader(stream, size, url, ignore_corrupt=False)
  Input stream for Disco's internal compression format.
- disco.worker.task_io.disco_input_stream(stream, size, url, ignore_corrupt=False)
  Input stream for Disco's internal compression format.
- disco.worker.task_io.disco_output_stream(stream, partition, url, params)
  Output stream for Disco's internal compression format.
- disco.worker.task_io.gzip_line_reader(fd, size, url, params)
  Yields as many lines from the gzipped fd as possible; prints an exception if reading fails.
- disco.worker.task_io.gzip_reader(fd, size, url, params)
  Wraps the input in a gzip.GzipFile object.
- disco.worker.task_io.input_stream(stream, size, url, params)
  Parameters:
  - stream – InputStream object
  - size – size of the input (may be None)
  - url – url of the input
  Returns a triplet (InputStream, size, url) that is passed to the next input_stream function in the chain. The last disco.func.InputStream object returned by the chain is used to iterate through input entries.
  Using an input_stream() allows you to customize how input urls are opened.
  Input streams are used for specifying the map_input_stream, map_reader, reduce_input_stream, and reduce_reader parameters for the disco.worker.classic.worker.Worker.
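  As an illustration of customizing how inputs are opened, the sketch below wraps the incoming stream so that downstream readers see lowercased bytes. The wrapper class and the suggested placement in the map_input_stream chain are assumptions for illustration, not part of the API described above:

    def lowercase_input_stream(stream, size, url, params):
        # Wrap the previous stream in the chain; only read() is sketched here,
        # a real wrapper would preserve the full file-like interface.
        class Lowercased(object):
            def __init__(self, fd):
                self.fd = fd
            def read(self, num_bytes=None):
                data = self.fd.read(num_bytes) if num_bytes else self.fd.read()
                return data.lower()
        return Lowercased(stream), size, url

    # Hypothetical use with the classic worker: append it to the input chain, e.g.
    #   map_input_stream=[task_input_stream, lowercase_input_stream]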
- disco.worker.task_io.old_netstr_reader(fd, size, fname, head='')
  Reader for Disco's default/internal key-value format.
  Reads output of a map / reduce job as the input for a new job. Specify this function as your map_reader() to use the output of a previous job as input to another job.
- disco.worker.task_io.output_stream(stream, partition, url, params)
  Parameters:
  - stream – OutputStream object
  - partition – partition id
  - url – url of the input
  Returns a pair (OutputStream, url) that is passed to the next output_stream function in the chain. The OutputStream.add() method of the last OutputStream object returned by the chain is used to output entries from map or reduce.
  Using an output_stream() allows you to customize where and how output is stored. The default should almost always be used.
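  If you do need a custom stage, the following sketch encodes each value as JSON before delegating to the previous OutputStream in the chain. The wrapper class is an assumption for illustration only:

    import json

    def json_output_stream(stream, partition, url, params):
        # Wrap the previous OutputStream; add() serializes values as JSON
        # before handing the pair on to the wrapped stream.
        class JSONStream(object):
            def __init__(self, out):
                self.out = out
            def add(self, key, value):
                self.out.add(key, json.dumps(value))
            def close(self):
                self.out.close()
        return JSONStream(stream), url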
- disco.worker.task_io.re_reader(item_re_str, fd, size, fname, output_tail=False, read_buffer_size=8192)
  A map reader that uses an arbitrary regular expression to parse the input stream.
  Parameters:
  - item_re_str – regular expression for matching input items
  The reader works as follows:
  1. X bytes are read from fd and appended to an internal buffer buf.
  2. m = regexp.match(buf) is executed.
  3. If buf produces a match, m.groups() is yielded, which contains an input entry for the map function. Step 2 is executed for the remaining part of buf. If no match is made, go to step 1.
  4. If fd is exhausted before size bytes have been read, and size tests True, a disco.error.DataError is raised.
  5. When fd is exhausted but buf contains unmatched bytes, two modes are available: if output_tail=True, the remaining buf is yielded as is; otherwise, a message is sent that warns about trailing bytes, and the remaining buf is discarded.
  Note that re_reader() fails if the input stream contains unmatched bytes between matched entries. Make sure that your item_re_str is constructed so that it covers all bytes in the input stream.
  re_reader() provides an easy way to construct parsers for textual input streams. For instance, the following reader produces full HTML documents as input entries:
    def html_reader(fd, size, fname):
        for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
            yield x[0]
- disco.worker.task_io.task_input_stream(stream, size, url, params)
  An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, file is used. The resulting input stream is then used.
- disco.worker.task_io.task_output_stream(stream, partition, url, params)
  An output_stream() which returns a handle to a task output. The handle ensures that if a task fails, partially written data is ignored.