DiscoDB Tutorial

This tutorial is a guide to using DiscoDBs in Disco.

See also


Create a DiscoDB

First, let’s modify the word count example to write its output to a DiscoDB:

This example could be run and the results printed from the `examples/util` directory in Disco:

disco run wordcount_ddb.WordCount http://discoproject.org/media/text/chekhov.txt
from disco.core import Job
from disco.util import kvgroup

from disco.schemes.scheme_discodb import discodb_stream

class WordCount(Job):
    reduce_output_stream = discodb_stream

    def map(line, params):
        for word in line.split():
            yield word, 1

    def reduce(iter, params):
        for word, counts in kvgroup(sorted(iter)):
            yield word, str(sum(counts))

Notice how all we needed to do was change the reduce_output_stream to disco.schemes.scheme_discodb.discodb_stream(), and turn the count into a str. Remember, DiscoDBs only store byte sequences as keys and values, its up to the user to serialize objects; in this case we just use str.

Query a DiscoDB

Next, lets write a job to query the DiscoDBs.

This example could be run and the results printed from the `examples/util` directory in Disco:

python query_ddb.py <query> <input> ...
import sys
from disco.core import Job, Disco, result_iterator

from disco.worker.classic.func import nop_map
from disco.schemes.scheme_discodb import input_stream

class Query(Job):
    map_input_stream = (input_stream, )
    map = staticmethod(nop_map)

    def map_reader(discodb, size, url, params):
        for k, vs in discodb.metaquery(params):
            yield k, list(vs)

if __name__ == '__main__':
    from query_ddb import Query
    job = Query().run(input=sys.argv[2:], params=sys.argv[1])
    for k, vs in result_iterator(job.wait()):
        print('{0}\t{1}'.format(k, sum(int(v) for v in vs)))

Now let’s try creating our word count db from scratch, and querying it:

$ cd disco/examples/util
$ disco run wordcount_ddb.WordCount http://discoproject.org/media/text/chekhov.txt
$ disco results @ | xargs python query_ddb.py 'word'
word    18
$ disco results @?WordCount | xargs python query_ddb.py 'this | word'
this | word     217


The special arguments @ and @?<string> are replaced by the most recent job name and the most recent job with name matching <string>, respectively. See discocli.

There are a few things to note in this example. First of all, we use a disco.worker.classic.func.nop_map(), since we do all the real work in our map_reader. We use a builtin disco.worker.task_io.input_stream(), to return a DiscoDB from the file on disk, and that’s the object our map_reader gets as a handle.

Notice also how we turn vs into a list. This is because discodb.DiscoDB.metaquery() returns lazy objects, which cannot be pickled.

Finally, notice how we run the disco.job.Job. We make the input and params runtime parameters, since we are pulling them from the command line. When we iterate over the results, we deserialize our counts using int, and sum them together.

Make sure you understand why we sum the counts together, why there could possibly be more than one count in the result (even though we only had one global reduce in our word count job).


Look at the second query we executed on the command line.