When a Job is constructed using the classic Worker defined in this module, Disco runs the disco.worker.classic.worker module for every job task. This module reconstructs the Worker on the node where it is run, in order to execute the job functions which were used to create it.
Classic Workers resolve all parameters using getitem().
Thus, users can subclass Job as a convenient way to specify fixed parameters. For example, here’s a simple distributed grep from the Disco examples/ directory:
""" An example using Job classes to implement grep. Could be run as follows (assuming this module can be found on sys.path): disco run grep.Grep -P params pattern [tag_url_or_path] ... """ from disco.job import Job from disco.worker.classic.func import nop_map class Grep(Job): map = staticmethod(nop_map) params = r'' @staticmethod def map_reader(fd, size, url, params): import re pattern = re.compile(params) for line in fd: if pattern.match(line): yield url, line
Deprecated since version 0.4: Params objects aren’t generally needed, since entire modules are sent with the Worker; state can be stored as it would be in normal Python.
Classic parameter container for map / reduce tasks.
This object provides a way to contain custom parameters, or state, in your tasks.
You can specify any number of key, value pairs to the Params. The pairs will be available to task functions through the params argument. Each task receives its own copy of the initial params object. key must be a valid Python identifier. value can be any Python object.
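For illustration, here is a minimal sketch of passing state through Params (the pattern and limit names are just examples):

from disco.worker.classic.worker import Params

# Hypothetical parameters; each task receives its own copy.
params = Params(pattern='ERROR', limit=5)

def fun_map(e, params):
    # Task functions read the values back through the params argument.
    if params.pattern in e:
        yield e, params.limit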
A disco.worker.Worker, which additionally supports the following parameters, to maintain the Classic Disco Interface:
Note
The classic worker tries to guess which modules are needed automatically, for all of the job functions specified below, if the required_modules parameter is not specified. It sends any local dependencies (i.e. modules not included in the Python standard library) to nodes by default.
If guessing fails, or you have other requirements, see disco.worker.classic.modutil for options.
Returns hostname of the node that executes the current task.
Returns hostname and port of the disco master.
Returns the jobname for the current task.
For a map task, returns an integer in the range [0..nr_maps] that identifies the task. This value is mainly useful if you need to generate unique IDs in each map task. There are no guarantees about how IDs are assigned for map tasks.
For a reduce task, returns an integer in the range [0..partitions] that identifies this partition. You can use a custom partitioning function to assign key-value pairs to a particular partition.
A Classic Disco job is specified by one or more job functions. This module defines the interfaces for the job functions, some default values, as well as otherwise useful functions.
A file-like object returned by the map_input_stream or reduce_input_stream chain of input_stream() functions. Used either to read bytes from the input source or to iterate through input entries.
Reads at most num_bytes from the input source, or until EOF if num_bytes is not specified.
A file-like object returned by the map_output_stream or reduce_output_stream chain of output_stream() functions. Used to encode key, value pairs and write them to the underlying file object.
Adds a key, value pair to the output stream.
Close the output stream.
The path on the local filesystem (used only for saving output to DDFS).
Deprecated since version 0.3.
Writes data to the underlying file object.
Input stream for Disco’s internal compression format.
Returns an iterator of (key, value) pairs or None.
This function receives all output from the map() before it is saved to intermediate results. Only the output produced by this function is saved to the results.
After map() has consumed all input entries, combiner is called for the last time with the done flag set to True. This is the last opportunity for the combiner to return something.
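As an illustration, a minimal counting combiner might look like this (a sketch, assuming the classic signature combiner(key, value, buffer, done, params)):

def fun_combiner(key, value, buf, done, params):
    if done:
        # Final call after all input is consumed: flush the buffer.
        return buf.iteritems()
    # Accumulate per-key sums; returning None emits nothing yet.
    buf[key] = buf.get(key, 0) + value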
Returns hash(key) % nr_partitions.
Input stream for Disco’s internal compression format.
Output stream for Disco’s internal compression format.
Yields as many lines from the gzipped fd as possible; prints the exception if reading fails.
Wraps the input in a gzip.GzipFile object.
Perform some task initialization.
Parameters: input_iter – an iterator returned by a reader()
Typically this function is used to initialize some modules in the worker environment (e.g. ctypes.cdll.LoadLibrary()), to initialize some values in params, or to skip unneeded entries in the beginning of the input stream.
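For example, a sketch of an init function that skips a header entry and seeds custom state (the names are illustrative):

def map_init(input_iter, params):
    # Skip an unneeded header entry at the start of the input.
    next(input_iter, None)
    # Initialize custom state carried in params.
    params.seen = 0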
Returns a triplet (InputStream, size, url) that is passed to the next input_stream function in the chain. The last disco.func.InputStream object returned by the chain is used to iterate through input entries.
Using an input_stream() allows you to customize how input urls are opened.
Input streams are used for specifying the map_input_stream, map_reader, reduce_input_stream, and reduce_reader parameters for the disco.worker.classic.worker.Worker.
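For instance, a hedged sketch of a custom input stream that lowercases each line, assuming the chain convention input_stream(stream, size, url, params):

def lowercase_input_stream(stream, size, url, params):
    class LowercaseStream(object):
        def __iter__(self):
            # Iterate over entries from the previous stream in the chain.
            for line in stream:
                yield line.lower()
    return LowercaseStream(), size, url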
Returns a new partitioning function that partitions keys in the range [min_val:max_val] into equal-sized partitions. The number of partitions is defined by the partitions parameter.
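A sketch of typical usage (the range bounds are just examples):

from disco.worker.classic.func import make_range_partition

# Split integer keys in [0:100] into equal-sized ranges. The returned
# function can be passed as the partition parameter of a job.
partition = make_range_partition(0, 100)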
Returns an iterable of (key, value) pairs given an entry.
For instance:
def fun_map(e, params):
    return [(w, 1) for w in e.split()]
This example takes a line of text as input in e, tokenizes it, and returns a list of (word, 1) pairs as the output.
The map task can also be an external program. For more information, see disco.worker.classic.external - Classic Disco External Interface.
An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, file is used. The resulting input stream is then used.
An output_stream() which returns a handle to a task output. The handle ensures that if a task fails, partially written data is ignored.
No-op map.
This function can be used to yield the results from the input stream.
No-op reduce.
This function can be used to combine results per partition from many map functions to a single result file per partition.
Parameters: urls (url or list of urls) – a list of urls specifies replica locators.
Reader for Disco’s default/internal key-value format.
Reads output of a map / reduce job as the input for a new job. Specify this function as your map_reader() to use the output of a previous job as input to another job.
Returns a pair (OutputStream, url) that is passed to the next output_stream function in the chain. The OutputStream.add() method of the last OutputStream object returned by the chain is used to output entries from map or reduce.
Using an output_stream() allows you to customize where and how output is stored. The default should almost always be used.
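As an illustration, a sketch of a custom output stream that writes tab-separated lines, assuming the chain convention output_stream(stream, partition, url, params):

def plain_output_stream(stream, partition, url, params):
    class PlainStream(object):
        def add(self, key, value):
            # Encode each pair as a tab-separated line.
            stream.write('%s\t%s\n' % (key, value))
        def close(self):
            stream.close()
    return PlainStream(), url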
Returns an integer in range(0, nr_partitions).
A map reader that uses an arbitrary regular expression to parse the input stream.
Parameters: item_re_str – regular expression for matching input items
The reader works as follows:
- X bytes are read from fd and appended to an internal buffer buf.
- m = regexp.match(buf) is executed.
- If buf produces a match, m.groups() is yielded, containing an input entry for the map function. Step 2 is repeated for the remaining part of buf. If no match is made, go to step 1.
- If fd is exhausted before size bytes have been read, and size tests True, a disco.error.DataError is raised.
- When fd is exhausted but buf contains unmatched bytes, two modes are available: if output_tail=True, the remaining buf is yielded as is; otherwise, a warning about the trailing bytes is issued and the remaining buf is discarded.
Note that re_reader() fails if the input stream contains unmatched bytes between matched entries. Make sure that your item_re_str is constructed so that it covers all bytes in the input stream.
re_reader() provides an easy way to construct parsers for textual input streams. For instance, the following reader produces full HTML documents as input entries:
def html_reader(fd, size, fname):
    for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
        yield x[0]
Takes three parameters, and adds reduced output to an output object.
For instance:
def fun_reduce(iter, out, params):
    d = {}
    for k, v in iter:
        d[k] = d.get(k, 0) + 1
    for k, c in d.iteritems():
        out.add(k, c)
This example counts how many times each key appears.
The reduce task can also be an external program. For more information, see disco.worker.classic.external - Classic Disco External Interface.
Alternative reduce signature which takes 2 parameters.
Reduce functions with this signature should return an iterator of key, value pairs, which will be implicitly added to the OutputStream.
For instance:
def fun_reduce(iter, params):
    from disco.util import kvgroup
    for k, vs in kvgroup(sorted(iter)):
        yield k, sum(1 for v in vs)
This example counts the number of values for each key.
An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, file is used. The resulting input stream is then used.
An output_stream() which returns a handle to a task output. The handle ensures that if a task fails, partially written data is ignored.
Sums the values for each key.
This is a convenience function for performing a basic sum in the combiner.
Sums the values for each key.
This is a convenience function for performing a basic sum in the reduce.
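For instance, a sketch of a word-count job wired with both convenience functions, following the Job subclass pattern from the grep example above (assuming sum_combiner and sum_reduce plug in directly):

from disco.job import Job
from disco.worker.classic.func import sum_combiner, sum_reduce

class WordCount(Job):
    combiner = staticmethod(sum_combiner)
    reduce = staticmethod(sum_reduce)

    @staticmethod
    def map(line, params):
        return [(word, 1) for word in line.split()]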
An input_stream() which looks at the scheme of url and tries to import a function named input_stream from the module disco.schemes.scheme_SCHEME, where SCHEME is the parsed scheme. If no scheme is found in the url, file is used. The resulting input stream is then used.
An output_stream() which returns a handle to a task output. The handle ensures that if a task fails, partially written data is ignored.
New in version 0.2.3.
This module provides helper functions to be used with the required_modules parameter in Worker. These functions are needed when your job functions depend on external Python modules and the default value for required_modules does not suffice.
By default, Disco tries to find out automatically which modules the job functions require. If the modules found are not part of the Python standard library or another package installed system-wide, Disco sends them to the nodes so the Disco worker process can use them.
Sometimes Disco may fail to detect all required modules. In this case, you can override the default value either by providing a list of requirements manually, or by generating the list semi-automatically using the functions in this module.
The required_modules parameter accepts a list of module definitions. A module definition may be either a module name, e.g. "PIL.Image", or a tuple that specifies both the module name and its path, e.g. ("mymodule", "lib/mymodule.py"). In the former case, the disco.worker.classic.worker.Worker only imports the module, assuming that it has been previously installed to the node. In the latter case, Disco sends the module file to nodes before importing it, and no pre-installation is required.
For instance, the following is a valid list for required_modules:
required_modules = ["math", "random", ("mymodule", "lib/mymodule.py")]
This expression imports the standard modules math and random and sends a custom module lib/mymodule.py to nodes before importing it.
Note that Disco sends only the files that can be found in your PYTHONPATH. It is assumed that files outside PYTHONPATH belong either to the Python standard library or to another package that is installed system-wide. Make sure that all modules that require automatic distribution can be found in your PYTHONPATH.
Automatic distribution works only for individual modules, not for packages or modules that require a specific directory hierarchy. Packages and modules with special requirements must be installed manually on your nodes.
The following list describes some typical use cases for required_modules. The list helps you decide when to use the find_modules() and locate_modules() functions.
(default) If you want to find and send all required modules used by your job functions recursively (i.e. modules that depend on other modules are also included), you don’t need to specify required_modules at all. This is equivalent to:

required_modules = modutil.find_modules(job_functions)

where job_functions is a list of all job functions: map, map_init, combiner etc.
If you want to find and import all required modules, but not send them, or you want to disable recursive analysis, use find_modules() explicitly with the send_modules and recursive parameters.
If you want to send a known set of modules (possibly recursively) but you don’t know their paths, use locate_modules().
If you want to send a known set of modules, provide a list of (module name, module path) tuples.
If you just want to import specific modules, or sub-modules in a pre-installed package (e.g. PIL.Image), provide a list of module names.
Any combinations of the above are allowed. For instance:
required_modules = find_modules([fun_map]) + [("mymodule", "/tmp/mymodule.py"), "time", "random"]
is a valid expression.
Error raised when a module can’t be found by disco.worker.classic.modutil.
Tries to guess and locate modules that are used by functions. Returns a list of required modules as specified in How to specify required modules.
Finds module files corresponding to the module names specified in the list modules.
A module is local if it can be found in your PYTHONPATH. For modules that can be found under system-wide default paths (e.g. /usr/lib/python), just the module name is returned without the corresponding path, so system-wide modules are not distributed to nodes unnecessarily.
This function is used by find_modules() to locate modules used by the specified functions.
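A sketch of direct usage (mymodule is hypothetical):

from disco.worker.classic.modutil import locate_modules

# Local modules are returned as (name, path) tuples so they can be
# sent to nodes; system-wide modules come back as plain names.
required_modules = locate_modules(["mymodule", "math"])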
Tries to guess which modules are used by function. Returns a list of module names.
This function is used by find_modules() to parse modules used by a function. You can use it to check that all modules used by your functions are detected correctly.
The current heuristic requires that modules are accessed using the dot notation directly, e.g. random.uniform(1, 10). For instance, required modules are not detected correctly in the following snippet:
a = random
a.uniform(1, 10)
Also, modules used in generator expressions, like here:
return ((k, base64.encodestring(v)) for k, v in d.iteritems())
are not detected correctly.
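By contrast, the detectable form accesses the module through its own name with dot notation:

import random
random.uniform(1, 10)   # detected: direct dot-notation access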
Note
Since Disco 0.4, you can write workers in any language, without any dependency on Python, using The Disco Worker Protocol. Use the Disco External Interface if you primarily want to use Python, with only parts of your job written in another language.
An external interface for specifying map and reduce functions as external programs, instead of Python functions. This feature is useful if you already have an existing program or library that could be useful for a Disco job, or if your map / reduce task is severely CPU or memory-bound and implementing it, say, in C, would remedy the problem.
Note that this external interface is not suitable for speeding up jobs that are mostly IO bound, or slowed down due to overhead caused by Disco. Actually, since the external interface uses the standard input and output for communicating with the process, the overhead caused by Disco is likely to increase when using the external interface. However, if the task is CPU or memory-bound, the additional communication overhead is probably minimal compared to gained benefits.
In many cases there is an easier alternative to the external interface: You can write the CPU-intensive functions in C and compile them to a shared library which can be included in the required_files list of disco.core.Disco.new_job(). Here is an example:
def fast_map(e, params):
    return [("", params.mylib.fast_function(e))]

def map_init(iter, params):
    ctypes.cdll.LoadLibrary("mylib.so")
    params.mylib = ctypes.CDLL("mylib.so")

Disco("disco://discomaster").new_job(
    name = "mylib_job",
    input = ["http://someinput"],
    map = fast_map,
    map_init = map_init,
    required_files = ["mylib.so"],
    required_modules = ["ctypes"])
If this approach works for you, there is no need to read this document further. For more information, see documentation of the ctypes module.
The external program reads key-value pairs from the standard input and outputs key-value pairs to the standard output. In addition, the program may read parameters from the standard input when the task starts, and it may output log messages to the standard error stream. This interface should be easy to implement in any programming language, although C is used in examples below.
The key-value pairs are both read and written in the following format:
<key-size><key><value-size><value>
Here key-size and value-size are 32-bit integers, encoded in little-endian, which specify the sizes of the key and the value in bytes. key and value correspond to the key and the value strings.
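For reference, here is a hedged Python sketch of the same framing, handy for piping test data to an external worker:

import struct

def write_kv(out, key, value):
    # <key-size><key><value-size><value>, sizes encoded as
    # little-endian 32-bit unsigned integers.
    out.write(struct.pack('<I', len(key)))
    out.write(key)
    out.write(struct.pack('<I', len(value)))
    out.write(value)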
For instance, the following C function reads a key-value pair from the standard input:
int read_kv(char **key, char **val)
{
    unsigned int len;
    *key = *val = NULL;
    /* read key */
    if (!fread(&len, 4, 1, stdin))
        return 0;
    if (len){
        *key = malloc(len);
        fread(*key, len, 1, stdin);
    }
    /* read value */
    fread(&len, 4, 1, stdin);
    if (len){
        *val = malloc(len);
        fread(*val, len, 1, stdin);
    }
    return 1;
}
Outputting a key-value pair works correspondingly using fwrite(). Using the function defined above, one can iterate through all input pairs as follows:
char *key, *val;
while (read_kv(&key, &val)){
    /* do something with key and value */
    free(key);
    free(val);
}
The external program must read key-value pairs from the standard input as long as there is data available. The program must not exit before all the input is consumed.
Note that extra care must be taken with buffering of the standard output, so that the output pairs are actually sent to the receiving program, and not kept in an internal buffer. Call fflush(stdout) if unsure.
The external program is started with one command line argument: "map" or "reduce". This makes it possible to use a single binary to handle both map and reduce, using the command line argument to choose the function it should execute.
Map and reduce tasks follow slightly different interfaces, as specified below.
An external map task must read a key-value pair from stdin as specified above, and before reading the next pair, output a result list which may be empty. The output list is defined as follows:
<num-pairs>[<pair_0>...<pair_{num_pairs}>]
where num-pairs is a 32-bit integer, which may be zero. It is followed by exactly num-pairs consecutive key-value pairs as defined above.
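Continuing the Python framing sketch above, a result list could be written as follows:

def write_result_list(out, pairs):
    # <num-pairs> followed by that many key-value pairs.
    out.write(struct.pack('<I', len(pairs)))
    for key, value in pairs:
        write_kv(out, key, value)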
Inputs for the external map are read using the map_reader. The map reader may produce each input entry as a single string, which is used as the value in a key-value pair where the key is an empty string. Alternatively, the reader may return a pair of strings as a tuple, in which case both the key and the value are specified.
The map finishes when the result list for the final key-value pair is received.
In contrast to the external map, the external reduce is not required to match each input with a result list. Instead, the external reduce may output a result list, as specified above, any time it wants, also after all the inputs have been exhausted. As an extreme case, it may not produce any output at all.
The reduce finishes when the program exits.
When outputting messages to the standard error, the following format must be used:

void msg(const char *msg){
    fprintf(stderr, "**<MSG> %s\n", msg);
}

void die(const char *msg){
    fprintf(stderr, "**<ERR> %s\n", msg);
    exit(1);
}
Each line must have the first seven bytes as defined above, and the line must end with a newline character. The msg() function above is subject to the same limits as the standard disco_worker.msg() message function.
Any parameters for the external program must be specified in the ext_params parameter for disco.core.Job(). If ext_params is specified as a string, Disco provides it as is to the external program on the standard input, before any key-value pairs. It is the responsibility of the external program to read all bytes that belong to the parameter set before starting to receive key-value pairs.
As a special case, the standard C interface for Disco, as specified below, accepts a dictionary of string-string pairs as ext_params. The dictionary is then encoded by disco.core.Job() using the disco.worker.classic.netstring module. The netstring format is extremely simple, consisting of consecutive key-value pairs. An example of how to parse parameters in this case can be found in the read_parameters() function in ext/disco.c.
An external task consists of a single executable main program and an arbitrary number of supporting files. All the files are written to a single flat directory on the target node, so the program must be prepared to access any supporting files on its current working directory, including any libraries it needs.
Any special settings or environment variables that the program needs can usually be arranged by a separate shell script that prepares the environment before running the actual executable. In that case your main program is the shell script, and the actual executable is one of the supporting files.
An external program absolutely must not read any files besides the ones included in its supporting files. It must not write to any files on its host, to ensure integrity of the runtime environment.
An external map or reduce task is specified by giving a dictionary, instead of a function, as the fun_map or reduce parameter in disco.core.Job(). The dictionary contains at least a single key-value pair where key is the string “op” and the value the actual executable code. Here’s an example:
disco.job("disco://localhost:5000",
["disco://localhost/myjob/file1"],
fun_map = {"op": open("bin/external_map").read(),
"config.txt": open("bin/config.txt").read()})
The dictionary may contain other keys as well, which correspond to the file names (not paths) of the supporting files, such as “config.txt” above. The corresponding values must contain the contents of the supporting files as strings.
A convenience function disco.util.external() is provided for constructing the dictionary that specifies an external task. Here’s the same example as above but using disco.util.external():
disco.job("disco://localhost:5000",
["disco://localhost/myjob/file1"],
fun_map = disco.external(["bin/external_map", "bin/config.txt"]))
Note that the first file in the list must be the actual executable. The rest of the paths may point at the supporting files in an arbitrary order.
Disco comes with a tiny C file, ext/disco.c and a header, ext/disco.h which wrap the external interface behind a few simple functions. The library takes care of allocating memory for incoming key-value pairs, without doing malloc-free for each pair separately. It also takes care of reading a parameter dictionary to a Judy array which is like a dictionary object for C.
Here’s a simple external map program that echoes back each key-value pair, illustrating usage of the library.
#include <disco.h>

int main(int argc, char **argv)
{
    const Pvoid_t params = read_parameters();

    Word_t *ptr;
    JSLG(ptr, params, "some parameter");
    if (!ptr)
        die("parameter missing");

    p_entry *key = NULL;
    p_entry *val = NULL;
    int i = 0;
    while (read_kv(&key, &val)){
        if (!(i++ % 10000))
            msg("Got key <%s> val <%s>", key->data, val->data);
        write_num_prefix(1);
        write_kv(key, val);
    }
    msg("%d key-value pairs read ok", i);
    return 0;
}
The following functions are available in the library:
This function must be called before any call to the function read_kv(). It returns the parameter dictionary as a Judy array of type JudySL. See JudySL man page for more information.
Kills the job with the message msg.
Reads a key-value pair from the standard input. read_kv() can re-use key and value across many calls, so there is no need to free() them explicitly. If you need to save a key-value pair on some iteration, use copy_entry() to make a copy of the desired entry. Naturally, you are responsible for freeing any copy that isn’t needed anymore, unless you re-use it as a copy_entry() destination. To summarize, you need to call free() for entries that won’t be re-used in a copy_entry() or read_kv() call.
Returns key and value strings in p_entry structs.
Writes the num_pairs prefix for the result list as defined above. This call must be followed by num write_kv() calls.
Writes a key-value pair to the standard output. Must be preceded with a write_num_prefix() call.
In addition, the library contains the following utility functions:
Tries to allocate size bytes. Exits with die() if allocation fails.
Copies src to dst. Grows dst if needed, or allocates a new p_entry if dst is NULL.
Packages an external program, together with other files it depends on, to be used either as a map or reduce function.
Parameters: files – a list of paths to files, where the first file is the actual executable.
This example shows how to use an external program, cmap, that needs a configuration file cmap.conf, as the map function:
disco.new_job(input=["disco://localhost/myjob/file1"],
              fun_map=disco.util.external(["/home/john/bin/cmap",
                                           "/home/john/cmap.conf"]))
All files listed in files are copied to the same directory, so any directory hierarchy among the files is lost.