Disco FAQ

How come ssh localhost erl doesn’t use my normal $PATH?

ssh localhost erl

is different from:

ssh localhost
erl

In general, interactive shells behave differently than non-interactive ones. For example, see the Bash Reference Manual.

How do I profile programs in Disco?

Disco can use the Profile module to profile map and reduce tasks written in Python. Enable profiling by setting profile = True in your disco.job.Job.

Here’s a simple example:

"""
Try running this from the ``examples/faq/`` directory using:

disco run profile.ProfileJob http://example.com/data | xargs disco wait && xargs disco pstats -k cumulative
"""
from disco.job import Job

class ProfileJob(Job):
    profile = True

    @staticmethod
    def map(entry, params):
        yield entry.strip(), 1

See also

disco.core.Disco.profile_stats() for accessing profiling results from Python.

How do I debug programs in Disco?

Set up a single node Disco cluster locally on your laptop or desktop. It makes debugging a Disco job almost as easy as debugging any Python script.

Do I always have to provide a function for map and reduce?

No, you may specify either map or reduce or both. Many simple tasks can be solved with a single map function, without reduce.

It is somewhat less typical to specify only the reduce function. This case arises when you want to merge results from independent map jobs, or you want to join several input files without going through the map phase.

See also: Data Flow in MapReduce Disco Jobs

How many maps can I have? Does a higher number of maps lead to better performance?

In theory there is no restriction. In practice, the number is of course limited by the available disk space (for input files) and the amount of RAM that is required by the Disco master. Disco includes a test case, in tests/test_50k.py that starts 50,000 map tasks in parallel. You should be able to add a few zeroes there without any trouble. If you perform any stress tests of your own, let us know about your findings!

Each map and reduce instance is allocated exclusive access to a CPU. This means that the number of parallel processes is limited by the number of available CPUs. If you have 50,000 map instances but only 50 CPUs, only 50 maps are run in parallel while 49,550 instances are either waiting in the job queue or marked as ready — assuming that no other jobs are running in the system at the same time and your input is split to at least 50,000 separate files.

The number of maps can never exceed the number of input files as Disco can’t order many maps to process a single input file. In other words, to run K maps in parallel you need at least K input files. See Pushing Chunked Data to DDFS for more on splitting data stored in Disco Distributed Filesystem.

In general, the question about the expected speedup when increasing parallelism is a rather complicated one and it depends heavily on the task at hand. See Amdahl’s Law for more information about the subject. However, unless your tasks are so light that the execution time is dominated by the overhead caused by Disco, you can expect to gain some speedup by adding more maps until the number of maps equals to the number of available CPUs.

How do I pass the output of one map-reduce phase to another?

Many algorithms can be implemented cleanly as a sequence of mapreduce jobs. Chaining jobs together is also efficient, as the job’s results are readily distributed and stored in Disco’s internal format.

Here’s an example that runs ten jobs in a sequence, using outputs from the previous job as the input for the next one. The code can also be found in examples/faq/chain.py. The job increments each value in the input by one:

from disco.job import Job
from disco.worker.task_io import chain_reader

class FirstJob(Job):
    input = ['raw://0', 'raw://0']

    @staticmethod
    def map(line, params):
        yield int(line) + 1, ""

class ChainJob(Job):
    map_reader = staticmethod(chain_reader)

    @staticmethod
    def map(key_value, params):
        yield int(key_value[0]) + 1, key_value[1]

if __name__ == "__main__":
    # Jobs cannot belong to __main__ modules.  So, import this very
    # file to access the above classes.
    import chain
    last = chain.FirstJob().run()
    for i in range(9):
        last = chain.ChainJob().run(input=last.wait())
    print(last.name)

Assuming that the input files consists of zeroes, this example will produce a sequence of tens as the result.

How do I print messages to the Web interface from Python?

Use a normal Python print statement.

Note

This is meant for simple debugging, if you print messages too often, Disco will throttle your worker. The master limits the rate of messages coming from workers, to prevent it from being overwhelmed.

Internally, Disco wraps everything written to sys.stdout with appropriate markup for the Erlang worker process, which it communicates with via sys.stderr. See also The Disco Worker Protocol.

Why not Hadoop?

We see that platforms for distributed computing will be of such high importance in the future that it is crucial to have a wide variety of different approaches which produces healthy competition and co-evolution between the projects. In this respect, Hadoop and Disco can be seen as complementary projects, similar to Apache, Lighttpd and Nginx.

It is a matter of taste whether Erlang and Python are more suitable for the task than Java. We feel much more productive with Python than with Java. We also feel that Erlang is a perfect match for the Disco core that needs to handle tens of thousands of tasks in parallel.

Thanks to Erlang, the Disco core is remarkably compact. It is relatively easy to understand how the core works, and start experimenting with it or adapt it to new environments. Thanks to Python, it is easy to add new features around the core which ensures that Disco can respond quickly to real-world needs.

How do I use Disco on Amazon EC2?

In general, you can use the EC2 cluster as any other Disco cluster. However, if you want to access result files from your local machine, you need to set the DISCO_PROXY setting. This configures the master node as a proxy, since the computation nodes on EC2 are not directly accessible.

Hint

For instance, you could open an SSH tunnel to the master:

ssh MASTER -L 8989:localhost:8989

and set DISCO_PROXY=http://localhost:8989.