In Disco, workers do the bulk of the data processing work. When a disco.job.Job is created, it is passed a Worker instance, which is responsible for defining the fields used by the disco.job.JobPack. In most cases, you don’t need to define your own Worker subclass in order to run a job. The Worker classes defined in disco take care of creating the fields necessary for the disco.job.JobPack and, when executed on the nodes, handle the implementation of The Disco Worker Protocol.
There is a subtle but important distinction between a worker and a Worker. The former refers to any binary that gets executed on the nodes, as specified by jobdict.worker. The latter is a Python class, which handles the details of submitting the job on the client side, as well as controlling the execution of user-defined code on the nodes. A Worker can be subclassed trivially to create a new worker, without having to worry about fulfilling many of the requirements for a well-behaved worker. In short, a Worker provides Python library support for a Disco worker. Those wishing to write a worker in a language other than Python may use the Worker class for submitting jobs to the master, but generally need to handle The Disco Worker Protocol in the language used for the worker executable.
The Classic Worker is a subclass of Worker, which implements the classic Disco mapreduce interface.
The following steps illustrate the sequence of events for running a job using a standard Worker:
(node) master unpacks the job home
(node) master executes the jobdict.worker with current working directory set to the job home and environment variables set from Job Environment Variables
(node) worker requests the disco.task.Task from the master
(node) worker runs the task and reports the output to the master
An iterable over one or more Worker inputs, which can gracefully handle corrupted replicas or otherwise failed inputs.
Parameters: open (function) – a function used to open input files, with the following signature:
    def open(url):
        ...
        return file
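As a rough illustration of the open parameter, the sketch below defines one possible opener for local inputs. The name open_local and the decision to support only plain filesystem paths and file:// URLs are assumptions made for this example, not part of Disco. It is written for Python 3.

```python
from urllib.parse import urlparse
from urllib.request import url2pathname


def open_local(url):
    """Hypothetical opener: takes a url and returns a readable file object.

    Only plain filesystem paths and file:// URLs are handled in this sketch.
    """
    parsed = urlparse(url)
    if parsed.scheme == "file":
        # Convert the URL path component back into a filesystem path.
        path = url2pathname(parsed.path)
    elif parsed.scheme == "":
        # No scheme: treat the whole string as a local path.
        path = url
    else:
        raise ValueError("unsupported scheme: %r" % parsed.scheme)
    return open(path, "rb")
```

Any callable with the same shape (one url argument, a file-like return value) would fit the signature described above.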
Produces an iterator over the minimal head elements of the inputs.
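The idea of iterating over the minimal head elements can be sketched with the standard library's heapq.merge, which repeatedly yields the smallest record among the current heads of several iterables. This is only an analogy under the assumption that each input is already sorted; merge_heads and the sample records are hypothetical and do not show how Disco implements it.

```python
import heapq


def merge_heads(*inputs):
    """Repeatedly yield the smallest of the current head records."""
    return heapq.merge(*inputs)


# Two inputs that are each sorted by key (illustrative data only).
first = [("apple", 1), ("cherry", 3)]
second = [("banana", 2), ("date", 4)]

merged = list(merge_heads(first, second))
```

If the individual inputs are not sorted, the result is still built from the smallest available head at each step, but it is not globally sorted.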
A container for outputs from workers.
Parameters: open (function) – a function used to open new output files, with the following signature:
    def open(url):
        ...
        return file
The path to the underlying output file.
The type of output.
The partition label for the output (or None).
The underlying output file handle.
Produces an iterator over the unordered records in a set of inputs.
Usually requires the full set of inputs (i.e. it will block with streaming).
Produces an iterator over the records in a list of sequential inputs.
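Reading a list of inputs sequentially behaves much like itertools.chain: every record of the first input is produced before the second input is touched. The sketch below is an analogy only; read_serially and the sample data are hypothetical names introduced for illustration.

```python
from itertools import chain


def read_serially(inputs):
    """Yield every record of the first input, then the second, and so on."""
    return chain.from_iterable(inputs)


# Each element stands in for one already-opened input (illustrative data).
parts = [iter(["a", "b"]), iter(["c"]), iter(["d", "e"])]
records = list(read_serially(parts))
```

Because the chain is lazy, a later input is not consumed until the earlier ones are exhausted.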
A Worker is a dict subclass, with special methods defined for serializing itself, and possibly reinstantiating itself on the nodes where tasks are run.
The Worker base class defines the following parameters:
Parameters: |
|
---|
The path to the worker binary, relative to the job home. Used to set jobdict.worker in jobdict().
Resolves key in the following order:
1. jobargs (parameters passed in during disco.job.Job.run())
2. job (attributes of the disco.job.Job)
3. self (items in the Worker dict itself)
4. default
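The resolution order can be sketched as a chain of lookups. The function resolve, the class ExampleJob, and the keys used below are hypothetical; in particular, how missing or None values are treated at each level is an assumption of this sketch rather than documented Disco behavior.

```python
_MISSING = object()  # sentinel, so that None can still be a real value


class ExampleJob(object):
    """Hypothetical stand-in for a job object that carries attributes."""
    name = "wordcount"


def resolve(key, jobargs, job, worker, default=None):
    """Look key up in jobargs, then job attributes, then worker items."""
    if key in jobargs:                      # 1. arguments passed to run()
        return jobargs[key]
    value = getattr(job, key, _MISSING)     # 2. attributes of the job
    if value is not _MISSING:
        return value
    if key in worker:                       # 3. items in the worker dict
        return worker[key]
    return default                          # 4. the supplied default


worker = {"name": "from-worker", "retries": 3}
job = ExampleJob()

from_args = resolve("name", {"name": "from-args"}, job, worker)
from_job = resolve("name", {}, job, worker)
from_worker = resolve("retries", {}, job, worker)
fallback = resolve("unknown", {}, job, worker, default="n/a")
```

Here from_args comes from the explicit arguments, from_job from the job attribute, from_worker from the worker's own items, and fallback from the default.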
Returns: an Input to iterate over the inputs from the master.
Creates The Job Dict for the Worker.
Makes use of the following parameters, in addition to those defined by the Worker itself:
Parameters: |
|
---|
Uses getitem() to resolve the values of parameters.
Returns: the job dict.
Returns: Job Environment Variables dict.
Returns: the job home (serialized).
Calls jobzip() to create the disco.fileutils.DiscoZipFile.
A hook provided by the Worker for creating the job home zip.
Returns: a disco.fileutils.DiscoZipFile.
The main method used to bootstrap the Worker when it is being executed.
It is enough for the module to define:
    if __name__ == '__main__':
        Worker.main()
Note
It is critical that subclasses check if they are executing in the __main__ module, before running main(), as the worker module is also generally imported on the client side.
Returns: the previously opened Output for partition, or if necessary, a newly opened one.
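The "previously opened, or newly opened if necessary" behavior amounts to a cache keyed by partition label. A minimal sketch of that pattern follows; OutputCache, fake_opener, and the dict used as a stand-in output are all hypothetical and are not Disco's classes.

```python
class OutputCache(object):
    """Hypothetical helper: open one output per partition label, on demand."""

    def __init__(self, opener):
        self._opener = opener   # called as opener(partition) on a cache miss
        self._outputs = {}      # partition label (or None) -> output object

    def output(self, partition=None):
        # Open a new output only the first time a partition is requested.
        if partition not in self._outputs:
            self._outputs[partition] = self._opener(partition)
        return self._outputs[partition]


opened = []  # records which partitions triggered a real open


def fake_opener(partition):
    opened.append(partition)
    return {"partition": partition, "records": []}


cache = OutputCache(fake_opener)
first = cache.output(0)
again = cache.output(0)     # same object as first; nothing is reopened
other = cache.output(None)  # an unpartitioned output gets its own entry
```

With this shape, repeated requests for the same partition append to the same output rather than creating a new file each time.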
Called to do the actual work of processing the disco.task.Task. This method runs in the Disco cluster, on a server that is executing one of the tasks in a job submitted by a client.