Here we download the YooChoose data, upload it to HDFS, and use Spark to build per-session records (clicks and buys grouped by session id), which are saved as pickle files.
# Download the compressed YooChoose dataset archive (yoochoose-data.7z) from S3
# into the current working directory.
!wget http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z
--2015-07-07 15:18:40-- http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z Resolving s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)... 54.231.136.80 Connecting to s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)|54.231.136.80|:80... connected. HTTP request sent, awaiting response... 200 OK Length: 287211932 (274M) [application/octet-stream] Saving to: `yoochoose-data.7z' 100%[======================================>] 287,211,932 31.0M/s in 14s 2015-07-07 15:18:54 (19.9 MB/s) - `yoochoose-data.7z' saved [287211932/287211932]
# Extract the archive in place. The captured output shows it yields
# yoochoose-buys.dat, yoochoose-clicks.dat, yoochoose-test.dat and
# dataset-README.txt.
!7z x yoochoose-data.7z
7-Zip [64] 9.20 Copyright (c) 1999-2010 Igor Pavlov 2010-11-18 p7zip Version 9.20 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,8 CPUs) Processing archive: yoochoose-data.7z Extracting yoochoose-buys.dat Extracting yoochoose-clicks.dat Extracting yoochoose-test.dat Extracting dataset-README.txt Everything is Ok Files: 4 Size: 1914111754 Compressed: 287211932
!hdfs dfs -mkdir -p yoochoose/
!hdfs dfs -put yoochoose-*.dat yoochoose/
!rm yoochoose-* dataset-README.txt
import datetime
import operator
def parse_datetime(dt_str):
    """Convert a timestamp string such as ``2014-04-07T10:51:09.277Z`` to a datetime.

    The layout is ``YYYY-MM-DDTHH:MM:SS.ffffff`` followed by a literal ``Z``;
    the ``Z`` is only matched, so the returned datetime is naive (no tzinfo).
    Raises ValueError if the string does not match that layout.
    """
    layout = '%Y-%m-%dT%H:%M:%S.%fZ'
    return datetime.datetime.strptime(dt_str, layout)
def parse_clicks(line):
    """Parse one comma-separated click line into a ``(key, event)`` pair.

    Used for both yoochoose-clicks.dat and yoochoose-test.dat. Returns
    ``(session_id, (timestamp, item_id, category))`` where ``session_id`` is an
    ``int``, ``timestamp`` comes from ``parse_datetime`` and ``item_id`` /
    ``category`` are kept as raw strings. Fields beyond the fourth are ignored.
    """
    fields = line.split(',')
    sid = int(fields[0])
    event = (parse_datetime(fields[1]), fields[2], fields[3])
    return sid, event
def parse_buys(line):
    """Parse one comma-separated line of yoochoose-buys.dat into a ``(key, event)`` pair.

    Returns ``(session_id, (timestamp, item_id, price, quantity))`` with
    ``session_id`` as ``int``, ``timestamp`` from ``parse_datetime``,
    ``item_id`` kept as a string, ``price`` as ``float`` and ``quantity`` as
    ``int``. Fields beyond the fifth are ignored.
    """
    fields = line.split(',')
    sid = int(fields[0])
    event = (
        parse_datetime(fields[1]),
        fields[2],
        float(fields[3]),
        int(fields[4]),
    )
    return sid, event
def sort_sessions(session):
    """Return a session record with its clicks and buys in chronological order.

    ``session`` is a ``(session_id, (clicks, buys))`` pair, e.g. one element of
    ``clicks.fullOuterJoin(buys)``. Either side may be ``None`` when the
    session has no records of that kind; it is replaced by an empty list.
    Events are ordered by their first element, which is the timestamp in the
    tuples built by ``parse_clicks`` / ``parse_buys``. The sort is stable, so
    events with equal timestamps keep their input order.

    Returns ``(session_id, (sorted_clicks, sorted_buys))`` with both sides as lists.
    """
    # Unpack in the body rather than in the signature: tuple parameters are a
    # Python-2-only feature (PEP 3113) and are a SyntaxError on Python 3.
    session_id, (clicks, buys) = session
    by_time = operator.itemgetter(0)
    clicks = sorted(clicks, key=by_time) if clicks is not None else []
    buys = sorted(buys, key=by_time) if buys is not None else []
    return session_id, (clicks, buys)
# Start Spark context
%spark YoochooseSessions
# read input
clicks = sc.textFile('yoochoose/yoochoose-clicks.dat', 40).map(parse_clicks).groupByKey()
clicksTest = sc.textFile('yoochoose/yoochoose-test.dat', 40).map(parse_clicks).groupByKey()
buys = sc.textFile('yoochoose/yoochoose-buys.dat', 40).map(parse_buys).groupByKey()
train_sessions = clicks.fullOuterJoin(buys).map(sort_sessions)
test_sessions = clicksTest.map(lambda (session_id, clicks): (session_id, (clicks, None))).map(sort_sessions)
# screate session files
train_sessions.saveAsPickleFile('yoochoose/train_sessions.pickle')
test_sessions.saveAsPickleFile('yoochoose/test_sessions.pickle')
sc.stop()