#!/usr/bin/env python
# coding: utf-8

# # Python Map Reduce
#
# Map reduce is a powerful technique adopted from functional programming approaches.
#
# *Rather confusingly, there is also a framework known as __MapReduce__, which is slightly different. For the purposes of TM351, we are considering the functional map reduce approach, with a space, or hyphen, between the __map__ and the __reduce__, not the UpperCamelCase framework version. Also note that map-reduce wraps the map-filter-reduce approach.*
#
# This notebook provides a quick example that shows how the map-reduce pattern works in Python.
# Note to module team: if we are to move to an approach that places more emphasis on vectorisation and functional methods, it could be useful if we all do one or two different treatments of map-reduce to see if we can come up with something *really* solid. We could perhaps introduce this much earlier in the context of the map-filter-reduce pattern. Do we do that somewhere? We took that approach to describing several pandas operators as the [split-apply-combine](https://ouseful-oer.github.io/openlearn-learntocode/week_08/ltc_08_01.html) pattern in the Learn to Code for Data Analysis FutureLearn / OpenLearn unit.
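# As a point of reference, here is a minimal sketch of the map-filter-reduce pattern referred to above: map a function over a list, filter the mapped values, and then reduce what remains to a single summary value. The values and lambda functions used here are purely illustrative:
#
# ```python
# from functools import reduce
#
# values = [1, 2, 3, 4, 5, 6]
#
# # map: square each value
# squares = map(lambda n: n ** 2, values)
#
# # filter: keep only the even squares
# even_squares = filter(lambda n: n % 2 == 0, squares)
#
# # reduce: sum the values that survive the filter
# reduce(lambda x, y: x + y, even_squares)  # (2 * 2) + (4 * 4) + (6 * 6) == 56
# ```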
# ## Loading the Required Packages
#
# Python provides a `map` function as part of the base language, but the `reduce` function needs to be loaded in from the `functools` package.

# In[1]:

from functools import reduce

# ## Introducing Map-Reduce
#
# As an *idea*, map-reduce provides us with a way of processing a large or streaming dataset (that is, a dataset to which data keeps being added, such as data from a live Twitter feed or telemetry data streamed from an instrumentation source).
#
# The `map` element simply allows us to apply a function to all the items in an input list. When you use the `.apply(axis=1)` operator to apply a function to each row of a *pandas* dataframe in turn, you are performing a `map` operation. Given a list of items, the `map` operation returns another list of items of the same length.
#
# If we generalise the notion of a list to a Python `iterator`, to which we might "add" data from a live, streaming data source, you can see how the `map` approach would allow us to process the data one item at a time, as it is made available by the iterator. For convenience, we'll refer to the function applied by the `map` as the *mapped* or *mapper* function.
#
# Whilst it's useful to be able to apply the same function to every item in a list so that we can process each item in the same way, sometimes we need to summarise the data, for example by finding a running total of the values retrieved from the list, or a running average of the values as we retrieve them from the list.
#
# The `reduce` function performs this recombination step. At each step, the `reduce` works with two values: an initial (accumulated) value, and the next element from the list. It applies a summarising, or reducing, function to those two values and returns a result that becomes the new initial value. Then it takes the next item from the list and does the same thing again. Essentially, the `reduce` function performs a rolling computation over an initial value and the next value retrieved from a list, with the result becoming the initial value for the next application of the aggregating, or reducing, function. For convenience, we'll refer to the function applied by the `reduce` as the *summarising* or *reducer* function. When the reducer is first called, the initial value is the first item in the list, and the next list item is the second item from the (original) list.
#
# Note that a `reduce` operation does not require a preceding `map` operation. We can apply a `reduce` (if it is meaningful to do so) to *any* list.
#
# As well as allowing us to process large amounts of data in a manageable way — one piece of data at a time, essentially — `map` and `reduce` functions also play a very important role in supporting *parallelisation*, by allowing us to decompose a task into multiple subtasks, apply those on separate compute nodes *in parallel*, and then recombine the results.
#
# For example, the function applied by the `map` step might be computationally intensive, so each value retrieved from the input list might be passed to a separate compute node for the function to be applied to it. The `reduce` step can then be used to recombine the separately computed elements, as they become available, into a single value.

# ## Getting Started with Map-Reduce in Python
#
# One of the most commonly used examples for demonstrating map reduce is the task of squaring a set of numbers and then summing the squares.
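# Before we work through that example cell by cell, the following sketch may help to make the rolling computation concrete. It is a minimal, hand-rolled stand-in for `reduce`, written as an explicit loop; the name `my_reduce` is just an illustrative choice:
#
# ```python
# def my_reduce(reducer, items):
#     iterator = iter(items)
#     accumulator = next(iterator)  # the initial value is the first item in the list
#     for item in iterator:
#         # fold each remaining item into the running result
#         accumulator = reducer(accumulator, item)
#     return accumulator
#
# my_reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])  # returns 15
# ```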
#
# First, we need to create a list of numbers that will provide us with our initial list of values to which we'll apply the `map`:

# In[2]:

numbers = list(range(1, 6))
numbers

# The initial step of *squaring* the numbers can be applied using the `map` function.
#
# Let's create the *mapper*, the function we want the `map` to apply:

# In[3]:

def square(n):
    """Return the square of the provided value."""
    return n ** 2  # n to the power 2. Alternatively, we could have written n * n

# Let's test that it works correctly:

# In[4]:

assert square(2) == 4
assert square(-3) == 9
assert square(0) == 0

# We can apply the `square()` function to each item in the list using the `map` function as follows:
#
# ```python
# map(mapper_function_name, list)
# ```
#
# Using `map` to apply the mapper function returns a map iterator. If we unpack all the values of the iterator by casting it to a list, we get a list of the same length as the input list:

# In[5]:

map(square, numbers)

# In[6]:

squared = list(map(square, numbers))
squared

# Now we need to create the *reducer* function that the `reduce` will apply to combine the items from the mapped list.
#
# Before we create our reducer to sum the squared list values, let's create a function that will convert the values to strings and concatenate them. This will demonstrate a little more clearly how the `reduce` function works.
#
# Here's the function:

# In[7]:

def str_concat(x, y):
    """Convert each item to a string and join them with a +."""
    return '+'.join([str(x), str(y)])

# Let's see how the function works in practice:

# In[8]:

str_concat(1, 2)

# What happens if the first argument is the result of a previous application of the same function?

# In[9]:

str_concat(str_concat(1, 2), 3)

# Now let's see what happens if we treat that function as a reducer applied to the original (non-squared) list of numbers.
#
# We call `reduce` using the form:
#
# ```python
# reduce(reducer_function_name, list)
# ```
#
# So what happens?

# In[10]:

reduce(str_concat, numbers)

# The `str_concat` function is first applied to the first two items in the list. The result is then used as the first (`x`) value in the next call of the reducer, with the next item in the list as the `y` value.
#
# We can see how often the reducer function is called by instrumenting it. The following function makes use of a `global` variable to keep track of how many times the function has been called and reports this value, as well as the input values used to make the call:

# In[11]:

callnumber = 0

def str_concat_callnumber(x, y):
    """Convert each item to a string and join them with a +, reporting the call count."""
    global callnumber
    # Add to the call number each time we call the function
    callnumber = callnumber + 1
    # Updated VM should update python and cope with f strings
    print('At call {callnumber} with x={x} and y={y}.'.format(callnumber=callnumber, x=x, y=y))
    return '+'.join([str(x), str(y)])

# Let's see how it works:

# In[12]:

str_concat_callnumber(1, 2)

# What do we see if we call the function with a previous call to it as the first argument, remembering to reset the `callnumber` count before we start?

# In[13]:

callnumber = 0
str_concat_callnumber(str_concat_callnumber(1, 2), 3)

# On the first pass, the first two elements are passed into the `str_concat_callnumber` function, which increments the call counter and returns a result. Then `str_concat_callnumber` is called again, taking the output from its first invocation as its first argument and displaying the updated counter value as well as its arguments.
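# As an aside, `functools.reduce` also accepts an optional third argument giving an explicit initial value. When one is supplied, the first call to the reducer uses that value as its first argument and the first list item as its second. A minimal sketch, using the `str_concat` reducer defined above and an illustrative initial value:
#
# ```python
# # First call: str_concat('START', 1); second call: str_concat('START+1', 2); and so on.
# reduce(str_concat, numbers, 'START')  # returns 'START+1+2+3+4+5'
# ```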
#
# What happens if we use the `str_concat_callnumber` function as our reducer function? Can you predict what you will see?

# In[14]:

callnumber = 0
reduce(str_concat_callnumber, numbers)

# We see that the `str_concat_callnumber()` function must have been called multiple times by the `reduce` function, operating on a pair of values at each step.

# Now let's go back to our original problem, of finding the summed square values of the items in the originally presented list.
#
# Our reducer function simply needs to add two values together:

# In[15]:

def add_pair(x, y):
    """Return the sum of the two input values."""
    return x + y

# Let's call this function on the list of squared numbers:

# In[16]:

reduce(add_pair, squared)

# We could also call it as a one-liner over the original list:

# In[17]:

reduce(add_pair, map(square, numbers))

# Is this correct?

# In[18]:

assert (1 * 1) + (2 * 2) + (3 * 3) + (4 * 4) + (5 * 5) == reduce(add_pair, map(square, numbers))

# Yes, it seems to be.

# ### On the type and structure of reducer arguments and `return` values
#
# Note that in the simplest of reducers, the `type` or structure of the first argument needs to be of a similar `type` or structure to the elements in the list: the reducer function should return an item of the same `type` or structure as the list members. As far as the reducer function is concerned, its first value could be provided either by the list, or by a previous call to the reducer function.
#
# In the `str_concat_callnumber()` function, we were able to return a type different from that of the values retrieved from the list because the reducer function cast both inputs to the desired type (a `str`) *before* operating on them.

# ## Activity 1
#
# One of the ways in which the map-reduce approach can be used is to do some counting in the context of a `map` function applied to some value, and then aggregate those counts using a `reduce`.
#
# In this activity, you will use such an approach to count the number of vowels and consonants in a list of words.
#
# We can use the list comprehension `[vowel for vowel in word if vowel in 'aeiou']` to extract a list of vowels from a string.

# In[19]:

[vowel for vowel in 'amazing' if vowel in 'aeiou']

# Use the map-reduce pattern to count the number of vowels and consonants in a piece of text, providing tests for your functions as appropriate. The text is represented as a list of lower case words containing only vowels or consonants.
#
# As part of your answer, create two functions, called `mapper` and `reducer`. Have each function do the appropriate part of the map-reduce process.
#
# What advantages do you see from using this approach rather than the more simplistic approach of simply counting all the vowels and consonants in a single string made by joining together all the words in the list?
#
# Test your result against the following (correct) results:
#
# ```
# assert YOUR_RESULT_FOR_CONSONANTS == 442
# assert YOUR_RESULT_FOR_VOWELS == 292
# ```
#
# *Hint: the map phase should count the vowels and consonants in each word, returning some data structure containing the two count values.
# The reduce phase should increase the count of vowels and consonants as appropriate.*

# In[20]:

txt = ['it', 'was', 'the', 'best', 'of', 'times', 'it', 'was', 'the', 'worst', 'of', 'times',
       'it', 'was', 'the', 'age', 'of', 'wisdom', 'it', 'was', 'the', 'age', 'of', 'foolishness',
       'it', 'was', 'the', 'epoch', 'of', 'belief', 'it', 'was', 'the', 'epoch', 'of', 'incredulity',
       'it', 'was', 'the', 'season', 'of', 'light', 'it', 'was', 'the', 'season', 'of', 'darkness',
       'it', 'was', 'the', 'spring', 'of', 'hope', 'it', 'was', 'the', 'winter', 'of', 'despair',
       'we', 'had', 'everything', 'before', 'us', 'we', 'had', 'nothing', 'before', 'us',
       'we', 'were', 'all', 'going', 'direct', 'to', 'heaven',
       'we', 'were', 'all', 'going', 'direct', 'the', 'other', 'way',
       'in', 'short', 'the', 'period', 'was', 'so', 'far', 'like', 'the', 'present', 'period',
       'that', 'some', 'of', 'its', 'noisiest', 'authorities', 'insisted', 'on', 'its', 'being', 'received',
       'for', 'good', 'or', 'for', 'evil', 'in', 'the', 'superlative', 'degree', 'of', 'comparison', 'only',
       'there', 'were', 'a', 'king', 'with', 'a', 'large', 'jaw',
       'and', 'a', 'queen', 'with', 'a', 'plain', 'face', 'on', 'the', 'throne', 'of', 'england',
       'there', 'were', 'a', 'king', 'with', 'a', 'large', 'jaw',
       'and', 'a', 'queen', 'with', 'a', 'fair', 'face', 'on', 'the', 'throne', 'of', 'france',
       'in', 'both', 'countries', 'it', 'was', 'clearer', 'than', 'crystal',
       'to', 'the', 'lords', 'of', 'the', 'state', 'preserves', 'of', 'loaves', 'and', 'fishes',
       'that', 'things', 'in', 'general', 'were', 'settled', 'for', 'ever']

# In[21]:

# Insert your solution here.
# You may find it convenient to construct your answer over several code cells,
# making sure to comment your code where appropriate and
# using additional markdown cells that explain the reasoning behind your code
# as you develop your solution.

# *What benefits do you see from using the map-reduce approach to count the vowels and consonants in the word list? Write your thoughts here.*

# #### Answer
#
# *Click the arrow in the margin to reveal a worked answer.*

# The mapper needs to take in a string and return a count of vowels and a count of consonants. We could use a `dict`, such as `{'consonants': c_count, 'vowels': v_count}`, or a `tuple` (named or otherwise; most simply `(v_count, c_count)`) to record the individual word counts.
#
# I'll use the `dict` approach:

# In[22]:

def _mapper(word):
    """Return a count of consonants and vowels in a word."""
    # Note that in the provided list, the words are all lower case
    # so we can forget about handling uppercase vs lower case etc.
    #
    # In the general case, there may be characters in a word that are not
    # vowels or consonants (hyphens, apostrophes, etc.)
    # so len(vowels) + len(consonants) does not necessarily equal len(word)
    #
    # For now, with the list provided, words contain only vowels or consonants,
    # so we can optimise...
    vowels_count = len([l for l in word if l in 'aeiou'])
    # consonants_count = len([l for l in word if l not in 'aeiou'])
    # Or, more efficiently, for words only containing vowels and consonants,
    # where (vowels_count + consonants_count) == len(word):
    consonants_count = len(word) - vowels_count
    return {'vowels': vowels_count, 'consonants': consonants_count}

# Let's test that function:

# In[23]:

assert _mapper('hello') == {'vowels': 2, 'consonants': 3}

# For the reducer, we need to provide a way of summing the values provided by two dictionaries (or, if you used the `tuple` approach, two `tuple`s) and returning a `dict` (or `tuple`) of the same form:

# In[24]:

def _reducer(running_totals, next_value):
    """Separately sum the vowel and consonant counts from two dicts."""
    return {'vowels': running_totals['vowels'] + next_value['vowels'],
            'consonants': running_totals['consonants'] + next_value['consonants']}

# Let's test that function:

# In[25]:

# Check each returned count individually (a direct dict comparison would also work,
# since dict equality does not depend on key ordering)
result = _reducer({'vowels': 10, 'consonants': 10}, {'vowels': 2, 'consonants': 3})
assert result['vowels'] == 12
assert result['consonants'] == 13

# Do things still work if we pass a result from the reducer back into the reducer as the first argument?

# In[26]:

result = _reducer({'vowels': 10, 'consonants': 10}, {'vowels': 2, 'consonants': 3})
result = _reducer(result, {'vowels': 3, 'consonants': 4})
assert result['vowels'] == 15
assert result['consonants'] == 17

# Now let's `map` our mapper function against the text (`txt`) and use a `reduce` to apply our reducer function to the result:

# In[27]:

_result = reduce(_reducer, map(_mapper, txt))
_result

# And check the result against the provided assertion tests:

# In[28]:

assert _result['consonants'] == 442
assert _result['vowels'] == 292

# As to what benefits there are from using the map-reduce approach, I can imagine that if I had a really long list of words, I would need to:
#
# - hold them all in memory; if the list was too big, this might kill my machine;
# - process the string as a single task; if the list is very long, this may take a long time.
#
# In a map-reduce context, if the words were pulled from a database, for example, a cursor could be used to access them one at a time, which would reduce the memory overhead. Also, if I had multiple processors or processor cores available, I might be able to use them separately and in parallel, each counting the vowels in some of the words, with a `reduce` combining their separate results; this should reduce the time taken to complete the count.

# ## Activity 2
#
# Use the map-reduce pattern to find the average word length in the text above.
#
# Test any functions you create as appropriate.
#
# Think about what the partial results should look like so that they can be combined in the `reduce` stage.
#
# Test your final average answer by comparing it with the value obtained by finding the length of the single string made by joining the items in the original list together (`''.join(txt)`) and dividing that length by the number of words in the list (`len(txt)`).
#
# Comment on the strategy you used to compute the final average word length.
#
# *Hint: if the reducer returns an average and the mapped list returns a count, are they the same sort of thing? And do you really want to calculate the average at each step anyway?*

# In[29]:

# Insert your solution here.
# You may find it convenient to construct your answer over several code cells,
# making sure to comment your code where appropriate and
# using additional markdown cells that explain the reasoning behind your code
# as you develop your solution.

# *Add your comments and reflections about the approach you took to calculate the final average here.*

# #### Answer
#
# *Click the arrow in the margin to reveal a worked answer.*

# We can't combine the averages of different groups without knowing the number of words in each group: if my average so far after reducing 500 mapped elements is five characters per word, and the next word has 7 characters, the overall average word length is *not* 6 characters, it's `((500 * 5) + 7) / 501`, which is to say the total length of the words provided so far divided by the number of words seen so far.
#
# Therefore, we need a data structure that records at least two of the following three quantities: the average seen so far, the total number of words seen so far, and the total summed length of the words seen so far. From any two of these, we can recover the third, and hence the average.
#
# The mapper is quite simple: it just needs to return the number of characters in a single word, along with a word count of 1 that the reducer can accumulate.

# In[30]:

def _mapper_char_count(word):
    """Return the number of characters in a word, along with a word count of 1."""
    return {'length': len(word), 'count': 1}

# Check that it runs okay:

# In[31]:

_mapper_char_count('hello')

# And also test the function:

# In[32]:

_result = _mapper_char_count('hello')
assert _result['length'] == 5
assert _result['count'] == 1

# For the reducer, we might record the total length of all the words so far, as well as the count:

# In[33]:

def _reducer_total_length(total_length_so_far, next_length):
    """Keep a running total of the summed word length and the word count."""
    return {'length': total_length_so_far['length'] + next_length['length'],
            'count': total_length_so_far['count'] + next_length['count']}

# And test it:

# In[34]:

_result = _reducer_total_length({'count': 1, 'length': 5}, {'count': 1, 'length': 2})
assert _result['count'] == 2
assert _result['length'] == 7

# And test that it can feed from itself:

# In[35]:

_result = _reducer_total_length({'count': 1, 'length': 5}, {'count': 1, 'length': 2})
_result = _reducer_total_length(_result, {'count': 1, 'length': 3})
assert _result['count'] == 3
assert _result['length'] == 10

# Now run the map-reduce calculation:

# In[36]:

_result = reduce(_reducer_total_length, map(_mapper_char_count, txt))
_result

# And find the average word length:

# In[37]:

average_word_length = _result['length'] / _result['count']
average_word_length

# Does this equal the average obtained by calculating it without using the map-reduce approach?

# In[38]:

assert average_word_length == len(''.join(txt)) / len(txt)

# As an alternative approach, what would our reducer function look like if we'd calculated the average as a running average?

# In[39]:

def _reducer_running_average(average_length_so_far, next_length):
    """Keep a running average word length and word count."""
    _total_length_so_far = average_length_so_far['length'] * average_length_so_far['count']
    _new_total_length = _total_length_so_far + next_length['length']
    _new_count = average_length_so_far['count'] + next_length['count']
    return {'length': _new_total_length / _new_count, 'count': _new_count}

# That looks a bit more complex...
#
# Does it work?
# In[40]:

_result = _reducer_running_average({'count': 1, 'length': 5}, {'count': 1, 'length': 7})
assert _result['count'] == 2
assert _result['length'] == 6

# How about a more complicated test (the expected values are harder to figure out too!):

# In[41]:

_result = _reducer_running_average({'count': 3, 'length': 4}, {'count': 1, 'length': 8})
assert _result['count'] == 4
assert _result['length'] == 5  # not 6...

# In[42]:

_result = reduce(_reducer_running_average, map(_mapper_char_count, txt))
_result

# And does that check out?

# In[43]:

assert _result['length'] == len(''.join(txt)) / len(txt)

# Looking at the approach of calculating the running average as part of the reduce step, this is far more convoluted and computationally intensive: we end up having to do multiplications and divisions at each step, and introducing floating point numbers, where the running total approach used only integers and performed the averaging calculation *when we actually needed it*.

# ## Combining Partial Results Obtained From the Reducer Using the Reducer
#
# *Not provided in original notebook 16.2*
#
# One of the possibilities that arises from using map-reduce is that we might combine the partial results from applying the reducer to different sets of mapped values using a third application of the reducer.
#
# For example, let's split the list of words into two separate lists:

# In[44]:

txt1 = txt[:100]
txt2 = txt[100:]

# Using the mapper and reducer from the first activity in this notebook, we can map and reduce each sublist separately:

# In[45]:

_map_reduce1 = reduce(_reducer, map(_mapper, txt1))
_map_reduce2 = reduce(_reducer, map(_mapper, txt2))

_map_reduce1, _map_reduce2

# If we create a list of length 2 from these two partial results, we can combine them using a further application of the reducer:

# In[46]:

partials = [_map_reduce1, _map_reduce2]
reduce(_reducer, partials)

# Those numbers may look familiar...
#
# Let's check that they match the values calculated using the map-reduce method applied to the single, full list of words:

# In[47]:

assert reduce(_reducer, partials)['consonants'] == reduce(_reducer, map(_mapper, txt))['consonants']
assert reduce(_reducer, partials)['vowels'] == reduce(_reducer, map(_mapper, txt))['vowels']

# The map-reduce approach is very powerful and can provide an efficient means of decomposing a problem into separately computable parts that can be scaled across multiple parallel processes or tasks.
#
# The map-reduce approach is most powerful when the reducer function returns elements that have a similar structure to the list elements it processes, because this minimises the amount of computation required to apply the reducer's intended function. Ideally, the reducer wants to be agnostic as to whether a given argument came from the incoming list or from a previous application of itself.

# ## What next?

# If you are working through this Notebook as part of an inline exercise, return to the module materials now.
#
# If you are working through this set of Notebooks as a whole, move on to `16.3 Accident analysis map-reduce`.
# There is an opportunity here to drop into asyncio or dask, run several tasks, and show how map-reduce can be applied in a multiprocessing / parallel processing context.
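# As a pointer in that direction, here is a minimal sketch of how the Activity 1 `_mapper` and `_reducer` might be scaled out with `concurrent.futures`: each chunk of the word list is mapped and reduced in a separate process, and the partial results are then combined with one final `reduce`. The chunk size and helper name are illustrative choices, and note that process pools require the functions they call to be picklable, so running this inside a notebook may need the helpers to live in an importable module:
#
# ```python
# from concurrent.futures import ProcessPoolExecutor
# from functools import reduce
#
# def map_reduce_chunk(chunk):
#     """Map and reduce a single chunk of words into a partial result."""
#     return reduce(_reducer, map(_mapper, chunk))
#
# # Split the word list into (illustrative) chunks of 50 words.
# chunks = [txt[i:i + 50] for i in range(0, len(txt), 50)]
#
# with ProcessPoolExecutor() as pool:
#     # Each chunk is processed in a separate worker process...
#     partial_results = list(pool.map(map_reduce_chunk, chunks))
#
# # ...and the partial results are combined with a final reduce.
# reduce(_reducer, partial_results)
# ```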