import sys import os from test_helper import Test baseDir = os.path.join('data') inputPath = os.path.join('cs100', 'lab4', 'small') ratingsFilename = os.path.join(baseDir, inputPath, 'ratings.dat.gz') moviesFilename = os.path.join(baseDir, inputPath, 'movies.dat') numPartitions = 2 rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions) rawMovies = sc.textFile(moviesFilename) def get_ratings_tuple(entry): """ Parse a line in the ratings dataset Args: entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp Returns: tuple: (UserID, MovieID, Rating) """ items = entry.split('::') return int(items[0]), int(items[1]), float(items[2]) def get_movie_tuple(entry): """ Parse a line in the movies dataset Args: entry (str): a line in the movies dataset in the form of MovieID::Title::Genres Returns: tuple: (MovieID, Title) """ items = entry.split('::') return int(items[0]), items[1] ratingsRDD = rawRatings.map(get_ratings_tuple).cache() moviesRDD = rawMovies.map(get_movie_tuple).cache() ratingsCount = ratingsRDD.count() moviesCount = moviesRDD.count() print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount) print 'Ratings: %s' % ratingsRDD.take(3) print 'Movies: %s' % moviesRDD.take(3) assert ratingsCount == 487650 assert moviesCount == 3883 assert moviesRDD.filter(lambda (id, title): title == 'Toy Story (1995)').count() == 1 assert (ratingsRDD.takeOrdered(1, key=lambda (user, movie, rating): movie) == [(1, 1, 5.0)]) tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')] tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')] oneRDD = sc.parallelize(tmp1) twoRDD = sc.parallelize(tmp2) oneSorted = oneRDD.sortByKey(True).collect() twoSorted = twoRDD.sortByKey(True).collect() print oneSorted print twoSorted assert set(oneSorted) == set(twoSorted) # Note that both lists have the same elements assert twoSorted[0][0] 
< twoSorted.pop()[0] # Check that it is sorted by the keys assert oneSorted[0:2] != twoSorted[0:2] # Note that the subset consisting of the first two elements does not match def sortFunction(tuple): """ Construct the sort string (does not perform actual sorting) Args: tuple: (rating, MovieName) Returns: sortString: the value to sort with, 'rating MovieName' """ key = unicode('%.3f' % tuple[0]) value = tuple[1] return (key + ' ' + value) print oneRDD.sortBy(sortFunction, True).collect() print twoRDD.sortBy(sortFunction, True).collect() oneSorted1 = oneRDD.takeOrdered(oneRDD.count(),key=sortFunction) twoSorted1 = twoRDD.takeOrdered(twoRDD.count(),key=sortFunction) print 'one is %s' % oneSorted1 print 'two is %s' % twoSorted1 assert oneSorted1 == twoSorted1 # TODO: Replace with appropriate code # First, implement a helper function `getCountsAndAverages` using only Python def getCountsAndAverages(IDandRatingsTuple): """ Calculate average rating Args: IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...)) Returns: tuple: a tuple of (MovieID, (number of ratings, averageRating)) """ return # TEST Number of Ratings and Average Ratings for a Movie (1a) Test.assertEquals(getCountsAndAverages((1, (1, 2, 3, 4))), (1, (4, 2.5)), 'incorrect getCountsAndAverages() with integer list') Test.assertEquals(getCountsAndAverages((100, (10.0, 20.0, 30.0))), (100, (3, 20.0)), 'incorrect getCountsAndAverages() with float list') Test.assertEquals(getCountsAndAverages((110, xrange(20))), (110, (20, 9.5)), 'incorrect getCountsAndAverages() with xrange') # TODO: Replace with appropriate code # From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of # the (MovieID, iterable of Ratings for that MovieID) movieIDsWithRatingsRDD = (ratingsRDD .) 
print 'movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3) # Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to # yield tuples of the form (MovieID, (number of ratings, average rating)) movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD. print 'movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3) # To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie # names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form # (average rating, movie name, number of ratings) movieNameWithAvgRatingsRDD = (moviesRDD .) print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3) # TEST Movies with Highest Average Ratings (1b) Test.assertEquals(movieIDsWithRatingsRDD.count(), 3615, 'incorrect movieIDsWithRatingsRDD.count() (expected 3615)') movieIDsWithRatingsTakeOrdered = movieIDsWithRatingsRDD.takeOrdered(3) Test.assertTrue(movieIDsWithRatingsTakeOrdered[0][0] == 1 and len(list(movieIDsWithRatingsTakeOrdered[0][1])) == 993, 'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[0] (expected 993)') Test.assertTrue(movieIDsWithRatingsTakeOrdered[1][0] == 2 and len(list(movieIDsWithRatingsTakeOrdered[1][1])) == 332, 'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[1] (expected 332)') Test.assertTrue(movieIDsWithRatingsTakeOrdered[2][0] == 3 and len(list(movieIDsWithRatingsTakeOrdered[2][1])) == 299, 'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[2] (expected 299)') Test.assertEquals(movieIDsWithAvgRatingsRDD.count(), 3615, 'incorrect movieIDsWithAvgRatingsRDD.count() (expected 3615)') Test.assertEquals(movieIDsWithAvgRatingsRDD.takeOrdered(3), [(1, (993, 4.145015105740181)), (2, (332, 3.174698795180723)), (3, (299, 3.0468227424749164))], 'incorrect movieIDsWithAvgRatingsRDD.takeOrdered(3)') Test.assertEquals(movieNameWithAvgRatingsRDD.count(), 3615, 'incorrect movieNameWithAvgRatingsRDD.count() (expected 
3615)') Test.assertEquals(movieNameWithAvgRatingsRDD.takeOrdered(3), [(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1), (1.0, u'Big Squeeze, The (1996)', 3)], 'incorrect movieNameWithAvgRatingsRDD.takeOrdered(3)') # TODO: Replace with appropriate code # Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with # ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the # average rating to get the movies in order of their rating (highest rating first) movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD . .sortBy(sortFunction, False)) print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20) # TEST Movies with Highest Average Ratings and more than 500 Reviews (1c) Test.assertEquals(movieLimitedAndSortedByRatingRDD.count(), 194, 'incorrect movieLimitedAndSortedByRatingRDD.count()') Test.assertEquals(movieLimitedAndSortedByRatingRDD.take(20), [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047), (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195), (4.505415162454874, u'Usual Suspects, The (1995)', 831), (4.457256461232604, u'Rear Window (1954)', 503), (4.45468509984639, u'Dr. 
Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700), (4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776), (4.363975155279503, u'Godfather: Part II, The (1974)', 805), (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811), (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248), (4.335826477187734, u'Saving Private Ryan (1998)', 1337), (4.326241134751773, u'Chinatown (1974)', 564), (4.325383304940375, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587), (4.324110671936759, u'Monty Python and the Holy Grail (1974)', 759), (4.3096, u'Matrix, The (1999)', 1250)], 'incorrect sortedByRatingRDD.take(20)') trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L) print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(), validationRDD.count(), testRDD.count()) print trainingRDD.take(3) print validationRDD.take(3) print testRDD.take(3) assert trainingRDD.count() == 292716 assert validationRDD.count() == 96902 assert testRDD.count() == 98032 assert trainingRDD.filter(lambda t: t == (1, 914, 3.0)).count() == 1 assert trainingRDD.filter(lambda t: t == (1, 2355, 5.0)).count() == 1 assert trainingRDD.filter(lambda t: t == (1, 595, 5.0)).count() == 1 assert validationRDD.filter(lambda t: t == (1, 1287, 5.0)).count() == 1 assert validationRDD.filter(lambda t: t == (1, 594, 4.0)).count() == 1 assert validationRDD.filter(lambda t: t == (1, 1270, 5.0)).count() == 1 assert testRDD.filter(lambda t: t == (1, 1193, 5.0)).count() == 1 assert testRDD.filter(lambda t: t == (1, 2398, 4.0)).count() == 1 assert testRDD.filter(lambda t: t == (1, 1035, 5.0)).count() == 1 # TODO: Replace with appropriate code import math def computeError(predictedRDD, actualRDD): """ Compute the root mean squared error between 
predicted and actual Args: predictedRDD: predicted ratings for each movie and each user where each entry is in the form (UserID, MovieID, Rating) actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating) Returns: RSME (float): computed RSME value """ # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating) predictedReformattedRDD = predictedRDD. # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating) actualReformattedRDD = actualRDD. # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each # RDD) in the reformatted RDDs using RDD transformtions - do not use collect() squaredErrorsRDD = (predictedReformattedRDD .) # Compute the total squared error - do not use collect() totalError = squaredErrorsRDD. # Count the number of entries for which you computed the total squared error numRatings = squaredErrorsRDD. # Using the total squared error and the number of entries, compute the RSME return # sc.parallelize turns a Python list into a Spark RDD. 
testPredicted = sc.parallelize([ (1, 1, 5), (1, 2, 3), (1, 3, 4), (2, 1, 3), (2, 2, 2), (2, 3, 4)]) testActual = sc.parallelize([ (1, 2, 3), (1, 3, 5), (2, 1, 5), (2, 2, 1)]) testPredicted2 = sc.parallelize([ (2, 2, 5), (1, 2, 5)]) testError = computeError(testPredicted, testActual) print 'Error for test dataset (should be 1.22474487139): %s' % testError testError2 = computeError(testPredicted2, testActual) print 'Error for test dataset2 (should be 3.16227766017): %s' % testError2 testError3 = computeError(testActual, testActual) print 'Error for testActual dataset (should be 0.0): %s' % testError3 # TEST Root Mean Square Error (2b) Test.assertTrue(abs(testError - 1.22474487139) < 0.00000001, 'incorrect testError (expected 1.22474487139)') Test.assertTrue(abs(testError2 - 3.16227766017) < 0.00000001, 'incorrect testError2 result (expected 3.16227766017)') Test.assertTrue(abs(testError3 - 0.0) < 0.00000001, 'incorrect testActual result (expected 0.0)') # TODO: Replace with appropriate code from pyspark.mllib.recommendation import ALS validationForPredictRDD = validationRDD. 
seed = 5L iterations = 5 regularizationParameter = 0.1 ranks = [4, 8, 12] errors = [0, 0, 0] err = 0 tolerance = 0.03 minError = float('inf') bestRank = -1 bestIteration = -1 for rank in ranks: model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations, lambda_=regularizationParameter) predictedRatingsRDD = model.predictAll(validationForPredictRDD) error = computeError(predictedRatingsRDD, validationRDD) errors[err] = error err += 1 print 'For rank %s the RMSE is %s' % (rank, error) if error < minError: minError = error bestRank = rank print 'The best model was trained with rank %s' % bestRank # TEST Using ALS.train (2c) Test.assertEquals(trainingRDD.getNumPartitions(), 2, 'incorrect number of partitions for trainingRDD (expected 2)') Test.assertEquals(validationForPredictRDD.count(), 96902, 'incorrect size for validationForPredictRDD (expected 96902)') Test.assertEquals(validationForPredictRDD.filter(lambda t: t == (1, 1907)).count(), 1, 'incorrect content for validationForPredictRDD') Test.assertTrue(abs(errors[0] - 0.883710109497) < tolerance, 'incorrect errors[0]') Test.assertTrue(abs(errors[1] - 0.878486305621) < tolerance, 'incorrect errors[1]') Test.assertTrue(abs(errors[2] - 0.876832795659) < tolerance, 'incorrect errors[2]') # TODO: Replace with appropriate code myModel = testForPredictingRDD = testRDD. predictedTestRDD = myModel. testRMSE = computeError(testRDD, predictedTestRDD) print 'The model had a RMSE on the test set of %s' % testRMSE # TEST Testing Your Model (2d) Test.assertTrue(abs(testRMSE - 0.87809838344) < tolerance, 'incorrect testRMSE') # TODO: Replace with appropriate code trainingAvgRating = trainingRDD. print 'The average rating for movies in the training set is %s' % trainingAvgRating testForAvgRDD = testRDD. 
testAvgRMSE = computeError(testRDD, testForAvgRDD) print 'The RMSE on the average set is %s' % testAvgRMSE # TEST Comparing Your Model (2e) Test.assertTrue(abs(trainingAvgRating - 3.57409571052) < 0.000001, 'incorrect trainingAvgRating (expected 3.57409571052)') Test.assertTrue(abs(testAvgRMSE - 1.12036693569) < 0.000001, 'incorrect testAvgRMSE (expected 1.12036693569)') print 'Most rated movies:' print '(average rating, movie name, number of reviews)' for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50): print ratingsTuple # TODO: Replace with appropriate code myUserID = 0 # Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID. myRatedMovies = [ # The format of each line is (myUserID, movie ID, your rating) # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line: # (myUserID, 260, 5), ] myRatingsRDD = sc.parallelize(myRatedMovies) print 'My movie ratings: %s' % myRatingsRDD.take(10) # TODO: Replace with appropriate code trainingWithMyRatingsRDD = print ('The training dataset now has %s more entries than the original training dataset' % (trainingWithMyRatingsRDD.count() - trainingRDD.count())) assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count() # TODO: Replace with appropriate code myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, ) # TODO: Replace with appropriate code predictedTestMyRatingsRDD = myRatingsModel. testRMSEMyRatings = print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings # TODO: Replace with appropriate code # Use the Python list myRatedMovies to transform the moviesRDD into an RDD with entries that are pairs of the form (myUserID, Movie ID) and that does not contain any movies that you have rated. myUnratedMoviesRDD = (moviesRDD .) 
# Use the input RDD, myUnratedMoviesRDD, with myRatingsModel.predictAll() to predict your ratings for the movies
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)

# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form (MovieID, (number of ratings, average rating)), into an RDD of the form (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda entry: (entry[0], entry[1][0]))

# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
# NOTE(review): assumes predictAll() yields (UserID, MovieID, Rating) entries, as computeError's docstring describes - confirm
predictedRDD = predictedRatingsRDD.map(lambda entry: (entry[1], entry[2]))

# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD = (predictedRDD
                          .join(movieCountsRDD))

# Use RDD transformations with predictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings
# After the join each entry is (Movie ID, ((Predicted Rating, number of ratings), Movie Name)).
ratingsWithNamesRDD = (predictedWithCountsRDD
                       .join(moviesRDD)
                       .map(lambda entry: (entry[1][0][0], entry[1][1], entry[1][0][1]))
                       .filter(lambda entry: entry[2] > 75))

# Negating the predicted rating makes takeOrdered() return the highest ratings first.
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
       '\n'.join(map(str, predictedHighestRatedMovies)))