%%writefile mr_wc.py
# This is the same example that ships with the mrjob distribution.
# It is included here to show how to run it from inside a notebook.
from mrjob.job import MRJob


class MRWordCountUtility(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRWordCountUtility, self).__init__(*args, **kwargs)
        self.chars = 0
        self.words = 0
        self.lines = 0

    def mapper(self, _, line):
        # Don't actually yield anything for each line. Instead, collect them
        # and yield the sums when all lines have been processed. The results
        # will be collected by the reducer.
        self.chars += len(line) + 1  # +1 for newline
        self.words += sum(1 for word in line.split() if word.strip())
        self.lines += 1

    def mapper_final(self):
        yield ('chars', self.chars)
        yield ('words', self.words)
        yield ('lines', self.lines)

    def reducer(self, key, values):
        yield (key, sum(values))


if __name__ == '__main__':
    MRWordCountUtility.run()
Overwriting mr_wc.py
The directory now contains the Python source file along with other files.
The following command lists the contents of the directory:
!ls
14826.txt    14840.txt
14828.txt    files.in
14829.txt    google_ngram.ipynb
14832.txt    mr_inverted_index.py
14833.txt    mr_wc.py
14839.txt    mrjob_inverted_index.ipynb
Now we run the program on an input text file. The program's actual output appears near the end: three lines starting with "chars", "lines" and "words". The remaining lines are log messages from mrjob.
!python mr_wc.py < 14826.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001
reading from STDIN
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/step-0-mapper-sorted
> sort /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/step-0-mapper_part-00000
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/step-0-reducer_part-00000 -> /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/output/part-00000
Streaming final output from /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001/output
"chars" 4691
"lines" 86
"words" 713
removing tmp directory /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_wc.fgonza.20150304.171640.111001
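To cross-check the three counts without mrjob, the same arithmetic can be sketched in plain Python. This is only an illustration: `count_text` is a hypothetical helper, not part of mrjob, and the sample lines are made up rather than taken from 14826.txt.

```python
def count_text(lines):
    """Count chars, words, and lines the same way the mapper above does."""
    chars = words = n_lines = 0
    for line in lines:
        line = line.rstrip("\n")   # the mapper receives lines without the newline
        chars += len(line) + 1     # +1 puts the newline back, as in the mapper
        words += len(line.split())
        n_lines += 1
    return {"chars": chars, "words": words, "lines": n_lines}


# Made-up sample input, for illustration only.
sample = ["the quick brown fox\n", "jumps over\n", "\n"]
print(count_text(sample))  # {'chars': 32, 'words': 6, 'lines': 3}
```

Passing an open file object instead of the list, for example `count_text(open('14826.txt'))`, should let you compare the result with the job's output.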
The following program implements an inverted indexing process using mrjob.
%%writefile mr_inverted_index.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import os
import re

WORD_RE = re.compile(r"[\w']+")


class MRInvertedIndex(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRInvertedIndex, self).__init__(*args, **kwargs)

    def read_files(self, _, line):
        # Each input line is a file name, relative to the directory the job
        # was started from. Emit one (filename, text line) pair per line.
        with open(os.path.join(original_path, line)) as filein:
            for linein in filein:
                yield (line, linein.strip())

    def find_words(self, filename, line):
        # Emit one (word, filename) pair per word occurrence.
        for word in WORD_RE.findall(line):
            word = word.lower()
            yield (word, filename)

    def posting_lists(self, word, files):
        # Collect every file name emitted for this word (repeats included).
        yield (word, list(files))

    def steps(self):
        # This function specifies the processing pipeline. In this case we
        # have a two-phase map-reduce process. The first phase reads the
        # input files and creates one entry per file line. The second phase
        # processes the lines and builds the inverted index. The first phase
        # only has a mapper, while the second has both a mapper and a reducer.
        # MRStep is used here because the older self.mr() helper is deprecated.
        return [MRStep(mapper=self.read_files),
                MRStep(mapper=self.find_words,
                       reducer=self.posting_lists)]


if __name__ == '__main__':
    # Module-level name read by read_files; set when the script is run directly.
    original_path = os.getcwd()
    MRInvertedIndex.run()
Overwriting mr_inverted_index.py
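The data flow through the two steps can be followed more easily in a small in-memory sketch that does not use mrjob. The function names mirror the job's methods, but the `docs` dictionary and its contents are invented for illustration, and sorting the pairs stands in for the shuffle between the second mapper and the reducer.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


def read_files(docs):
    # Step 1 mapper: one (filename, line) pair per line of each document.
    for filename, text in docs.items():
        for line in text.splitlines():
            yield filename, line.strip()


def find_words(pairs):
    # Step 2 mapper: one (word, filename) pair per word occurrence.
    for filename, line in pairs:
        for word in WORD_RE.findall(line):
            yield word.lower(), filename


def posting_lists(pairs):
    # Shuffle (sort by word) and reduce: group the file names by word.
    groups = defaultdict(list)
    for word, filename in sorted(pairs):
        groups[word].append(filename)
    return dict(groups)


# Invented documents, for illustration only.
docs = {"a.txt": "Asian markets\nasian banks", "b.txt": "Markets fell"}
index = posting_lists(find_words(read_files(docs)))
print(index["asian"])    # ['a.txt', 'a.txt']
print(index["markets"])  # ['a.txt', 'b.txt']
```

As in the job itself, a file name appears once per occurrence of the word, so posting lists can contain repeats.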
First we have to create a file with the list of files that we will process. In this case we list all the *.txt files in the current directory.
!ls *.txt > files.in
!cat files.in
14826.txt
14828.txt
14829.txt
14832.txt
14833.txt
14839.txt
14840.txt
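If shell commands are not available in the notebook, the same list could be built in Python. The sketch below assumes the job should receive bare file names, one per line, as `ls *.txt` produces; `write_file_list` and its parameters are hypothetical names chosen for this example.

```python
import glob
import os


def write_file_list(directory, list_name="files.in", pattern="*.txt"):
    """Write the sorted names of matching files in `directory`, one per line."""
    paths = glob.glob(os.path.join(directory, pattern))
    names = sorted(os.path.basename(p) for p in paths)
    with open(os.path.join(directory, list_name), "w") as out:
        for name in names:
            out.write(name + "\n")
    return names
```

Calling `write_file_list('.')` would then play the role of `ls *.txt > files.in` for the current directory.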
Now we can run the program. The results are put in the file result.out.
!python mr_inverted_index.py < files.in > result.out
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302
reading from STDIN
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-1-mapper_part-00000
Counters from step 2:
  (no counters found)
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-1-mapper-sorted
> sort /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-1-mapper_part-00000
writing to /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-1-reducer_part-00000
Counters from step 2:
  (no counters found)
Moving /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/step-1-reducer_part-00000 -> /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/output/part-00000
Streaming final output from /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302/output
removing tmp directory /var/folders/vz/rkqml9x917v4m1dfz713y0q00000gn/T/mr_inverted_index.fgonza.20150304.171715.767302
The output file has 785 lines in total, one per indexed word. The commands below print the line count and then lines 81 to 100 of the file.
!wc result.out
!head -100 result.out | tail -20
785 2738 33250 result.out
"april" ["14826.txt", "14840.txt", "14840.txt"]
"arbitration" ["14839.txt"]
"are" ["14826.txt", "14826.txt", "14826.txt", "14826.txt", "14826.txt", "14828.txt", "14833.txt", "14839.txt", "14840.txt", "14840.txt", "14840.txt", "14840.txt"]
"around" ["14833.txt"]
"as" ["14826.txt", "14826.txt", "14840.txt"]
"asia's" ["14826.txt"]
"asian" ["14826.txt", "14826.txt"]
"asked" ["14826.txt"]
"association" ["14826.txt"]
"at" ["14826.txt", "14826.txt", "14826.txt", "14839.txt", "14839.txt", "14840.txt", "14840.txt"]
"august" ["14829.txt"]
"australia" ["14839.txt"]
"australia's" ["14826.txt", "14826.txt"]
"australian" ["14826.txt", "14839.txt"]
"avowed" ["14826.txt"]
"awaiting" ["14826.txt"]
"aware" ["14826.txt"]
"bad" ["14828.txt"]
"baht" ["14832.txt", "14832.txt", "14832.txt"]
"ban" ["14839.txt", "14839.txt", "14839.txt"]
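The posting lists repeat a file name once per occurrence of the word. A possible way to post-process a line of result.out is sketched below, assuming mrjob's default JSON output protocol, in which each line holds a JSON key and a JSON value separated by a tab. `parse_line` and `term_frequencies` are hypothetical helpers, not mrjob functions.

```python
import json
from collections import Counter


def parse_line(line):
    # Assumed format: JSON key, a tab, then JSON value.
    key, value = line.rstrip("\n").split("\t", 1)
    return json.loads(key), json.loads(value)


def term_frequencies(postings):
    # Collapse repeated file names into (filename, count) pairs, sorted by name.
    return sorted(Counter(postings).items())


line = '"april"\t["14826.txt", "14840.txt", "14840.txt"]\n'
word, postings = parse_line(line)
print(word, term_frequencies(postings))
# april [('14826.txt', 1), ('14840.txt', 2)]
```

Keeping the counts rather than discarding the repeats preserves the per-file term frequency, which is often useful when ranking documents.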