parallelize
and using pair RDDs to count words.¶wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)
<class 'pyspark.rdd.RDD'>
map()
transformation to add the letter 's' to each string in the base RDD we just created. We'll define a Python function that returns the word with an 's' at the end of the word. Please replace <FILL IN>
with your solution. If you have trouble, the next cell has the solution. After you have defined makePlural
you can run the third cell which contains a test. If you implementation is correct it will print 1 test passed
.¶<FILL IN>
sections. The cell that needs to be modified will have # TODO: Replace <FILL IN> with appropriate code
on its first line. Once the <FILL IN>
sections are updated and the code is run, the test cell can then be run to verify the correctness of your solution. The last code cell before the next markdown section will contain the tests.¶# TODO: Replace <FILL IN> with appropriate code
def makePlural(word):
"""Adds an 's' to `word`.
Note:
This is a simple function that only adds an 's'. No attempt is made to follow proper
pluralization rules.
Args:
word (str): A string.
Returns:
str: A string with 's' added to it.
"""
return word+"s"
print makePlural('cat')
cats
# One way of completing the function
def makePlural(word):
return word + 's'
print makePlural('cat')
cats
# Load in the testing code and check to see if your answer is correct
# If incorrect it will report back '1 test failed' for each failed test
# Make sure to rerun any cell you change before trying the test again
from test_helper import Test
# TEST Pluralize and test (1b)
Test.assertEquals(makePlural('rat'), 'rats', 'incorrect result: makePlural does not add an s')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.collect()
['cats', 'elephants', 'rats', 'rats', 'cats']
# TEST Apply makePlural to the base RDD(1c)
Test.assertEquals(pluralRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
'incorrect values for pluralRDD')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
pluralLambdaRDD = wordsRDD.map(lambda x: x+"s")
print pluralLambdaRDD.collect()
['cats', 'elephants', 'rats', 'rats', 'cats']
# TEST Pass a lambda function to map (1d)
Test.assertEquals(pluralLambdaRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
'incorrect values for pluralLambdaRDD (1d)')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
pluralLengths = (pluralRDD.map(lambda x : len(x))
.collect())
print pluralLengths
[4, 9, 4, 4, 4]
# TEST Length of each word (1e)
Test.assertEquals(pluralLengths, [4, 9, 4, 4, 4],
'incorrect values for pluralLengths')
1 test passed.
(k, v)
where k
is the key and v
is the value. In this example, we will create a pair consisting of ('<word>', 1)
for each word element in the RDD.¶map()
transformation with a lambda()
function to create a new RDD.¶# TODO: Replace <FILL IN> with appropriate code
wordPairs = wordsRDD.map(lambda x: (x,1))
print wordPairs.collect()
[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
# TEST Pair RDDs (1f)
Test.assertEquals(wordPairs.collect(),
[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)],
'incorrect value for wordPairs')
1 test passed.
collect()
all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data parallel operations.¶groupByKey()
approach **¶groupByKey()
transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using groupByKey()
:¶groupByKey()
to generate a pair RDD of type ('word', iterator)
.¶# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
print '{0}: {1}'.format(key, list(value))
rat: [1, 1] elephant: [1] cat: [1, 1]
# TEST groupByKey() approach (2a)
Test.assertEquals(sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()),
[('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])],
'incorrect value for wordsGrouped')
1 test passed.
groupByKey()
to obtain the counts **¶groupByKey()
transformation creates an RDD containing 3 elements, each of which is a pair of a word and a Python iterator.¶map()
transformation. The result should be a pair RDD consisting of (word, count) pairs.¶# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.map(lambda x: (x[0],sum(x[1])) )
print wordCountsGrouped.collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST Use groupByKey() to obtain the counts (2b)
Test.assertEquals(sorted(wordCountsGrouped.collect()),
[('cat', 2), ('elephant', 1), ('rat', 2)],
'incorrect value for wordCountsGrouped')
1 test passed.
reduceByKey
**¶reduceByKey()
transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. reduceByKey()
operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.¶# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x,y : x+y)
print wordCounts.collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST Counting using reduceByKey (2c)
Test.assertEquals(sorted(wordCounts.collect()), [('cat', 2), ('elephant', 1), ('rat', 2)],
'incorrect value for wordCounts')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = (wordsRDD.map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
.collect())
print wordCountsCollected
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST All together (2d)
Test.assertEquals(sorted(wordCountsCollected), [('cat', 2), ('elephant', 1), ('rat', 2)],
'incorrect value for wordCountsCollected')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordsRDD.distinct().count()
print uniqueWords
3
# TEST Unique words (3a)
Test.assertEquals(uniqueWords, 3, 'incorrect count of uniqueWords')
1 test passed.
# TODO: Replace <FILL IN> with appropriate code
from operator import add
totalCount = (wordCounts
.map(lambda x:x[1])
.reduce(lambda x,y : x+y))
average = totalCount / float(wordCounts.count())
print totalCount
print round(average, 2)
5 1.67
# TEST Mean using reduce (3b)
Test.assertEquals(round(average, 2), 1.67, 'incorrect value of average')
1 test passed.
wordCount
function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.¶wordCount
function **¶wordsRDD
and return a pair RDD that has all of the words and their associated counts.¶# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
"""Creates a pair RDD with word counts from an RDD of words.
Args:
wordListRDD (RDD of str): An RDD consisting of words.
Returns:
RDD of (str, int): An RDD consisting of (word, count) tuples.
"""
return wordListRDD.map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
print wordCount(wordsRDD).collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST wordCount function (4a)
Test.assertEquals(sorted(wordCount(wordsRDD).collect()),
[('cat', 2), ('elephant', 1), ('rat', 2)],
'incorrect definition for wordCount function')
1 test passed.
removePunctuation
that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces. Use the Python re module to remove any text that is not a letter, number, or space. Reading help(re.sub)
might be useful.¶# TODO: Replace <FILL IN> with appropriate code
import re
def removePunctuation(text):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained. Other characters should should be
eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
punctuation is removed.
Args:
text (str): A string.
Returns:
str: The cleaned up string.
"""
text = text.strip().lower()
text=re.sub(r'[^0-9a-zA-Z\s]', '', text)
return text
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
hi you no underscore
# TEST Capitalization and punctuation (4b)
Test.assertEquals(removePunctuation(" The Elephant's 4 cats. "),
'the elephants 4 cats',
'incorrect definition for removePunctuation function')
1 test passed.
SparkContext.textFile()
method. We also apply the recently defined removePunctuation()
function using a map()
transformation to strip out the punctuation and change all text to lowercase. Since the file is large we use take(15)
, so that we only print 15 lines.¶# Just run this code
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)
shakespeareRDD = (sc
.textFile(fileName, 8)
.map(removePunctuation))
print '\n'.join(shakespeareRDD
.zipWithIndex() # to (line, lineNum)
.map(lambda (l, num): '{0}: {1}'.format(num, l)) # to 'lineNum: line'
.take(15))
0: 1609 1: 2: the sonnets 3: 4: by william shakespeare 5: 6: 7: 8: 1 9: from fairest creatures we desire increase 10: that thereby beautys rose might never die 11: but as the riper should by time decease 12: his tender heir might bear his memory 13: but thou contracted to thine own bright eyes 14: feedst thy lights flame with selfsubstantial fuel
wordcount()
function, we have to address two issues with the format of the RDD:¶map()
transformation is the way to do this, but think about what the result of the split()
function will be.¶# TODO: Replace <FILL IN> with appropriate code
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(" "))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'] 928908
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
Test.assertTrue(shakespeareWordCount == 927631 or shakespeareWordCount == 928908,
'incorrect value for shakespeareWordCount')
Test.assertEquals(shakespeareWordsRDD.top(5),
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],
'incorrect value for shakespeareWordsRDD')
1 test passed. 1 test passed.
# TODO: Replace <FILL IN> with appropriate code
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x : x!="")
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount
882996
# TEST Remove empty elements (4e)
Test.assertEquals(shakeWordCount, 882996, 'incorrect value for shakeWordCount')
1 test passed.
wordCount()
function to produce a list of word counts. We can view the top 15 words by using the takeOrdered()
action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.¶wordCount()
function and takeOrdered()
to obtain the fifteen most common words and their counts.¶# TODO: Replace <FILL IN> with appropriate code
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15,lambda x:-1*x[1])
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))
the: 27361 and: 26028 i: 20681 to: 19150 of: 17463 a: 14593 you: 13615 my: 12481 in: 10956 that: 10890 is: 9134 not: 8497 with: 7771 me: 7769 it: 7678
# TEST Count the words (4f)
Test.assertEquals(top15WordsAndCounts,
[(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
(u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
(u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],
'incorrect value for top15WordsAndCounts')
1 test passed.