%%writefile mr_wc.py # this is the same example from the mrjob distribution. Just showing how to run it from inside a notebook from mrjob.job import MRJob class MRWordCountUtility(MRJob): def __init__(self, *args, **kwargs): super(MRWordCountUtility, self).__init__(*args, **kwargs) self.chars = 0 self.words = 0 self.lines = 0 def mapper(self, _, line): # Don't actually yield anything for each line. Instead, collect them # and yield the sums when all lines have been processed. The results # will be collected by the reducer. self.chars += len(line) + 1 # +1 for newline self.words += sum(1 for word in line.split() if word.strip()) self.lines += 1 def mapper_final(self): yield('chars', self.chars) yield('words', self.words) yield('lines', self.lines) def reducer(self, key, values): yield(key, sum(values)) if __name__ == '__main__': MRWordCountUtility.run() !ls !python mr_wc.py < 14826.txt %%writefile mr_inverted_index.py from mrjob.job import MRJob import os import re WORD_RE = re.compile(r"[\w']+") class MRInvertedIndex(MRJob): def __init__(self, *args, **kwargs): super(MRInvertedIndex, self).__init__(*args, **kwargs) def read_files(self, _, line): filein = open(original_path + '/' + line) for linein in filein: yield(line, linein.strip()) def find_words(self, filename, line): for word in WORD_RE.findall(line): word = word.lower() yield (word, filename) def posting_lists(self, word, files): yield (word, list(files)) def steps(self): # This function specifies the process pipeline. In this case we have a two-phases map-reduce process. # The first phase process the input files and creates an entry per each file line. The second phase # processes the lines and build the inverted index. The first process only has a mapper function, # while the second one has both mapper and reducer. return [self.mr(mapper=self.read_files), self.mr(mapper=self.find_words, reducer=self.posting_lists)] if __name__ == '__main__': original_path = os.getcwd() MRInvertedIndex.run() !ls *.txt > files.in !cat files.in !python mr_inverted_index.py < files.in > result.out !wc result.out !head -100 result.out | tail -20