disco.worker.classic – Classic Disco Worker Interface

disco.worker.classic.worker – Classic Disco Runtime Environment

When a Job is constructed using the classic Worker defined in this module, Disco runs the disco.worker.classic.worker module for every job task. This module reconstructs the Worker on the node where it is run, in order to execute the job functions which were used to create it.

Classic Workers resolve all parameters using getitem().

Thus, users can subclass Job as a convenient way to specify fixed parameters. For example, here’s a simple distributed grep from the Disco examples/ directory:

"""
An example using Job classes to implement grep.

Could be run as follows (assuming this module can be found on sys.path):

disco run grep.Grep -P params pattern [tag_url_or_path] ...
"""
from disco.job import Job
from disco.worker.classic.func import nop_map

class Grep(Job):
    map = staticmethod(nop_map)
    params = r''

    @staticmethod
    def map_reader(fd, size, url, params):
        import re
        pattern = re.compile(params)
        for line in fd:
            if pattern.search(line):  # search() matches anywhere in the line, like grep
                yield url, line

class disco.worker.classic.worker.Worker(**kwargs)

A disco.worker.Worker, which additionally supports the following parameters, to maintain the Classic Disco Interface:

Parameters:
  • map (disco.worker.classic.func.map()) – a function that defines the map task.
  • map_init (disco.worker.classic.func.init()) –

    initialization function for the map task. This function is called once before the task starts.

    Deprecated since version 0.4: map_init has not been needed since disco.worker.task_io.InputStream objects were introduced. Use map_input_stream and/or map_reader instead.

  • map_input_stream (sequence of disco.worker.task_io.input_stream()) –

    The given functions are chained together and the final resulting disco.worker.task_io.InputStream object is used to iterate over input entries.

    New in version 0.2.4.

  • map_output_stream (sequence of disco.worker.task_io.output_stream()) –

    The given functions are chained together and the disco.worker.task_io.OutputStream.add() method of the last returned disco.worker.task_io.OutputStream object is used to serialize key, value pairs output by the map.

    New in version 0.2.4.

  • map_reader (None or disco.worker.task_io.input_stream()) –

    Convenience function to define the last disco.worker.task_io.input_stream() function in the map_input_stream chain.

    If you want to use outputs of an earlier job as inputs, use disco.worker.task_io.chain_reader() as the map_reader.

    Changed in version 0.3.1: The default is None.

  • combiner (disco.worker.classic.func.combiner()) – called after the partitioning function, for each partition.
  • reduce (disco.worker.classic.func.reduce()) –

    If no reduce function is specified, the job will quit after the map phase has finished.

    New in version 0.3.1: Reduce now supports an alternative signature, disco.worker.classic.func.reduce2(), which uses an iterator instead of out.add() to output results.

    Changed in version 0.2: It is possible to define only reduce without map. See also “Do I always have to provide a function for map and reduce?”.

  • reduce_init (disco.worker.classic.func.init()) –

    initialization function for the reduce task. This function is called once before the task starts.

    Deprecated since version 0.4: reduce_init has not been needed since disco.worker.task_io.InputStream objects were introduced. Use reduce_input_stream and/or reduce_reader instead.

  • reduce_input_stream (sequence of disco.worker.task_io.input_stream()) –

    The given functions are chained together and the last returned disco.worker.task_io.InputStream object is given to reduce as its first argument.

    New in version 0.2.4.

  • reduce_output_stream (sequence of disco.worker.task_io.output_stream()) –

    The given functions are chained together and the last returned disco.worker.task_io.OutputStream object is given to reduce as its second argument.

    New in version 0.2.4.

  • reduce_reader (disco.worker.task_io.input_stream()) –

    Convenience function to define the last disco.worker.task_io.input_stream() if map is specified. If map is not specified, you can read arbitrary inputs with this function, similar to map_reader.

    Default is disco.worker.task_io.chain_reader().

    New in version 0.2.

  • partition (disco.worker.classic.func.partition()) –

    decides how the map output is distributed to reduce.

    Default is disco.worker.classic.func.default_partition().

  • partitions (int or None) –

    number of partitions, if any.

    Default is 1.

  • sort (boolean) –

    flag specifying whether the intermediate results, that is, input to the reduce function, should be sorted. Sorting is most useful in ensuring that equal keys are consecutive in the input for the reduce function.

    Other than ensuring that equal keys are grouped together, sorting ensures that keys are returned in ascending order. No other assumptions should be made on the comparison function.

    The external program sort is used to sort the input on disk. In-memory sort can easily be performed by the tasks themselves.

    Default is False.

  • sort_buffer_size (string) –

    how much memory can be used by external sort.

    Passed as the ‘-S’ option to Unix sort (see man sort). Default is 10%, i.e. 10% of the total available memory.

  • params (object) –

    object that is passed to worker tasks to store state. The object is serialized using the pickle module, so it should be picklable.

    A convenience class, Params, provides an easy way to encapsulate a set of parameters. Params allows including functions in the parameters.

  • ext_params

    if either map or reduce function is an external program, typically specified using disco.util.external(), this object is used to deliver parameters to the program.

    See disco.worker.classic.external.

  • status_interval (int) –

    print “K items mapped / reduced” for every Nth item. Setting the value to 0 disables messages.

    Increase this value, or set it to zero, if you get a “Message rate limit exceeded” error due to system messages. This might happen if your tasks are really fast. Decrease the value if you want more messages or if you don’t have that many data items.

    Default is 100000.

jobdict(job, **jobargs)

Creates The Job Dict for the Worker.

Makes use of the following parameters, in addition to those defined by the Worker itself:

Parameters:
  • input (list of urls or list of list of urls) –

    used to set jobdict.input. Disco natively handles the following url schemes:

    • http://... - any HTTP address
    • file://... or no scheme - a local file.
      The file must exist on all nodes where the tasks are run. Due to these restrictions, this form has only limited use.
    • tag://... - a tag stored in Disco Distributed Filesystem
    • raw://... - pseudo-address: use the address itself as data.
    • dir://... - used by Disco internally.
    • disco://... - used by Disco internally.

    See also

    disco.schemes.

  • scheduler (dict) – directly sets jobdict.scheduler.

Uses getitem() to resolve the values of parameters.

Returns: the job dict.

disco.worker.classic.worker.get(*args, **kwargs)

See disco.task.Task.get().

disco.worker.classic.worker.put(*args, **kwargs)

See disco.task.Task.put().

disco.worker.classic.worker.this_host()

Returns hostname of the node that executes the current task.

disco.worker.classic.worker.this_inputs()

Returns the inputs for the worker.

disco.worker.classic.worker.this_master()

Returns hostname and port of the disco master.

disco.worker.classic.worker.this_name()

Returns the jobname for the current task.

disco.worker.classic.worker.this_partition()

For a map task, returns an integer between [0..nr_maps] that identifies the task. This value is mainly useful if you need to generate unique IDs in each map task. There are no guarantees about how ids are assigned for map tasks.

For a reduce task, returns an integer between [0..partitions] that identifies this partition. You can use a custom partitioning function to assign key-value pairs to a particular partition.
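For the unique-ID use case mentioned above, a minimal sketch is to combine the task's partition number with a per-task counter. unique_ids is a hypothetical helper, and it assumes task_id is the value this_partition() returns inside a map task, which no other map task shares.

```python
import itertools


def unique_ids(task_id):
    """Yield IDs unique across tasks, given a task_id that no other task shares.

    Hypothetical helper: task_id is assumed to come from this_partition().
    """
    for n in itertools.count():
        # "<task id>-<per-task counter>": the separator keeps e.g. "1-10" and "11-0" apart
        yield "%d-%d" % (task_id, n)
```

Since params is used to maintain state between calls to the map function, one option is to create the generator once per task, store it on params, and draw next() for every entry.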

disco.worker.classic.func — Functions for constructing Classic Disco jobs

A Classic Disco job is specified by one or more job functions. This module defines the interfaces for the job functions, some default values, as well as otherwise useful functions.

disco.worker.classic.func.combiner(key, value, buffer, done, params)

Returns an iterator of (key, value) pairs or None.

Parameters:
  • key – key object emitted by the map()
  • value – value object emitted by the map()
  • buffer – an accumulator object (a dictionary), that combiner can use to save its state. The function must control the buffer size, to prevent it from consuming too much memory, by calling buffer.clear() after each block of results. Note that each partition (as determined by the key and partition()) gets its own buffer object.
  • done – flag indicating if this is the last call with a given buffer
  • params – the object specified by the params parameter

This function receives all output from the map() before it is saved to intermediate results. Only the output produced by this function is saved to the results.

After map() has consumed all input entries, combiner is called for the last time with the done flag set to True. This is the last opportunity for the combiner to return something.
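The buffer handling described above can be sketched as a summing combiner that flushes whenever its buffer grows too large, plus a toy driver that calls it the way the text describes. MAX_BUFFERED_KEYS, summing_combiner and combine_all are hypothetical names, and the sketch assumes the final done=True call carries no new key-value pair (the driver passes None for both).

```python
MAX_BUFFERED_KEYS = 10000  # hypothetical cap on distinct keys kept in the buffer


def summing_combiner(key, value, buffer, done, params):
    """Sum values per key using the combiner(key, value, buffer, done, params) interface."""
    if not done:
        buffer[key] = buffer.get(key, 0) + value
        if len(buffer) < MAX_BUFFERED_KEYS:
            return None  # keep accumulating; nothing is emitted yet
    # Buffer is full, or this is the final call (assumed to carry no new pair):
    # emit the partial sums and clear the buffer to bound memory use.
    items = list(buffer.items())
    buffer.clear()
    return items


def combine_all(pairs, combiner=summing_combiner, params=None):
    """Toy driver: feed pairs through a combiner, then make the final done=True call."""
    buffer, emitted = {}, []
    for key, value in pairs:
        emitted.extend(combiner(key, value, buffer, False, params) or [])
    emitted.extend(combiner(None, None, buffer, True, params) or [])
    return emitted
```

Because a full buffer is flushed early, the same key can be emitted more than once with partial sums, so the reduce still has to add the partial sums together.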

disco.worker.classic.func.init(input_iter, params)

Perform some task initialization.

Parameters: input_iter – an iterator returned by a reader()

Typically this function is used to initialize some modules in the worker environment (e.g. ctypes.cdll.LoadLibrary()), to initialize some values in params, or to skip unneeded entries in the beginning of the input stream.

disco.worker.classic.func.make_range_partition(min_val, max_val)

Returns a new partitioning function that partitions keys in the range [min_val:max_val] into equal-sized partitions.

The number of partitions is defined by the partitions parameter.
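The exact formula used by make_range_partition() is not given here. As a minimal sketch of the idea, assuming integer keys and an inclusive range, a partition function with the documented partition(key, nr_partitions, params) interface can scale each key's offset within the range onto 0 .. nr_partitions - 1. make_range_partition_sketch is a hypothetical name.

```python
def make_range_partition_sketch(min_val, max_val):
    """Return partition(key, nr_partitions, params) for integer keys in [min_val, max_val]."""
    width = max_val - min_val + 1  # number of distinct integer keys in the range

    def partition(key, nr_partitions, params):
        offset = int(key) - min_val
        index = offset * nr_partitions // width
        # Clamp so out-of-range keys still land in a valid partition.
        return min(max(index, 0), nr_partitions - 1)

    return partition
```

Clamping is a design choice of this sketch: keys below min_val go to the first partition and keys above max_val to the last, instead of producing an invalid partition number.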

disco.worker.classic.func.map(entry, params)

Returns an iterable of (key, value) pairs given an entry.

Parameters:
  • entry – entry coming from the input stream
  • params – used to maintain state between calls to the map function.

For instance:

def fun_map(e, params):
    return [(w, 1) for w in e.split()]

This example takes a line of text as input in e, tokenizes it, and returns a list of (word, 1) pairs as the output.

The map task can also be an external program. For more information, see disco.worker.classic.external - Classic Disco External Interface.

disco.worker.classic.func.nop_map(entry, params)

No-op map.

This function can be used to yield the results from the input stream.

disco.worker.classic.func.nop_reduce(iter, out, params)

No-op reduce.

This function can be used to combine results per partition from many map functions to a single result file per partition.

disco.worker.classic.func.notifier(urls)
Parameters: urls (url or list of urls) – a list of urls gives replica locators.

disco.worker.classic.func.partition(key, nr_partitions, params)

Returns an integer in range(0, nr_partitions).

Parameters:
  • key – is a key object emitted by a task function
  • nr_partitions – the number of partitions
  • params – the object specified by the params parameter
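A custom partition function only has to return an integer in range(0, nr_partitions) and must send equal keys to the same partition on every node. The following is a hypothetical example (crc32_partition is not part of Disco) that hashes the key's string form with zlib.crc32.

```python
import zlib


def crc32_partition(key, nr_partitions, params):
    """Hypothetical partition function returning an integer in range(0, nr_partitions)."""
    # crc32 of the key's string form is the same in every process, so a given
    # key is always routed to the same reduce partition.
    digest = zlib.crc32(str(key).encode("utf-8")) & 0xFFFFFFFF
    return digest % nr_partitions
```

crc32 is used instead of the built-in hash() because Python 3 randomizes string hashes per process, which would route the same key to different partitions in different tasks.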

disco.worker.classic.func.reduce(input_stream, output_stream, params)

Takes three parameters, and adds reduced output to an output object.

Parameters:
  • input_stream – InputStream object that is used to iterate through input entries.
  • output_stream – OutputStream object that is used to output results.
  • params – the object specified by the params parameter

For instance:

def fun_reduce(iter, out, params):
    d = {}
    for k, v in iter:
        d[k] = d.get(k, 0) + 1
    for k, c in d.items():
        out.add(k, c)

This example counts how many times each key appears.

The reduce task can also be an external program. For more information, see disco.worker.classic.external - Classic Disco External Interface.
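To see how the map, partition and reduce signatures fit together, here is a toy single-process driver that runs the fun_map and fun_reduce examples above. ListOutput and run_locally are hypothetical names; ListOutput mimics only the add(key, value) method of an OutputStream. Disco's real runtime distributes this work across nodes and serializes the intermediate data.

```python
from collections import defaultdict


class ListOutput(object):
    """Stand-in for an OutputStream: only add(key, value) is mimicked."""

    def __init__(self):
        self.pairs = []

    def add(self, key, value):
        self.pairs.append((key, value))


def fun_map(e, params):
    return [(w, 1) for w in e.split()]


def fun_reduce(iter, out, params):
    d = {}
    for k, v in iter:
        d[k] = d.get(k, 0) + 1
    for k, c in d.items():
        out.add(k, c)


def run_locally(lines, map_fn, reduce_fn, partition_fn, nr_partitions, params=None):
    """Toy driver: map every line, route pairs to partitions, reduce each partition."""
    partitions = defaultdict(list)
    for line in lines:
        for key, value in map_fn(line, params):
            partitions[partition_fn(key, nr_partitions, params)].append((key, value))
    out = ListOutput()
    for p in range(nr_partitions):
        reduce_fn(iter(partitions[p]), out, params)  # builtin iter(): one stream per partition
    return out.pairs
```

For example, run_locally(["a b a", "c a"], fun_map, fun_reduce, lambda k, n, p: len(k) % n, 2) returns the pairs ("a", 3), ("b", 1) and ("c", 1).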

disco.worker.classic.func.reduce2(input_stream, params)

Alternative reduce signature which takes 2 parameters.

Reduce functions with this signature should return an iterator of key, value pairs, which will be implicitly added to the OutputStream.

For instance:

def fun_reduce(iter, params):
    from disco.util import kvgroup
    for k, vs in kvgroup(sorted(iter)):
        yield k, sum(1 for v in vs)

This example counts the number of values for each key.
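The reduce2 example can be tried without a Disco installation by substituting a stand-in for disco.util.kvgroup. The stand-in below is an assumption built on itertools.groupby: it groups runs of consecutive pairs with equal keys and is not Disco's implementation. The sorted() call matters because grouping only merges keys that are adjacent in the input.

```python
from itertools import groupby
from operator import itemgetter


def kvgroup(kviter):
    """Stand-in for disco.util.kvgroup: yield (key, values) for runs of equal keys."""
    for key, group in groupby(kviter, key=itemgetter(0)):
        yield key, (v for _, v in group)


def fun_reduce(iter, params):
    for k, vs in kvgroup(sorted(iter)):
        yield k, sum(1 for v in vs)
```

The reduce returns a generator; in Disco, the yielded pairs are added to the OutputStream implicitly.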

disco.worker.classic.func.sum_combiner(key, value, buf, done, params)

Sums the values for each key.

This is a convenience function for performing a basic sum in the combiner.

disco.worker.classic.func.sum_reduce(iter, params)

Sums the values for each key.

This is a convenience function for performing a basic sum in the reduce.

disco.worker.classic.external - Classic Disco External Interface

Note

Since Disco 0.4, you can write workers in any language without any dependencies on Python, using The Disco Worker Protocol. Use the Disco External Interface if you primarily want to use Python, with only parts of your job written in another language.

An external interface for specifying map and reduce functions as external programs, instead of Python functions. This feature is useful if you already have an existing program or a library which could be useful for a Disco job, or if your map / reduce task is severely CPU- or memory-bound and implementing it, say, in C, would remedy the problem.

Note that this external interface is not suitable for speeding up jobs that are mostly IO bound, or slowed down due to overhead caused by Disco. Actually, since the external interface uses the standard input and output for communicating with the process, the overhead caused by Disco is likely to increase when using the external interface. However, if the task is CPU or memory-bound, the additional communication overhead is probably minimal compared to gained benefits.

Easy approach using the ctypes module

In many cases there is an easier alternative to the external interface: You can write the CPU-intensive functions in C and compile them to a shared library which can be included in the required_files list of disco.core.Disco.new_job(). Here is an example:

import ctypes
from disco.core import Disco

def fast_map(e, params):
    return [("", params.mylib.fast_function(e))]

def map_init(iter, params):
    # dlopen() only looks in the working directory when the name contains a
    # slash, hence "./mylib.so" for the library shipped via required_files.
    params.mylib = ctypes.CDLL("./mylib.so")

Disco("disco://discomaster").new_job(
    name="mylib_job",
    input=["http://someinput"],
    map=fast_map,
    map_init=map_init,
    required_files=["mylib.so"],
    required_modules=["ctypes"])

If this approach works for you, there is no need to read this document further. For more information, see documentation of the ctypes module.

External interface

The external program reads key-value pairs from the standard input and outputs key-value pairs to the standard output. In addition, the program may read parameters from the standard input when the task starts, and it may output log messages to the standard error stream. This interface should be easy to implement in any programming language, although C is used in examples below.

The key-value pairs are both read and written in the following format:

<key-size><key><value-size><value>

Here key-size and value-size are 32-bit integers, encoded in little-endian, which specify the sizes of the key and the value in bytes. key and value correspond to the key and the value strings.
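The pair format can be illustrated with Python's struct module, where "<I" denotes a little-endian unsigned 32-bit integer. The helpers encode_kv, decode_kv and decode_all are hypothetical and treat keys and values as byte strings; sizes are assumed to be unsigned, as in the C example.

```python
import io
import struct


def encode_kv(key, value):
    """Encode one pair as <key-size><key><value-size><value> (sizes: little-endian uint32)."""
    return (struct.pack("<I", len(key)) + key +
            struct.pack("<I", len(value)) + value)


def _read_exactly(stream, n):
    data = stream.read(n)
    if len(data) != n:
        raise EOFError("input ended in the middle of a key-value pair")
    return data


def decode_kv(stream):
    """Read one pair from a binary stream; return None at a clean end of input."""
    header = stream.read(4)
    if not header:
        return None  # no more pairs
    if len(header) != 4:
        raise EOFError("truncated size field")
    (key_size,) = struct.unpack("<I", header)
    key = _read_exactly(stream, key_size)
    (value_size,) = struct.unpack("<I", _read_exactly(stream, 4))
    return key, _read_exactly(stream, value_size)


def decode_all(data):
    """Decode every pair in a byte string."""
    stream, pairs = io.BytesIO(data), []
    while True:
        pair = decode_kv(stream)
        if pair is None:
            return pairs
        pairs.append(pair)
```

For instance, encode_kv(b"k", b"value") produces the 4-byte size 1, the byte k, the 4-byte size 5, and then value.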

For instance, the following C function reads a key-value pair from the standard input:

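#include <stdio.h>
#include <stdlib.h>

/* Returns 1 if a pair was read, 0 at the end of input. Key and value are
   zero-terminated copies that the caller must free(). The sizes are read in
   host byte order, so this assumes a little-endian machine. */
int read_kv(char **key, char **val)
{
        unsigned int len;
        *key = *val = NULL;
        /* read key */
        if (!fread(&len, 4, 1, stdin))
                return 0;
        *key = malloc(len + 1);         /* +1 for the terminating zero byte */
        fread(*key, 1, len, stdin);
        (*key)[len] = '\0';
        /* read value */
        fread(&len, 4, 1, stdin);
        *val = malloc(len + 1);
        fread(*val, 1, len, stdin);
        (*val)[len] = '\0';
        return 1;
}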

Outputting a key-value pair works correspondingly using fwrite(). Using the function defined above, one can iterate through all input pairs as follows:

char *key, *val;
while (read_kv(&key, &val)){
        /* do something with key and value */
        free(key);
        free(val);
}

The external program must read key-value pairs from the standard input as long as there is data available. The program must not exit before all the input is consumed.

Note that extra care must be taken with buffering of the standard output, so that the output pairs are actually sent to the receiving program, and not kept in an internal buffer. Call fflush(stdout) if unsure.

External program is started with one command line argument: “map” or “reduce”. This makes it possible to use a single binary to handle both map and reduce by using the command line argument to choose the function it should execute.

Map and reduce tasks follow slightly different interfaces, as specified below.

External map

An external map task must read a key-value pair from stdin as specified above, and before reading the next pair, output a result list which may be empty. The output list is defined as follows:

<num-pairs>[<pair_0>...<pair_{num-pairs - 1}>]

where num-pairs is a 32-bit integer, which may be zero. It is followed by exactly num-pairs consecutive key-value pairs as defined above.

Inputs for the external map are read using the map_reader. The map reader may produce each input entry as a single string that is used as the value in a key-value pair where the key is an empty string. Alternatively, the reader may return a pair of strings as a tuple, in which case both the key and the value are specified.

The map finishes when the result list for the final key-value pair is received.
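The external map loop can be sketched in Python (used here only to keep the examples in one language; the protocol itself is language-neutral). Each input pair is answered with one result list before the next pair is read, and the output is flushed so the list is not held in a buffer. external_map and run_map_on_bytes are hypothetical names, and the sketch assumes num-pairs uses the same little-endian 32-bit encoding as the size fields.

```python
import io
import struct
import sys


def read_pair(stream):
    """Read one <key-size><key><value-size><value> pair; None once the input is exhausted."""
    header = stream.read(4)
    if len(header) < 4:
        return None
    (key_size,) = struct.unpack("<I", header)
    key = stream.read(key_size)
    (value_size,) = struct.unpack("<I", stream.read(4))
    return key, stream.read(value_size)


def external_map(instream, outstream, fun):
    """Answer each input pair with one result list before reading the next pair."""
    while True:
        pair = read_pair(instream)
        if pair is None:
            return  # all input consumed; the map is finished
        results = fun(*pair)
        # Assumption: num-pairs is encoded like the sizes (little-endian uint32).
        outstream.write(struct.pack("<I", len(results)))
        for key, value in results:
            outstream.write(struct.pack("<I", len(key)) + key)
            outstream.write(struct.pack("<I", len(value)) + value)
        outstream.flush()  # do not leave the result list sitting in a buffer


def run_map_on_bytes(data, fun):
    """Run external_map over an in-memory input, for testing without Disco."""
    out = io.BytesIO()
    external_map(io.BytesIO(data), out, fun)
    return out.getvalue()


if __name__ == "__main__" and sys.argv[1:2] == ["map"]:
    # The single command-line argument selects the task; this sketch only handles "map".
    external_map(sys.stdin.buffer, sys.stdout.buffer, lambda k, v: [(k, v)])  # echo
```

Returning an empty list from fun is valid and produces a result list with num-pairs set to zero.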

External reduce

In contrast to the external map, the external reduce is not required to match each input with a result list. Instead, the external reduce may output a result list, as specified above, any time it wants, even after all the inputs have been exhausted. As an extreme case, it may not produce any output at all.

The reduce finishes when the program exits.
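As a contrast with the map loop, here is a hypothetical external reduce (count_reduce, sketched in Python) that consumes all of its input first and only then writes a single result list, or nothing at all for empty input. It assumes num-pairs is a little-endian 32-bit integer and sends each count as decimal text; both are choices of this sketch.

```python
import collections
import io
import struct


def read_pairs(stream):
    """Yield <key-size><key><value-size><value> pairs until the input ends."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (key_size,) = struct.unpack("<I", header)
        key = stream.read(key_size)
        (value_size,) = struct.unpack("<I", stream.read(4))
        yield key, stream.read(value_size)


def count_reduce(instream, outstream):
    """Hypothetical external reduce: count pairs per key, answer only after the input ends."""
    counts = collections.Counter(key for key, _ in read_pairs(instream))
    if not counts:
        return  # producing no output at all is allowed
    # Assumption: num-pairs is a little-endian uint32; counts are sent as decimal text.
    outstream.write(struct.pack("<I", len(counts)))
    for key, n in sorted(counts.items()):
        value = str(n).encode("ascii")
        outstream.write(struct.pack("<I", len(key)) + key)
        outstream.write(struct.pack("<I", len(value)) + value)
    outstream.flush()


def reduce_bytes(data):
    """Run count_reduce over an in-memory input, for testing without Disco."""
    out = io.BytesIO()
    count_reduce(io.BytesIO(data), out)
    return out.getvalue()
```

In a real external program, returning from count_reduce and exiting the process is what signals that the reduce has finished.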

Logging

When outputting messages to the standard error, the following format must be used:

void msg(const char *msg){
     fprintf(stderr, "**<MSG> %s\n", msg);
}

void die(const char *msg){
     fprintf(stderr, "**<ERR> %s\n", msg);
     exit(1);
}

Each line must have the first seven bytes as defined above, and the line must end with a newline character.
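The same logging convention can be written in Python as a sketch; format_log_line, msg and die are hypothetical helpers mirroring the C functions above. One design choice worth noting: a newline embedded in the message would start a second line without the seven-byte prefix, so the sketch folds newlines into spaces.

```python
import sys


def format_log_line(tag, text):
    """Build one stderr line: "**<MSG> ..." or "**<ERR> ...", ending in a newline."""
    # An embedded newline would start a line without the prefix, so fold it.
    return "**<%s> %s\n" % (tag, text.replace("\n", " "))


def msg(text):
    sys.stderr.write(format_log_line("MSG", text))
    sys.stderr.flush()


def die(text):
    sys.stderr.write(format_log_line("ERR", text))
    sys.stderr.flush()
    sys.exit(1)
```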

Parameters

Any parameters for the external program must be specified in the ext_params parameter for disco.core.Job(). If ext_params is specified as a string, Disco will provide it as is to the external program in the standard input, before any key-value pairs. It is the responsibility of the external program to read all bytes that belong to the parameter set before starting to receive key-value pairs.

As a special case, the standard C interface for Disco, as specified below, accepts a dictionary of string-string pairs as ext_params. The dictionary is then encoded by disco.core.Job() using the disco.worker.classic.netstring module. The netstring format is extremely simple, consisting of consecutive key-value pairs. An example of how to parse parameters in this case can be found in the read_parameters() function in ext/disco.c.

Usage

An external task consists of a single executable main program and an arbitrary number of supporting files. All the files are written to a single flat directory on the target node, so the program must be prepared to access any supporting files in its current working directory, including any libraries it needs.

Any special settings or environment variables that the program needs can usually be arranged by a separate shell script that prepares the environment before running the actual executable. In that case your main program will be the shell script, and the actual executable one of the supporting files.

An external program absolutely must not read any files besides the ones included in its supporting files. It must not write to any files on its host, to ensure integrity of the runtime environment.

An external map or reduce task is specified by giving a dictionary, instead of a function, as the fun_map or reduce parameter in disco.core.Job(). The dictionary contains at least a single key-value pair where key is the string “op” and the value the actual executable code. Here’s an example:

disco.job("disco://localhost:5000",
          ["disco://localhost/myjob/file1"],
          fun_map = {"op": open("bin/external_map").read(),
                     "config.txt": open("bin/config.txt").read()})

The dictionary may contain other keys as well, which correspond to the file names (not paths) of the supporting files, such as “config.txt” above. The corresponding values must contain the contents of the supporting files as strings.

A convenience function disco.util.external() is provided for constructing the dictionary that specifies an external task. Here’s the same example as above but using disco.util.external():

disco.job("disco://localhost:5000",
          ["disco://localhost/myjob/file1"],
          fun_map = disco.util.external(["bin/external_map", "bin/config.txt"]))

Note that the first file in the list must be the actual executable. The rest of the paths may point at the supporting files in an arbitrary order.
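The dictionary described above can be assembled with a short helper. build_external_spec is a hypothetical sketch of the documented layout, not the source of disco.util.external(): the first path becomes the "op" entry, and every other file is stored under its base name, since all files end up in one flat directory. The read parameter is an addition of this sketch so the function can be exercised without touching the filesystem, and contents are returned as byte strings.

```python
import os


def read_file(path):
    with open(path, "rb") as f:
        return f.read()


def build_external_spec(paths, read=read_file):
    """Build {"op": <executable>, "<file name>": <contents>, ...} from a list of paths."""
    if not paths:
        raise ValueError("the first path must point at the executable")
    spec = {"op": read(paths[0])}
    for path in paths[1:]:
        name = os.path.basename(path)  # directory structure is lost on the target node
        if name in spec:
            raise ValueError("duplicate file name: %s" % name)
        spec[name] = read(path)
    return spec
```

The duplicate-name check guards against two supporting files (or a file named "op") overwriting each other once the hierarchy is flattened.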

Disco C library

Disco comes with a tiny C file, ext/disco.c and a header, ext/disco.h which wrap the external interface behind a few simple functions. The library takes care of allocating memory for incoming key-value pairs, without doing malloc-free for each pair separately. It also takes care of reading a parameter dictionary to a Judy array which is like a dictionary object for C.

Here’s a simple external map program that echoes back each key-value pair, illustrating usage of the library.

#include <disco.h>

int main(int argc, char **argv)
{
        const Pvoid_t params = read_parameters();
        Word_t *ptr;
        JSLG(ptr, params, "some parameter");
        if (!ptr)
                die("parameter missing");

        p_entry *key = NULL;
        p_entry *val = NULL;

        int i = 0;
        while (read_kv(&key, &val)){
                if (!(i++ % 10000))
                        msg("Got key <%s> val <%s>", key->data, val->data);
                write_num_prefix(1);
                write_kv(key, val);
        }
        msg("%d key-value pairs read ok", i);
        return 0;
}

The following functions are available in the library:

Pvoid_t read_parameters()

This function must be called before any call to the function read_kv(). It returns the parameter dictionary as a Judy array of type JudySL. See JudySL man page for more information.

void die(const char *msg)

Kills the job with the message msg.

int read_kv(p_entry **key, p_entry **val)

Reads a key-value pair from the standard input. read_kv() can re-use key and value across many calls, so there is no need to free() them explicitly. If you need to save a key-value pair on some iteration, use copy_entry() to make a copy of the desired entry. Naturally you are responsible for freeing any copy that isn’t needed anymore, unless you re-use it as a copy_entry() destination. To summarize, you need to call free() for entries that won’t be re-used in a copy_entry() or read_kv() call.

Returns key and value strings in p_entry structs.

p_entry

Container type for a string.

p_entry.len

Length of the string

p_entry.sze

Size of the allocated buffer. Always holds len <= sze.

p_entry.data

Actual string of the size len, ending with an additional zero byte.

void write_num_prefix(int num)

Writes the num_pairs prefix for the result list as defined above. This call must be followed by num write_kv() calls.

void write_kv(const p_entry *key, const p_entry *val)

Writes a key-value pair to the standard output. Must be preceded by a write_num_prefix() call.

In addition, the library contains the following utility functions:

void *dxmalloc(unsigned int size)

Tries to allocate size bytes. Exits with die() if allocation fails.

void copy_entry(p_entry **dst, const p_entry *src)

Copies src to *dst. Grows *dst if needed, or allocates a new p_entry if *dst is NULL.

disco.worker.classic.external.package(files)

Packages an external program, together with other files it depends on, to be used either as a map or reduce function.

Parameters:files – a list of paths to files so that the first file points at the actual executable.

This example shows how to use an external program, cmap, which needs a configuration file cmap.conf, as the map function:

disco.new_job(input=["disco://localhost/myjob/file1"],
              fun_map=disco.util.external(["/home/john/bin/cmap",
                                           "/home/john/cmap.conf"]))

All files listed in files are copied to the same directory so any file hierarchy is lost between the files.