version 1.0.2
collect()
on any datasets. When you are using a small dataset, calling collect()
and then using Python to get a sense for the data locally (in the driver program) will work fine, but this will not work when you are using a large dataset that doesn't fit in memory on one machine. Solutions that call collect()
and do local analysis that could have been done with Spark will likely fail in the autograder and not receive full credit.¶import sys
import os
from test_helper import Test
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab4', 'small')
ratingsFilename = os.path.join(baseDir, inputPath, 'ratings.dat.gz')
moviesFilename = os.path.join(baseDir, inputPath, 'movies.dat')
ratings.dat.gz
) is formatted as:¶UserID::MovieID::Rating::Timestamp
¶movies.dat
) dataset is formatted as:¶MovieID::Title::Genres
¶Genres
field has the format¶Genres1|Genres2|Genres3|...
¶split()
to parse their lines.¶numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)
def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()
ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()
print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)
assert ratingsCount == 487650
assert moviesCount == 3883
assert moviesRDD.filter(lambda (id, title): title == 'Toy Story (1995)').count() == 1
assert (ratingsRDD.takeOrdered(1, key=lambda (user, movie, rating): movie)
== [(1, 1, 5.0)])
There are 487650 ratings and 3883 movies in the datasets Ratings: [(1, 1193, 5.0), (1, 914, 3.0), (1, 2355, 5.0)] Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]
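The `::`-delimited line formats described above can be exercised without Spark. The following is a minimal sketch in plain Python: `parse_rating` mirrors the lab's `get_ratings_tuple`, while `parse_movie_with_genres` is a hypothetical helper (the lab's `get_movie_tuple` discards the Genres field), and both sample lines are illustrative values written in the documented formats rather than lines read from the dataset files.

```python
def parse_rating(entry):
    """Parse 'UserID::MovieID::Rating::Timestamp' into (UserID, MovieID, Rating)."""
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def parse_movie_with_genres(entry):
    """Parse 'MovieID::Title::Genres' into (MovieID, Title, [Genre, ...])."""
    movie_id, title, genres = entry.split('::')
    # The Genres field is itself a '|'-separated list
    return int(movie_id), title, genres.split('|')


# Illustrative sample lines (assumed values, not read from the dataset files)
sample_rating = '1::1193::5::978300760'
sample_movie = '1::Toy Story (1995)::Animation|Comedy'

parsed_rating = parse_rating(sample_rating)
parsed_movie = parse_movie_with_genres(sample_movie)
print(parsed_rating)
print(parsed_movie)
```

Splitting on `::` first and on `|` second keeps the two delimiter levels separate; the timestamp is dropped because the rest of the lab never uses it.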
sortByKey()
method. However, this choice is problematic, as we can still end up with different results if the key is not unique.¶unicode
type instead of the string
type as the titles are in unicode characters.¶tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]
oneRDD = sc.parallelize(tmp1)
twoRDD = sc.parallelize(tmp2)
oneSorted = oneRDD.sortByKey(True).collect()
twoSorted = twoRDD.sortByKey(True).collect()
print oneSorted
print twoSorted
assert set(oneSorted) == set(twoSorted) # Note that both lists have the same elements
assert twoSorted[0][0] < twoSorted.pop()[0] # Check that it is sorted by the keys
assert oneSorted[0:2] != twoSorted[0:2] # Note that the subset consisting of the first two elements does not match
[(1, u'alpha'), (1, u'epsilon'), (1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')] [(1, u'delta'), (1, u'epsilon'), (1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
take(2)
), then we would observe different answers - that is a really bad outcome as we want identical input data to always yield identical output. A better technique is to sort the RDD by both the key and value, which we can do by combining the key and value into a single string and then sorting on that string. Since the key is an integer and the value is a unicode string, we can use a function to combine them into a single unicode string (e.g., unicode('%.3f' % key) + ' ' + value
) before sorting the RDD using sortBy().¶def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (rating, MovieName)
    Returns:
        sortString: the value to sort with, 'rating MovieName'
    """
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)
print oneRDD.sortBy(sortFunction, True).collect()
print twoRDD.sortBy(sortFunction, True).collect()
[(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')] [(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
sortFunction
we defined.¶oneSorted1 = oneRDD.takeOrdered(oneRDD.count(),key=sortFunction)
twoSorted1 = twoRDD.takeOrdered(twoRDD.count(),key=sortFunction)
print 'one is %s' % oneSorted1
print 'two is %s' % twoSorted1
assert oneSorted1 == twoSorted1
one is [(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')] two is [(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
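The tie-breaking argument above can be reproduced locally. This is a minimal sketch in plain Python, not the lab's Spark code: `sort_function` is a renamed copy of `sortFunction` without the `unicode()` call, and Python's built-in `sorted()` stands in for `sortByKey()` and `sortBy()`.

```python
def sort_function(pair):
    """Build the composite sort string 'key value' (same idea as sortFunction)."""
    return ('%.3f' % pair[0]) + ' ' + pair[1]


tmp1 = [(1, 'alpha'), (2, 'alpha'), (2, 'beta'), (3, 'alpha'), (1, 'epsilon'), (1, 'delta')]
tmp2 = [(1, 'delta'), (2, 'alpha'), (2, 'beta'), (3, 'alpha'), (1, 'epsilon'), (1, 'alpha')]

# Sorting on the key alone leaves tied records in their input order
# (Python's sorted() is stable), so the two results differ.
by_key_1 = sorted(tmp1, key=lambda pair: pair[0])
by_key_2 = sorted(tmp2, key=lambda pair: pair[0])

# Sorting on the composite string breaks every tie, so the results agree.
by_both_1 = sorted(tmp1, key=sort_function)
by_both_2 = sorted(tmp2, key=sort_function)

print(by_key_1[:2] != by_key_2[:2])
print(by_both_1 == by_both_2)
```

One caveat with a string key: strings compare character by character, so `'10.000 x'` sorts before `'2.000 x'`. That does not matter here because ratings lie between 1 and 5, but for wider numeric ranges a tuple key such as `(pair[0], pair[1])` would be a safer choice.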
getCountsAndAverages()
that takes a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...)) and returns a tuple of (MovieID, (number of ratings, averageRating)). For example, given the tuple (100, (10.0, 20.0, 30.0))
, your function should return (100, (3, 20.0))
¶# TODO: Replace <FILL IN> with appropriate code
# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    movie = IDandRatingsTuple[0]
    ratings = IDandRatingsTuple[1]
    return (movie, (len(ratings), float(sum(ratings)) / len(ratings)))
# TEST Number of Ratings and Average Ratings for a Movie (1a)
Test.assertEquals(getCountsAndAverages((1, (1, 2, 3, 4))), (1, (4, 2.5)),
'incorrect getCountsAndAverages() with integer list')
Test.assertEquals(getCountsAndAverages((100, (10.0, 20.0, 30.0))), (100, (3, 20.0)),
'incorrect getCountsAndAverages() with float list')
Test.assertEquals(getCountsAndAverages((110, xrange(20))), (110, (20, 9.5)),
'incorrect getCountsAndAverages() with xrange')
1 test passed. 1 test passed. 1 test passed.
getCountsAndAverages()
helper function with Spark to determine movies with highest average ratings.¶ratingsRDD
contains tuples of the form (UserID, MovieID, Rating). From ratingsRDD
create an RDD with tuples of the form (MovieID, Python iterable of Ratings for that MovieID). This transformation will yield an RDD of the form: [(1, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e7c90>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e79d0>), (3, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e7610>)]
. Note that you will only need to perform two Spark transformations to do this step.¶movieIDsWithRatingsRDD
and your getCountsAndAverages()
helper function, compute the number of ratings and average rating for each movie to yield tuples of the form (MovieID, (number of ratings, average rating)). This transformation will yield an RDD of the form: [(1, (993, 4.145015105740181)), (2, (332, 3.174698795180723)), (3, (299, 3.0468227424749164))]
. You can do this step with one Spark transformation.¶moviesRDD
, apply RDD transformations that use movieIDsWithAvgRatingsRDD
to get the movie names for movieIDsWithAvgRatingsRDD
, yielding tuples of the form (average rating, movie name, number of ratings). This set of transformations will yield an RDD of the form: [(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1), (1.0, u'Big Squeeze, The (1996)', 3)]
. You will need to do two Spark transformations to complete this step: first use the moviesRDD
with movieIDsWithAvgRatingsRDD
to create a new RDD with Movie names matched to Movie IDs, then convert that RDD into the form of (average rating, movie name, number of ratings). These transformations will yield an RDD that looks like: [(3.6818181818181817, u'Happiest Millionaire, The (1967)', 22), (3.0468227424749164, u'Grumpier Old Men (1995)', 299), (2.882978723404255, u'Hocus Pocus (1993)', 94)]
¶# TODO: Replace <FILL IN> with appropriate code
# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = (ratingsRDD
.map(lambda (user, movie, rating): (movie, rating))
.groupByKey())
print 'movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3)
# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda rec: getCountsAndAverages(rec))
print 'movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3)
# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD
.join(movieIDsWithAvgRatingsRDD)
.map(lambda (movie_id, (movie_name, (num, avg))): (avg, movie_name, num)))
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)
movieIDsWithRatingsRDD: [(2, <pyspark.resultiterable.ResultIterable object at 0xb0f773ec>), (4, <pyspark.resultiterable.ResultIterable object at 0xb0f7748c>), (6, <pyspark.resultiterable.ResultIterable object at 0xb0f774cc>)] movieIDsWithAvgRatingsRDD: [(2, (332, 3.174698795180723)), (4, (71, 2.676056338028169)), (6, (442, 3.7918552036199094))] movieNameWithAvgRatingsRDD: [(3.6818181818181817, u'Happiest Millionaire, The (1967)', 22), (3.0468227424749164, u'Grumpier Old Men (1995)', 299), (2.882978723404255, u'Hocus Pocus (1993)', 94)]
# TEST Movies with Highest Average Ratings (1b)
Test.assertEquals(movieIDsWithRatingsRDD.count(), 3615,
'incorrect movieIDsWithRatingsRDD.count() (expected 3615)')
movieIDsWithRatingsTakeOrdered = movieIDsWithRatingsRDD.takeOrdered(3)
Test.assertTrue(movieIDsWithRatingsTakeOrdered[0][0] == 1 and
len(list(movieIDsWithRatingsTakeOrdered[0][1])) == 993,
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[0] (expected 993)')
Test.assertTrue(movieIDsWithRatingsTakeOrdered[1][0] == 2 and
len(list(movieIDsWithRatingsTakeOrdered[1][1])) == 332,
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[1] (expected 332)')
Test.assertTrue(movieIDsWithRatingsTakeOrdered[2][0] == 3 and
len(list(movieIDsWithRatingsTakeOrdered[2][1])) == 299,
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[2] (expected 299)')
Test.assertEquals(movieIDsWithAvgRatingsRDD.count(), 3615,
'incorrect movieIDsWithAvgRatingsRDD.count() (expected 3615)')
Test.assertEquals(movieIDsWithAvgRatingsRDD.takeOrdered(3),
[(1, (993, 4.145015105740181)), (2, (332, 3.174698795180723)),
(3, (299, 3.0468227424749164))],
'incorrect movieIDsWithAvgRatingsRDD.takeOrdered(3)')
Test.assertEquals(movieNameWithAvgRatingsRDD.count(), 3615,
'incorrect movieNameWithAvgRatingsRDD.count() (expected 3615)')
Test.assertEquals(movieNameWithAvgRatingsRDD.takeOrdered(3),
[(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1),
(1.0, u'Big Squeeze, The (1996)', 3)],
'incorrect movieNameWithAvgRatingsRDD.takeOrdered(3)')
1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed.
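What the three RDD steps in part (1b) compute can be traced in plain Python on a toy dataset. This is only a sketch under stated assumptions: the ratings and titles are made up, a `defaultdict` stands in for `groupByKey()`, and a dictionary lookup stands in for `join()`.

```python
from collections import defaultdict

# Toy data in the lab's tuple shapes (values are made up for illustration)
ratings = [(1, 10, 4.0), (2, 10, 2.0), (1, 20, 5.0), (3, 10, 3.0)]  # (UserID, MovieID, Rating)
movies = {10: 'Movie A', 20: 'Movie B'}                              # MovieID -> Title

# Step 1: map to (MovieID, Rating) and group by MovieID (like map + groupByKey)
ratings_by_movie = defaultdict(list)
for _user, movie, rating in ratings:
    ratings_by_movie[movie].append(rating)

# Step 2: (MovieID, (number of ratings, average rating)), like getCountsAndAverages
counts_and_averages = {
    movie: (len(rs), float(sum(rs)) / len(rs)) for movie, rs in ratings_by_movie.items()
}

# Step 3: inner join on MovieID, then reshape to (average rating, title, count)
name_with_avg = sorted(
    (avg, movies[movie], num)
    for movie, (num, avg) in counts_and_averages.items()
    if movie in movies
)
print(name_with_avg)
```

The `if movie in movies` condition mirrors the inner-join behavior: a MovieID that is missing from either side produces no output record.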
movieNameWithAvgRatingsRDD
to limit the results to movies with ratings from more than 500 people. We then use the sortFunction()
helper function to sort by the average rating to get the movies in order of their rating (highest rating first). You will end up with an RDD of the form: [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047)]
¶# TODO: Replace <FILL IN> with appropriate code
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
.filter(lambda (avg_rating, movie_name, num_ratings): num_ratings > 500)
.sortBy(sortFunction, False))
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)
Movies with highest ratings: [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047), (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195), (4.505415162454874, u'Usual Suspects, The (1995)', 831), (4.457256461232604, u'Rear Window (1954)', 503), (4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700), (4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776), (4.363975155279503, u'Godfather: Part II, The (1974)', 805), (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811), (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248), (4.335826477187734, u'Saving Private Ryan (1998)', 1337), (4.326241134751773, u'Chinatown (1974)', 564), (4.325383304940375, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587), (4.324110671936759, u'Monty Python and the Holy Grail (1974)', 759), (4.3096, u'Matrix, The (1999)', 1250)]
# TEST Movies with Highest Average Ratings and more than 500 Reviews (1c)
Test.assertEquals(movieLimitedAndSortedByRatingRDD.count(), 194,
'incorrect movieLimitedAndSortedByRatingRDD.count()')
Test.assertEquals(movieLimitedAndSortedByRatingRDD.take(20),
[(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088),
(4.515798462852263, u"Schindler's List (1993)", 1171),
(4.512893982808023, u'Godfather, The (1972)', 1047),
(4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195),
(4.505415162454874, u'Usual Suspects, The (1995)', 831),
(4.457256461232604, u'Rear Window (1954)', 503),
(4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651),
(4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447),
(4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700),
(4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776),
(4.363975155279503, u'Godfather: Part II, The (1974)', 805),
(4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811),
(4.358173076923077, u'Silence of the Lambs, The (1991)', 1248),
(4.335826477187734, u'Saving Private Ryan (1998)', 1337),
(4.326241134751773, u'Chinatown (1974)', 564),
(4.325383304940375, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587),
(4.324110671936759, u'Monty Python and the Holy Grail (1974)', 759),
(4.3096, u'Matrix, The (1999)', 1250)], 'incorrect sortedByRatingRDD.take(20)')
1 test passed. 1 test passed.
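The filter-then-sort step in part (1c) can be checked on a handful of records in plain Python. In this sketch, `sort_function` is a renamed copy of the lab's `sortFunction`, the first three records are rows from the lab's ranked results, and the last record is a made-up low-count entry added to show the filter.

```python
def sort_function(record):
    """Composite sort string 'rating name', mirroring the lab's sortFunction."""
    return ('%.3f' % record[0]) + ' ' + record[1]


# (average rating, movie name, number of ratings)
records = [
    (4.232558139534884, 'This Is Spinal Tap (1984)', 516),
    (4.236263736263736, 'Run Lola Run (Lola rennt) (1998)', 546),
    (4.232558139534884, 'Toy Story 2 (1999)', 860),
    (5.0, 'Hypothetical Obscure Film', 3),  # made-up entry with too few ratings
]

min_ratings = 500
# Keep only movies rated by more than min_ratings people
popular = [r for r in records if r[2] > min_ratings]
# Highest average first; equal averages are ordered by title, also descending
ranked = sorted(popular, key=sort_function, reverse=True)
for record in ranked:
    print(record)
```

Two of these movies have exactly the same average rating, so the title part of the sort string decides their order, which is why repeated runs always return them in the same sequence.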
ratingsRDD
dataset into three pieces:¶randomSplit()
takes a set of splits and a seed and returns multiple RDDs.¶trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)
print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
validationRDD.count(),
testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)
assert trainingRDD.count() == 292716
assert validationRDD.count() == 96902
assert testRDD.count() == 98032
assert trainingRDD.filter(lambda t: t == (1, 914, 3.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 2355, 5.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 595, 5.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 1287, 5.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 594, 4.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 1270, 5.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 1193, 5.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 2398, 4.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 1035, 5.0)).count() == 1
Training: 292716, validation: 96902, test: 98032 [(1, 914, 3.0), (1, 2355, 5.0), (1, 595, 5.0)] [(1, 1287, 5.0), (1, 594, 4.0), (1, 1270, 5.0)] [(1, 1193, 5.0), (1, 2398, 4.0), (1, 1035, 5.0)]
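The weights `[6, 2, 2]` are relative: they are normalized to 0.6, 0.2, and 0.2, which is why the three counts above are only roughly 60%, 20%, and 20% of the 487650 ratings. The idea can be sketched in plain Python as below. `random_split` is a hypothetical local helper, not Spark's implementation, and it will not reproduce Spark's assignment of individual records.

```python
import random


def random_split(items, weights, seed):
    """Assign each item to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [6, 2, 2] -> [0.6, 0.8, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(item)
                break
    return splits


data = list(range(1000))
training, validation, test = random_split(data, [6, 2, 2], seed=0)
print('%d %d %d' % (len(training), len(validation), len(test)))
```

Because every item is drawn independently, the split sizes vary around their expected values, while a fixed seed makes the assignment repeatable from run to run.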
randomSplit()
transformation.¶(actual rating - predicted rating)
for all users and movies for which we have the actual rating. Versions of Spark MLlib beginning with Spark 1.4 include a RegressionMetrics module that can be used to compute the RMSE. However, since we are using Spark 1.3.1, we will write our own function.¶predictedRDD
and actualRDD
RDDs. Both RDDs consist of tuples of the form (UserID, MovieID, Rating)¶predictedRDD
into the tuples of the form ((UserID, MovieID), Rating). For example, tuples like [((1, 1), 5), ((1, 2), 3), ((1, 3), 4), ((2, 1), 3), ((2, 2), 2), ((2, 3), 4)]
. You can perform this step with a single Spark transformation.¶actualRDD
into the tuples of the form ((UserID, MovieID), Rating). For example, tuples like [((1, 2), 3), ((1, 3), 5), ((2, 1), 5), ((2, 2), 1)]
. You can perform this step with a single Spark transformation.¶collect()
to perform this step. Note that not every (UserID, MovieID) pair will appear in both RDDs - if a pair does not appear in both RDDs, then it does not contribute to the RMSE. You will end up with an RDD with entries of the form $(x_i - y_i)^2$. You might want to check out Python's math module to see how to compute these values.¶collect()
), compute the total squared error: $SE = \sum_{i = 1}^{n} (x_i - y_i)^2$¶collect()
), to count the number of pairs for which you computed the total squared error.¶collect()
on either RDD.¶# TODO: Replace <FILL IN> with appropriate code
import math
def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda (user, movie, rating): ((user, movie), rating))
    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda (user, movie, rating): ((user, movie), rating))
    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda ((user, movie), (pred, actual)): (pred - actual) ** 2))
    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(lambda a, b: a + b)
    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()
    # Using the total squared error and the number of entries, compute the RMSE
    return math.sqrt(float(totalError) / numRatings)
# sc.parallelize turns a Python list into a Spark RDD.
testPredicted = sc.parallelize([
(1, 1, 5),
(1, 2, 3),
(1, 3, 4),
(2, 1, 3),
(2, 2, 2),
(2, 3, 4)])
testActual = sc.parallelize([
(1, 2, 3),
(1, 3, 5),
(2, 1, 5),
(2, 2, 1)])
testPredicted2 = sc.parallelize([
(2, 2, 5),
(1, 2, 5)])
testError = computeError(testPredicted, testActual)
print 'Error for test dataset (should be 1.22474487139): %s' % testError
testError2 = computeError(testPredicted2, testActual)
print 'Error for test dataset2 (should be 3.16227766017): %s' % testError2
testError3 = computeError(testActual, testActual)
print 'Error for testActual dataset (should be 0.0): %s' % testError3
Error for test dataset (should be 1.22474487139): 1.22474487139 Error for test dataset2 (should be 3.16227766017): 3.16227766017 Error for testActual dataset (should be 0.0): 0.0
# TEST Root Mean Square Error (2b)
Test.assertTrue(abs(testError - 1.22474487139) < 0.00000001,
'incorrect testError (expected 1.22474487139)')
Test.assertTrue(abs(testError2 - 3.16227766017) < 0.00000001,
'incorrect testError2 result (expected 3.16227766017)')
Test.assertTrue(abs(testError3 - 0.0) < 0.00000001,
'incorrect testActual result (expected 0.0)')
1 test passed. 1 test passed. 1 test passed.
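The expected values in the tests above can be verified by hand. With $SE$ the total squared error over the $n$ matched pairs, the quantity computed is $RMSE = \sqrt{SE / n}$. The following is a minimal plain-Python sketch of the same calculation, where dictionaries keyed by (UserID, MovieID) stand in for the joined RDDs; `rmse` is a hypothetical local helper and assumes that at least one pair appears in both inputs.

```python
import math


def rmse(predicted, actual):
    """RMSE over the (UserID, MovieID) pairs present in both inputs."""
    predicted_by_pair = {(user, movie): rating for user, movie, rating in predicted}
    actual_by_pair = {(user, movie): rating for user, movie, rating in actual}
    # Inner join on the (UserID, MovieID) key
    shared = set(predicted_by_pair) & set(actual_by_pair)
    squared_errors = [(predicted_by_pair[k] - actual_by_pair[k]) ** 2 for k in shared]
    return math.sqrt(float(sum(squared_errors)) / len(squared_errors))


test_predicted = [(1, 1, 5), (1, 2, 3), (1, 3, 4), (2, 1, 3), (2, 2, 2), (2, 3, 4)]
test_actual = [(1, 2, 3), (1, 3, 5), (2, 1, 5), (2, 2, 1)]
test_predicted2 = [(2, 2, 5), (1, 2, 5)]

print(rmse(test_predicted, test_actual))   # squared errors 0, 1, 4, 1 -> sqrt(6 / 4)
print(rmse(test_predicted2, test_actual))  # squared errors 16, 4 -> sqrt(20 / 2)
print(rmse(test_actual, test_actual))      # identical inputs -> 0.0
```

Only four of the six predicted pairs in the first case have an actual rating, so $n = 4$ there, not 6.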
ALS.train()
is the rank, which is the number of rows in the Users matrix (green in the diagram above) or the number of columns in the Movies matrix (blue in the diagram above). (In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to overfitting.) We will train models with ranks of 4, 8, and 12 using the trainingRDD
dataset.¶ALS.train(trainingRDD, rank, seed=seed, iterations=iterations, lambda_=regularizationParameter)
with the following parameters: an RDD consisting of tuples of the form (UserID, MovieID, rating) used to train the model, an integer rank (4, 8, or 12), a number of iterations to execute (we will use 5 for the iterations
parameter), and a regularization coefficient (we will use 0.1 for the regularizationParameter
).¶validationForPredictRDD
, consisting of (UserID, MovieID) pairs that you extract from validationRDD
. You will end up with an RDD of the form: [(1, 1287), (1, 594), (1, 1270)]
¶validationForPredictRDD
, we can predict rating values by calling model.predictAll() with the validationForPredictRDD
dataset, where model
is the model we generated with ALS.train(). predictAll
accepts an RDD with each entry in the format (userID, movieID) and outputs an RDD with each entry in the format (userID, movieID, rating).¶computeError()
function you wrote in part (2b) to compute the error between the predicted ratings and the actual ratings in validationRDD
.¶validationRDD
dataset?¶computeError()
function, since, unlike the Spark ALS implementation (and the Spark 1.4 RegressionMetrics module), this does not use a fast linear algebra library and needs to run some Python code for all 100k entries.¶# TODO: Replace <FILL IN> with appropriate code
from pyspark.mllib.recommendation import ALS
validationForPredictRDD = validationRDD.map(lambda (user, movie, rating): (user, movie))
seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03
minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank
print 'The best model was trained with rank %s' % bestRank
For rank 4 the RMSE is 0.892734779484 For rank 8 the RMSE is 0.890121292255 For rank 12 the RMSE is 0.890216118367 The best model was trained with rank 8
# TEST Using ALS.train (2c)
Test.assertEquals(trainingRDD.getNumPartitions(), 2,
'incorrect number of partitions for trainingRDD (expected 2)')
Test.assertEquals(validationForPredictRDD.count(), 96902,
'incorrect size for validationForPredictRDD (expected 96902)')
Test.assertEquals(validationForPredictRDD.filter(lambda t: t == (1, 1907)).count(), 1,
'incorrect content for validationForPredictRDD')
Test.assertTrue(abs(errors[0] - 0.883710109497) < tolerance, 'incorrect errors[0]')
Test.assertTrue(abs(errors[1] - 0.878486305621) < tolerance, 'incorrect errors[1]')
Test.assertTrue(abs(errors[2] - 0.876832795659) < tolerance, 'incorrect errors[2]')
1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed. 1 test passed.
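The model selection in part (2c) reduces to picking the rank with the smallest validation RMSE. A compact way to express that, sketched here in plain Python with the three RMSE values copied from the run shown above (`validation_errors` is a hypothetical name, not part of the lab code):

```python
# Validation RMSE per rank, copied from the run shown above
validation_errors = {4: 0.892734779484, 8: 0.890121292255, 12: 0.890216118367}

# Pick the rank whose validation RMSE is smallest
best_rank = min(validation_errors, key=validation_errors.get)
print('The best model was trained with rank %s' % best_rank)

# Relative gap between the worst and best rank, to put the choice in perspective
worst = max(validation_errors.values())
best = validation_errors[best_rank]
relative_gap = (worst - best) / worst
print('Relative RMSE gap: %.4f' % relative_gap)
```

In this run the three ranks differ by less than 0.3% in RMSE, far below the tolerance of 0.03 used by the tests, so the preference for rank 8 over rank 12 is a narrow one.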
trainingRDD
and validationRDD
datasets to select the best model. Since we used these two datasets to determine what model is best, we cannot use them to test how good the model is - otherwise we would be very vulnerable to overfitting. To decide how good our model is, we need to use the testRDD
dataset. We will use the bestRank
you determined in part (2c) to create a model for predicting the ratings for the test dataset and then we will compute the RMSE.¶trainingRDD
, bestRank
from part (2c), and the parameters you used in part (2c): seed=seed
, iterations=iterations
, and lambda_=regularizationParameter
- make sure you include all of the parameters.¶testForPredictingRDD
, consisting of (UserID, MovieID) pairs that you extract from testRDD
. You will end up with an RDD of the form: [(1, 1287), (1, 594), (1, 1270)]
¶testRDD
and your computeError
function to compute the RMSE between testRDD
and the predictedTestRDD
from the model.¶computeError()
function you wrote in part (2b) to compute the error between the predicted ratings and the actual ratings in testRDD
.¶# TODO: Replace <FILL IN> with appropriate code
myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations, lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda (user, movie, rating): (user, movie))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)
testRMSE = computeError(testRDD, predictedTestRDD)
print 'The model had a RMSE on the test set of %s' % testRMSE
The model had a RMSE on the test set of 0.891048561304
# TEST Testing Your Model (2d)
Test.assertTrue(abs(testRMSE - 0.87809838344) < tolerance, 'incorrect testRMSE')
1 test passed.
trainingRDD
to compute the average rating across all movies in that training dataset.¶testRDD
to create an RDD with entries of the form (userID, movieID, average rating).¶computeError
function to compute the RMSE between testRDD
and the RDD that you just created, testForAvgRDD
.¶# TODO: Replace <FILL IN> with appropriate code
trainingAvgRating = trainingRDD.map(lambda (user, movie, rating): rating).mean()
print 'The average rating for movies in the training set is %s' % trainingAvgRating
testForAvgRDD = testRDD.map(lambda (user, movie, rating): (user, movie, trainingAvgRating))
testAvgRMSE = computeError(testRDD, testForAvgRDD)
print 'The RMSE on the average set is %s' % testAvgRMSE
The average rating for movies in the training set is 3.57409571052 The RMSE on the average set is 1.12036693569
# TEST Comparing Your Model (2e)
Test.assertTrue(abs(trainingAvgRating - 3.57409571052) < 0.000001,
'incorrect trainingAvgRating (expected 3.57409571052)')
Test.assertTrue(abs(testAvgRMSE - 1.12036693569) < 0.000001,
'incorrect testAvgRMSE (expected 1.12036693569)')
1 test passed. 1 test passed.
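The baseline in part (2e) is a predictor that ignores both the user and the movie and always answers with the training average. A minimal plain-Python sketch on made-up toy tuples (not the lab's data) shows the calculation:

```python
import math

# Toy (UserID, MovieID, Rating) tuples, made up for illustration
training = [(1, 10, 3.0), (2, 10, 4.0), (1, 20, 5.0)]
test = [(3, 10, 5.0), (3, 20, 3.0)]

# Average rating over the training set
training_avg = sum(rating for _, _, rating in training) / len(training)

# The baseline predicts that same average for every test pair
squared_errors = [(training_avg - rating) ** 2 for _, _, rating in test]
baseline_rmse = math.sqrt(sum(squared_errors) / len(squared_errors))
print('average=%s, baseline RMSE=%s' % (training_avg, baseline_rmse))
```

In the run shown above, the ALS model reaches a test RMSE of about 0.891 against about 1.120 for this baseline, which is roughly 20% lower.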
print 'Most rated movies:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50):
    print ratingsTuple
Most rated movies: (average rating, movie name, number of reviews) (4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088) (4.515798462852263, u"Schindler's List (1993)", 1171) (4.512893982808023, u'Godfather, The (1972)', 1047) (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195) (4.505415162454874, u'Usual Suspects, The (1995)', 831) (4.457256461232604, u'Rear Window (1954)', 503) (4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651) (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447) (4.4, u'Sixth Sense, The (1999)', 1110) (4.394285714285714, u'North by Northwest (1959)', 700) (4.379506641366224, u'Citizen Kane (1941)', 527) (4.375, u'Casablanca (1942)', 776) (4.363975155279503, u'Godfather: Part II, The (1974)', 805) (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811) (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248) (4.335826477187734, u'Saving Private Ryan (1998)', 1337) (4.326241134751773, u'Chinatown (1974)', 564) (4.325383304940375, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587) (4.324110671936759, u'Monty Python and the Holy Grail (1974)', 759) (4.3096, u'Matrix, The (1999)', 1250) (4.309457579972183, u'Star Wars: Episode V - The Empire Strikes Back (1980)', 1438) (4.30379746835443, u'Young Frankenstein (1974)', 553) (4.301346801346801, u'Psycho (1960)', 594) (4.296438883541867, u'Pulp Fiction (1994)', 1039) (4.286535303776683, u'Fargo (1996)', 1218) (4.282367447595561, u'GoodFellas (1990)', 811) (4.27943661971831, u'American Beauty (1999)', 1775) (4.268053855569155, u'Wizard of Oz, The (1939)', 817) (4.267774699907664, u'Princess Bride, The (1987)', 1083) (4.253333333333333, u'Graduate, The (1967)', 600) (4.236263736263736, u'Run Lola Run (Lola rennt) (1998)', 546) (4.233807266982622, u'Amadeus (1984)', 633) (4.232558139534884, u'Toy Story 2 (1999)', 860) (4.232558139534884, u'This Is Spinal Tap (1984)', 516) (4.228494623655914, 
u'Almost Famous (2000)', 744) (4.2250755287009065, u'Christmas Story, A (1983)', 662) (4.216757741347905, u'Glory (1989)', 549) (4.213358070500927, u'Apocalypse Now (1979)', 539) (4.20992028343667, u'L.A. Confidential (1997)', 1129) (4.204733727810651, u'Blade Runner (1982)', 845) (4.1886120996441285, u'Sling Blade (1996)', 562) (4.184615384615385, u'Braveheart (1995)', 1300) (4.184168012924071, u'Butch Cassidy and the Sundance Kid (1969)', 619) (4.182509505703422, u'Good Will Hunting (1997)', 789) (4.166969147005445, u'Taxi Driver (1976)', 551) (4.162767039674466, u'Terminator, The (1984)', 983) (4.157545605306799, u'Reservoir Dogs (1992)', 603) (4.153333333333333, u'Jaws (1975)', 750) (4.149840595111583, u'Alien (1979)', 941) (4.145015105740181, u'Toy Story (1995)', 993)
myUserID
to 0 for you. Next, create a new RDD myRatingsRDD
with your ratings for at least 10 movies. Each entry should be formatted as (myUserID, movieID, rating)
(i.e., each entry should be formatted in the same way as trainingRDD
). As in the original dataset, ratings should be between 1 and 5 (inclusive). If you have not seen at least 10 of these movies, you can increase the parameter passed to take()
in the above cell until there are 10 movies that you have seen (or you can also guess what your rating would be for movies you have not seen).¶# TODO: Replace <FILL IN> with appropriate code
myUserID = 0
# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID
myRatedMovies = [
(myUserID, 1088, 5),
(myUserID, 1195, 5),
(myUserID, 1110, 4),
(myUserID, 1250, 5),
(myUserID, 1775, 4),
(myUserID, 789, 5),
(myUserID, 1039, 4),
(myUserID, 811, 5),
(myUserID, 1447, 4),
(myUserID, 1438, 4)
]
myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)
My movie ratings: [(0, 1088, 5), (0, 1195, 5), (0, 1110, 4), (0, 1250, 5), (0, 1775, 4), (0, 789, 5), (0, 1039, 4), (0, 811, 5), (0, 1447, 4), (0, 1438, 4)]
training
dataset so that the model you train will incorporate your preferences. Spark's union() transformation combines two RDDs; use union()
to create a new training dataset that includes your ratings and the data in the original training dataset.¶# TODO: Replace <FILL IN> with appropriate code
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)
print ('The training dataset now has %s more entries than the original training dataset' %
(trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count()
The training dataset now has 10 more entries than the original training dataset
# TODO: Replace <FILL IN> with appropriate code
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, seed=seed, iterations=iterations,
lambda_=regularizationParameter)
testForPredictingRDD
, consisting of (UserID, MovieID) pairs that you extracted from testRDD
. The RDD has the form: [(1, 1287), (1, 594), (1, 1270)]
¶myRatingsModel.predictAll()
to predict rating values for the testForPredictingRDD
test dataset, and save the result as predictedTestMyRatingsRDD
¶testRDD
and your computeError
function to compute the RMSE between testRDD
and the predictedTestMyRatingsRDD
from the model.¶# TODO: Replace <FILL IN> with appropriate code
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD, predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings
The model had a RMSE on the test set of 0.891985238664
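To make the RMSE figure concrete, the sketch below computes a root-mean-square error locally on a handful of made-up ratings. It is not the lab's computeError function (which operates on RDDs and is defined earlier); `rmse_local` is a hypothetical name, and the sketch assumes both inputs are lists of (user, movie, rating) tuples matched on the (user, movie) pair.

```python
import math


def rmse_local(predicted, actual):
    """RMSE over the (user, movie) keys that appear in both lists of (user, movie, rating)."""
    actual_by_key = dict(((user, movie), rating) for (user, movie, rating) in actual)
    squared_errors = [(rating - actual_by_key[(user, movie)]) ** 2
                      for (user, movie, rating) in predicted
                      if (user, movie) in actual_by_key]
    if not squared_errors:
        raise ValueError('no (user, movie) pairs in common')
    return math.sqrt(sum(squared_errors) / float(len(squared_errors)))


# Made-up ratings and predictions, for illustration only.
actual_sample = [(1, 1287, 4.0), (1, 594, 3.0), (1, 1270, 5.0)]
predicted_sample = [(1, 1287, 5.0), (1, 594, 3.0), (1, 1270, 3.0)]

# Squared errors are 1, 0 and 4, so the result is sqrt(5 / 3).
print(rmse_local(predicted_sample, actual_sample))
```

Because the errors are squared before averaging, a single prediction that is off by 2 contributes four times as much as one that is off by 1.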
predictAll
method to compute the error of the model. Here, use predictAll
to predict the ratings you would give to the movies that you have not already rated.¶myRatedMovies
to transform the moviesRDD
into an RDD with entries that are pairs of the form (myUserID, Movie ID) and that does not contain any movies that you have rated. This transformation will yield an RDD of the form: [(0, 1), (0, 2), (0, 3), (0, 4)]
. Note that you can do this step with one RDD transformation.¶myUnratedMoviesRDD
, with myRatingsModel.predictAll() to predict your ratings for the movies.¶# TODO: Replace <FILL IN> with appropriate code
# Use the Python list myRatedMovies to transform the moviesRDD into an RDD with entries that are pairs of the form
# (myUserID, Movie ID) and that does not contain any movies that you have rated
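# Build the set of rated movie IDs once, instead of rebuilding a list for every element
myRatedMovieIDs = set(movie for (user, movie, rating) in myRatedMovies)
myUnratedMoviesRDD = (moviesRDD
.map(lambda (movie, title): (myUserID, movie))
.filter(lambda (user, movie): movie not in myRatedMovieIDs))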
# Use the input RDD, myUnratedMoviesRDD, with myRatingsModel.predictAll() to predict your ratings for the movies
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
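The "not already rated" test is a membership check, and one way to express it is with a set of rated movie IDs built once up front. The sketch below shows the idea on plain Python lists rather than RDDs; `unrated_pairs` is a hypothetical name and the movie titles are placeholders, not entries from movies.dat.

```python
def unrated_pairs(movies, rated_movies, user_id):
    """Return (user_id, movieID) pairs for every movie not present in rated_movies.

    movies: list of (movieID, title); rated_movies: list of (userID, movieID, rating).
    """
    rated_ids = set(movie_id for (_user, movie_id, _rating) in rated_movies)
    return [(user_id, movie_id) for (movie_id, _title) in movies
            if movie_id not in rated_ids]


# Placeholder movies and a single made-up rating, for illustration only.
movies_sample = [(1, 'Movie A'), (2, 'Movie B'), (3, 'Movie C'), (4, 'Movie D')]
rated_sample = [(0, 2, 5)]

print(unrated_pairs(movies_sample, rated_sample, 0))  # prints [(0, 1), (0, 3), (0, 4)]
```

A set lookup takes roughly constant time per movie, whereas scanning a list of rated movies costs time proportional to its length for every element.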
movieIDsWithAvgRatingsRDD
from Part (1b), which has the form (MovieID, (number of ratings, average rating)), into an RDD of the form (MovieID, number of ratings): [(2, 332), (4, 71), (6, 442)]
¶predictedRatingsRDD
into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating): [(3456, -0.5501005376936687), (1080, 1.5885892024487962), (320, -3.7952255522487865)]
¶predictedRDD
and movieCountsRDD
to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings)): [(2050, (0.6694097486155939, 44)), (10, (5.29762541533513, 418)), (2060, (0.5055259373841172, 97))]
¶predictedWithCountsRDD
and moviesRDD
to yield an RDD with tuples of the form (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings. For example (shown without the number of ratings): [(7.983121900375243, u'Under Siege (1992)'), (7.9769201864261285, u'Fifth Element, The (1997)')]
¶# TODO: Replace <FILL IN> with appropriate code
# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form (MovieID, (number of ratings, average rating)),
# into an RDD of the form (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda (movie, (num, avg)): (movie, num))
# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
predictedRDD = predictedRatingsRDD.map(lambda (user, movie, rating): (movie, rating))
# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form
# (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD = predictedRDD.join(movieCountsRDD)
# Use RDD transformations with predictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form
# (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings
ratingsWithNamesRDD = (predictedWithCountsRDD
.join(moviesRDD)
.map(lambda (movie, ((rating, num), name)): (rating, name, num))
.filter(lambda (rating, name, num): num > 75))
print(ratingsWithNamesRDD)
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
'\n'.join(map(str, predictedHighestRatedMovies)))
PythonRDD[876] at RDD at PythonRDD.scala:43
My highest rated movies as predicted (for movies with more than 75 reviews):
(5.579984994768239, u'White Christmas (1954)', 145)
(5.523804848351718, u'Sound of Music, The (1965)', 541)
(5.5169414042800895, u'Inherit the Wind (1960)', 133)
(5.3733621714595, u"It's a Wonderful Life (1946)", 343)
(5.263647320798345, u'Beyond the Mat (2000)', 99)
(5.215331701703268, u"Schindler's List (1993)", 1171)
(5.1402003964450085, u'12 Angry Men (1957)', 314)
(5.139648908704283, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587)
(5.133829841121109, u'My Fair Lady (1964)', 314)
(5.127143456303852, u'Remember the Titans (2000)', 344)
(5.042261702583474, u'Forrest Gump (1994)', 1039)
(5.028569264325199, u'Roman Holiday (1953)', 206)
(5.013270518971352, u'Old Yeller (1957)', 146)
(5.01153872185486, u'Shawshank Redemption, The (1994)', 1088)
(5.004707567221567, u'Three Days of the Condor (1975)', 146)
(5.004661352433759, u'Wizard of Oz, The (1939)', 817)
(4.996589084620961, u'Quiet Man, The (1952)', 119)
(4.9913782769131485, u'Saving Private Ryan (1998)', 1337)
(4.957173055430395, u'Sixth Sense, The (1999)', 1110)
(4.953793794678976, u'Lady Vanishes, The (1938)', 101)
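The pipeline above joins three keyed datasets, drops movies with too few ratings, and keeps the highest predictions. The following is a rough local sketch of the same steps on plain Python lists; it uses dictionaries in place of join() and heapq.nlargest in place of takeOrdered() with a negated key. The name `top_predictions`, its parameters, and all sample data are hypothetical.

```python
import heapq


def top_predictions(predicted, counts, names, min_ratings=75, n=20):
    """Return up to n (predictedRating, title, numRatings) tuples, highest rating first.

    predicted: (movieID, predictedRating) pairs
    counts:    (movieID, numRatings) pairs
    names:     (movieID, title) pairs
    """
    counts_by_id = dict(counts)
    names_by_id = dict(names)
    # Inner-join semantics: keep only movie IDs present in all three inputs.
    joined = [(rating, names_by_id[movie_id], counts_by_id[movie_id])
              for (movie_id, rating) in predicted
              if movie_id in counts_by_id and movie_id in names_by_id]
    # Keep only movies with more than min_ratings ratings.
    popular = [row for row in joined if row[2] > min_ratings]
    return heapq.nlargest(n, popular, key=lambda row: row[0])


# Made-up predictions, counts and placeholder titles, for illustration only.
predicted_sample = [(1, 4.5), (2, 5.2), (3, 3.9), (4, 4.8)]
counts_sample = [(1, 100), (2, 50), (3, 200)]
names_sample = [(1, 'Movie A'), (2, 'Movie B'), (3, 'Movie C'), (4, 'Movie D')]

# Movie 4 has no count (dropped by the join); movie 2 has too few ratings.
print(top_predictions(predicted_sample, counts_sample, names_sample, n=2))
# prints [(4.5, 'Movie A', 100), (3.9, 'Movie C', 200)]
```

In this sketch the rating-count filter runs before the top-n selection, so movies with few ratings cannot crowd out well-rated popular ones.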