This tutorial shows how to create and run a Disco job that counts words.
To start with, you need nothing but a single text file.
Let’s call the file
If you don’t happen to have a suitable file on hand,
you can download one from here.
1. Prepare input data¶
Disco can distribute computation only as well as data can be distributed. In general, we can push data to Disco Distributed Filesystem, which will take care of distributing and replicating it.
Prior to Disco 0.3.2, this was done by splitting data manually,
and then using
ddfs push to push user-defined blobs.
As of Disco 0.3.2, you can use
to automatically chunk and push size-limited chunks to DDFS.
See Pushing Chunked Data to DDFS.
Lets chunk and push the data to a tag
ddfs chunk data:bigtxt ./bigfile.txt
We should have seen some output telling us that the chunk(s) have been created. We can also check where they are located:
ddfs blobs data:bigtxt
and make sure they contain what you think they do:
ddfs xcat data:bigtxt | less
Chunks are stored in Disco’s internal compressed format,
thus we use
ddfs xcat instead of
ddfs cat to view them.
ddfs xcat applies some
ddfs cat just dumps the raw bytes contained in the blobs.
If you used the file provided above,
you should have only ended up with a single chunk.
This is because the default chunk size is 64MB (compressed),
bigfile.txt is only 12MB (uncompressed).
You can try with a larger file to see that chunks are created as needed.
If you have unchunked data stored in DDFS that you would like to chunk, you can run a Disco job, to parallelize the chunking operation. Disco includes an example of how to do this, which should work unmodified for most use cases.
2. Write job functions¶
def fun_map(line, params): for word in line.split(): yield word, 1
Quite compact, eh?
The map function takes two parameters, here they are called line and params.
The first parameter contains an input entry, which is by default a line of text.
An input entry can be anything though,
since you can define a custom function that parses an input stream
(see the parameter map_reader in the
The second parameter, params, can be any object that you specify,
in case that you need some additional input for your functions.
For our example, we can happily process input line by line.
The map function needs to return an iterator over of key-value pairs.
Here we split a line into tokens using the builtin
Each token is output separately as a key, together with the value 1.
Now, let’s write the corresponding reduce function:
def fun_reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts)
The first parameter, iter, is an iterator over those keys and values produced by the map function, which belong to this reduce instance (see partitioning).
In this case, words are randomly assigned to different reduce instances.
Again, this is something that can be changed
partition() for more information).
However, as long as all occurrences of the same word go to the same reduce,
we can be sure that the final counts are correct.
The second parameter params is the same as in the map function.
We simply use
disco.util.kvgroup() to pull out each word along with its counts,
and sum the counts together, yielding the result.
Now we have written map and reduce functions for counting words in parallel.
3. Run the job¶
Now the only thing missing is a command for running the job. There’s a large number of parameters that you can use to specify your job, but only three of them are required for a simple job like ours.
In addition to starting the job, we want to print out the results as well.
First, however, we have to wait until the job has finished.
This is done with the
which returns results of the job once has it has finished.
For convenience, the
as well as other methods related to a job,
can be called through the
The following example from
examples/util/count_words.py runs the job,
and prints out the results:
from disco.core import Job, result_iterator def map(line, params): for word in line.split(): yield word, 1 def reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts) if __name__ == '__main__': job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"], map=map, reduce=reduce) for word, count in result_iterator(job.wait(show=True)): print(word, count)
This example could also be written by extending
See, for example, examples/util/wordcount.py.
Now comes the moment of truth.
Run the script as follows:
If everything goes well, you will see that the job executes.
The inputs are read from the tag
data:bigtxt, which was created earlier.
Finally the output is printed.
While the job is running, you can point your web
http://localhost:8989 (or some other port where you run the
Disco master) which lets you follow the progress of your job in real-time.
You can also set
DISCO_EVENTS to see job events from your console:
DISCO_EVENTS=1 python count_words.py
In this case, the events were anyway printed to the console,
since we specified
As you saw, creating a new Disco job is pretty straightforward. You could extend this simple example in any number of ways. For instance, by using the params object to include a list of stop words.
You could continue on with Extended Tutorial which is intended as a follow-on tutorial to this one.
If you pushed the data to Disco Distributed Filesystem,
you could try changing the input to
map_reader = disco.worker.task_io.chain_reader.
You could follow the DiscoDB Tutorial,
to learn more about using
discodb with Disco.
You could try using
to make the job more efficient.
You can also experiment with custom partitioning and reader functions.
They are written in the same way as map and reduce functions.
Just see some examples in the
After that, you could try chaining jobs together,
so that output of the previous job becomes input for the next one.
The best way to learn is to pick a problem or algorithm that you know well, and implement it with Disco. After all, Disco was designed to be as simple as possible so you can concentrate on your own problems, not on the framework.