This is a two-step process. First we use u.data to determine how often each movie id is rated. Second we join the output of the first step to the u.item dataset in order to get movie titles. Our final output will contain the following three fields: movie id, movie title, and the number of ratings.
Below is some information about the data.
def dataStrToTuple(inString, delimiter="\t"):
    """
    Splits inString using delimiter and returns a tuple of integers.
    It assumes that all the input tokens are integers.
    """
    tokens = [int(y) for y in inString.split(delimiter)]
    return tuple(tokens)
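Before running it through Spark, the parsing function can be checked standalone. A minimal sketch, assuming a u.data record is tab-separated user id, movie id, rating, and timestamp (the sample values below are illustrative):

```python
def dataStrToTuple(inString, delimiter="\t"):
    """Splits inString using delimiter and returns a tuple of integers."""
    return tuple(int(y) for y in inString.split(delimiter))

# A made-up record in the u.data layout: user \t movie \t rating \t timestamp
sample = "196\t242\t3\t881250949"
print(dataStrToTuple(sample))  # (196, 242, 3, 881250949)
```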
# load dataset
data = sc.textFile("data/meetup/movielens/u.data", 5)
# convert input record to tuple
rdd1 = data.map(dataStrToTuple)
# extract movie id as the first element so that we can use it as a key.
# The value can be anything (we use 1) because in the next stage we only
# count the number of records per key.
rdd2 = rdd1.map(lambda x: (x[1], 1))
# group by movie id and count number of elements
cnts = rdd2.countByKey()
# countByKey returns the counts to the driver as a plain Python dict
for row in sorted(cnts.items(), key=lambda x: x[1], reverse=True)[0:10]:
    print row
(50, 583)
(258, 509)
(100, 508)
(181, 507)
(294, 485)
(286, 481)
(288, 478)
(1, 452)
(300, 431)
(121, 429)
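The semantics of countByKey on our (movie, 1) pairs are just "count records per key". A driver-side sketch of the same logic with a made-up mini dataset:

```python
from collections import Counter

# countByKey on (movie, 1) pairs boils down to counting the keys
pairs = [(50, 1), (50, 1), (258, 1), (50, 1), (258, 1), (100, 1)]
cnts = Counter(k for k, _ in pairs)

# sort in driver memory, most-rated first, just like the loop above
top = sorted(cnts.items(), key=lambda x: x[1], reverse=True)
print(top)  # [(50, 3), (258, 2), (100, 1)]
```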
countByKey returned a plain collection that we then sorted in driver memory. How would you modify the above program so that we use Spark's distributed computing engine to extract the top 10 most rated movies?
rdd4 = rdd2.reduceByKey(lambda x, y: x + y)
print rdd4.takeOrdered(10, key=lambda x: -1 * x[1])
[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485), (286, 481), (288, 478), (1, 452), (300, 431), (121, 429)]
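reduceByKey merges the values of each key with the supplied function, and takeOrdered(n, key=...) returns the n smallest elements under that key function, so negating the count yields the most-rated movies first. A plain-Python sketch of both steps on made-up data:

```python
import heapq

pairs = [(50, 1), (258, 1), (50, 1), (100, 1), (50, 1), (258, 1)]

# reduceByKey(lambda x, y: x + y): merge values per key
sums = {}
for k, v in pairs:
    sums[k] = sums.get(k, 0) + v

# takeOrdered(10, key=lambda x: -1 * x[1]): the 10 smallest by negated count,
# i.e. the most-rated movies first
top10 = heapq.nsmallest(10, sums.items(), key=lambda x: -x[1])
print(top10)  # [(50, 3), (258, 2), (100, 1)]
```

Unlike countByKey, both operations here run on the cluster; only the final 10 rows reach the driver.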
Above, takeOrdered uses only the value (i.e. the number of times a movie was rated) to extract the top 10 movies. However, if many movies were rated by the same number of people, it is possible that different analysts will produce different lists for the top 10. Can you modify the above code to sort movies by both the count and the movie id?
print rdd4.takeOrdered(10, key=lambda x: (-1 * x[1], x[0]))
[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485), (286, 481), (288, 478), (1, 452), (300, 431), (121, 429)]
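The composite key (-count, movie_id) makes the ordering deterministic: ties on the count fall back to the movie id. A small sketch with an invented second movie tied at 485 ratings:

```python
counts = [(300, 431), (1, 452), (121, 429), (288, 478), (294, 485)]
tied = counts + [(7, 485)]  # hypothetical movie 7, also rated 485 times

# ties on the count are broken by the (smaller) movie id
print(sorted(tied, key=lambda x: (-x[1], x[0]))[:3])
# [(7, 485), (294, 485), (288, 478)]
```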
To make sense of the top 10 most rated movies, let's now add movie titles. Movie information is stored in the u.item file.
def extractIdTitleFromItem(x):
    tokens = x.split("|")
    return (int(tokens[0]), tokens[1])
items = sc.textFile("data/meetup/movielens/u.item", 5) \
.map(extractIdTitleFromItem)
print items.take(5)
[(1, u'Toy Story (1995)'), (2, u'GoldenEye (1995)'), (3, u'Four Rooms (1995)'), (4, u'Get Shorty (1995)'), (5, u'Copycat (1995)')]
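The u.item file is pipe-delimited, with the movie id and title in the first two fields; everything after the title is ignored by our parser. A standalone check on a made-up line in that layout:

```python
def extractIdTitleFromItem(x):
    tokens = x.split("|")
    return (int(tokens[0]), tokens[1])

# illustrative u.item-style line: id | title | release date | ...
line = "1|Toy Story (1995)|01-Jan-1995||"
print(extractIdTitleFromItem(line))  # (1, 'Toy Story (1995)')
```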
# Join data and items
ranked = rdd2.reduceByKey(lambda x, y: x + y)\
.join(items)\
.takeOrdered(10, key=lambda x: (-1 * x[1][0], x[1][1]))
for row in ranked:
    print "{0}. {1} -> {2}".format(row[0], row[1][1], row[1][0])
50. Star Wars (1977) -> 583
258. Contact (1997) -> 509
100. Fargo (1996) -> 508
181. Return of the Jedi (1983) -> 507
294. Liar Liar (1997) -> 485
286. English Patient, The (1996) -> 481
288. Scream (1996) -> 478
1. Toy Story (1995) -> 452
300. Air Force One (1997) -> 431
121. Independence Day (ID4) (1996) -> 429
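Spark's join on pair RDDs matches elements by key, so joining (movie, count) with (movie, title) yields (movie, (count, title)); only movies present on both sides survive. A driver-side sketch of that inner-join semantics with a few made-up rows:

```python
counts = {50: 583, 100: 508}
titles = {50: "Star Wars (1977)", 100: "Fargo (1996)", 2: "GoldenEye (1995)"}

# inner join on movie id: (movie, (count, title)); movie 2 has no count, so it drops out
joined = [(m, (counts[m], titles[m])) for m in counts if m in titles]

for movie, (cnt, title) in sorted(joined, key=lambda x: -x[1][0]):
    print("{0}. {1} -> {2}".format(movie, title, cnt))
# 50. Star Wars (1977) -> 583
# 100. Fargo (1996) -> 508
```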
Currently the code is very fragile: we have to make sure that we are using the correct indexes at each position. Let's use SQLContext to define tables and then use SQL to express the query.
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# sc is an existing SparkContext.
sqlContext = SQLContext(sc)
# Construct Rating Schema
schema = StructType([
    StructField("user", IntegerType(), True),
    StructField("movie", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])
# Apply the schema to the RDD.
schemaRating = sqlContext.createDataFrame(rdd1, schema)
# Register the DataFrame as a table.
schemaRating.registerTempTable("rating")
type(schemaRating)
pyspark.sql.dataframe.DataFrame
# Construct Movie Schema
schema = StructType([
    StructField("movie", IntegerType(), True),
    StructField("title", StringType(), True)
])
# Apply the schema to the RDD.
schemaItem = sqlContext.createDataFrame(items, schema)
# Register the DataFrame as a table.
schemaItem.registerTempTable("item")
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("""
SELECT A.movie, B.title, A.cnt
FROM
(
SELECT movie, count(*) as cnt
FROM rating
GROUP BY movie
ORDER BY cnt DESC
LIMIT 10
) A
JOIN item B
ON (A.movie = B.movie)
""")
for row in results.collect():
    print row
Row(movie=1, title=u'Toy Story (1995)', cnt=452)
Row(movie=50, title=u'Star Wars (1977)', cnt=583)
Row(movie=100, title=u'Fargo (1996)', cnt=508)
Row(movie=121, title=u'Independence Day (ID4) (1996)', cnt=429)
Row(movie=181, title=u'Return of the Jedi (1983)', cnt=507)
Row(movie=258, title=u'Contact (1997)', cnt=509)
Row(movie=286, title=u'English Patient, The (1996)', cnt=481)
Row(movie=288, title=u'Scream (1996)', cnt=478)
Row(movie=294, title=u'Liar Liar (1997)', cnt=485)
Row(movie=300, title=u'Air Force One (1997)', cnt=431)
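The SQL above does four things in order: GROUP BY with COUNT, ORDER BY cnt DESC, LIMIT, and then a JOIN on the movie id. The same pipeline on a made-up mini dataset in plain Python:

```python
from collections import Counter

# each entry is one rating of a movie id (invented data)
rated_movies = [50, 50, 258, 50, 258, 100]
titles = {50: "Star Wars (1977)", 258: "Contact (1997)", 100: "Fargo (1996)"}

# GROUP BY movie + COUNT, ORDER BY cnt DESC, LIMIT 2
top = Counter(rated_movies).most_common(2)        # [(50, 3), (258, 2)]

# JOIN item B ON (A.movie = B.movie)
rows = [(m, titles[m], c) for m, c in top]
print(rows)  # [(50, 'Star Wars (1977)', 3), (258, 'Contact (1997)', 2)]
```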
# DataFrame Operations: https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
from pyspark.sql.functions import desc
# the result is a DataFrame, not an RDD, so give it a new name rather than shadowing rdd2
top10 = schemaRating.groupBy("movie").count().orderBy(desc("count")).limit(10)