DiscoDB Tutorial
This tutorial is a guide to using DiscoDBs in Disco.
See also: discodb.
Create a DiscoDB
First, let’s modify the word count example to write its output to a DiscoDB:

    """
    This example could be run and the results printed from the `examples/util` directory in Disco:

        disco run wordcount_ddb.WordCount http://discoproject.org/media/text/chekhov.txt
    """
    from disco.core import Job
    from disco.util import kvgroup
    from disco.schemes.scheme_discodb import discodb_stream

    class WordCount(Job):
        # Write the reduce output to a DiscoDB instead of the default format.
        reduce_output_stream = discodb_stream

        @staticmethod
        def map(line, params):
            # Emit (word, 1) for every whitespace-separated word in the line.
            for word in line.split():
                yield word, 1

        @staticmethod
        def reduce(iter, params):
            # Sort so that equal words are adjacent, then sum each group.
            for word, counts in kvgroup(sorted(iter)):
                # Values must be serialized; here the count becomes a str.
                yield word, str(sum(counts))

Notice that all we needed to do was change the reduce_output_stream to disco.schemes.scheme_discodb.discodb_stream() and turn the count into a str.
Remember, DiscoDBs only store byte sequences as keys and values; it's up to the user to serialize objects. In this case we just use str.
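
To see what the map and reduce steps produce without a running Disco cluster, the following is a minimal local sketch. It does not use Disco at all: `group_by_key` is a hypothetical stand-in for `disco.util.kvgroup` built on `itertools.groupby`, and the two input lines are made-up sample data. It only illustrates that every value handed to the DiscoDB output is already a string.

```python
from itertools import groupby
from operator import itemgetter


def map_words(line):
    # Same logic as WordCount.map: emit (word, 1) for each word.
    for word in line.split():
        yield word, 1


def group_by_key(pairs):
    # Hypothetical stand-in for disco.util.kvgroup (an assumption, not
    # Disco's code): group the values of consecutive equal keys.
    for key, group in groupby(pairs, key=itemgetter(0)):
        yield key, (value for _, value in group)


def reduce_counts(pairs):
    # Same logic as WordCount.reduce: sort, group, sum, serialize with str.
    for word, counts in group_by_key(sorted(pairs)):
        yield word, str(sum(counts))


# Made-up sample input, standing in for the lines of the input file.
lines = ["the cat and the hat", "the end"]
pairs = [kv for line in lines for kv in map_words(line)]
result = dict(reduce_counts(pairs))

for word, count in sorted(result.items()):
    print('{0}\t{1}'.format(word, count))
```

Reading a count back is the reverse step: `int(result['the'])` recovers the number from its string form.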
Query a DiscoDB
Next, let's write a job to query the DiscoDBs.

    """
    This example could be run and the results printed from the `examples/util` directory in Disco:

        python query_ddb.py <query> <input> ...
    """
    import sys
    from disco.core import Job, Disco, result_iterator
    from disco.worker.classic.func import nop_map
    from disco.schemes.scheme_discodb import input_stream

    class Query(Job):
        # Open each input as a DiscoDB before it reaches map_reader.
        map_input_stream = (input_stream, )
        # The map itself does nothing; map_reader does the real work.
        map = staticmethod(nop_map)

        @staticmethod
        def map_reader(discodb, size, url, params):
            # params holds the query string given at run time.
            for k, vs in discodb.metaquery(params):
                # Materialize the lazy values into a list.
                yield k, list(vs)

    if __name__ == '__main__':
        from query_ddb import Query
        job = Query().run(input=sys.argv[2:], params=sys.argv[1])
        for k, vs in result_iterator(job.wait()):
            # Deserialize each count with int and sum them.
            print('{0}\t{1}'.format(k, sum(int(v) for v in vs)))

Now let’s try creating our word count db from scratch, and querying it:
$ cd disco/examples/util
$ disco run wordcount_ddb.WordCount http://discoproject.org/media/text/chekhov.txt
WordCount@515:66e88:d39
$ disco results @ | xargs python query_ddb.py 'word'
word 18
$ disco results @?WordCount | xargs python query_ddb.py 'this | word'
this | word 217
Hint
The special arguments @ and @?<string> are replaced by the most recent job name and the most recent job with name matching <string>, respectively. See discocli.
There are a few things to note in this example.
First of all, we use disco.worker.classic.func.nop_map() as the map, since we do all the real work in our map_reader.
We use the input_stream() imported from disco.schemes.scheme_discodb to return a DiscoDB from the file on disk, and that’s the object our map_reader gets as a handle.
Notice also how we turn vs into a list.
This is because discodb.DiscoDB.metaquery() returns lazy objects, which cannot be pickled.
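
The pickling constraint can be reproduced with the standard library alone. The sketch below uses a plain Python generator as a hypothetical stand-in for a lazy result object (it is not a DiscoDB object) and shows that pickling it fails, while the list built from it survives a round trip.

```python
import pickle


def lazy_values():
    # Hypothetical stand-in for a lazy result: values are produced on demand.
    for value in ("1", "2"):
        yield value


# Pickling the generator itself raises TypeError.
try:
    pickle.dumps(lazy_values())
    lazy_is_picklable = True
except TypeError:
    lazy_is_picklable = False

# Materializing the values first gives an ordinary, picklable list.
restored = pickle.loads(pickle.dumps(list(lazy_values())))

print(lazy_is_picklable)
print(restored)
```

This is why map_reader yields `list(vs)` rather than `vs` itself.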
Finally, notice how we run the disco.job.Job.
We make the input and params runtime parameters, since we are pulling them from the command line.
When we iterate over the results, we deserialize our counts using int, and sum them together.
Make sure you understand why we sum the counts together: why could there be more than one count in the result, even though our word count job had only one global reduce?
Hint
Look at the second query we executed on the command line.
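
One way to explore the hint is with a toy model. The sketch below is not DiscoDB and does not use its query engine: `toy_db` is made-up data, and `toy_or_query` is a hypothetical helper that assumes a query of the form `a | b` returns the values stored under every key it names. Under that assumption, a query matching two keys yields two counts, which is why the result loop sums them.

```python
# Made-up data: each key maps to a list of serialized counts.
toy_db = {"this": ["3"], "word": ["2"], "other": ["7"]}


def toy_or_query(db, query):
    # Hypothetical helper (an assumption for illustration only):
    # split the query on '|' and collect the values of each named key.
    keys = [key.strip() for key in query.split("|")]
    values = []
    for key in keys:
        values.extend(db.get(key, []))
    return values


for query in ("word", "this | word"):
    vs = toy_or_query(toy_db, query)
    # Same deserialize-and-sum step as in query_ddb.py.
    print('{0}\t{1}'.format(query, sum(int(v) for v in vs)))
```

With a single key the sum has one term; with the disjunction it has one term per matching key.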