import collections
import datetime
import operator
import math
import functools
from utils import *
# Start Spark context.
# NOTE(review): `%spark` is a notebook magic, not Python. Presumably it creates
# the `sc` and `hive` globals used throughout this notebook — confirm.
%spark YoochooseFeatures
# Ship utils.py to the Spark workers so helpers from it can run inside RDD
# transformations.
sc.addPyFile('utils.py')
# load_sessions comes from `utils` (star import). Presumably it yields
# (session_id, (clicks, buys)) records, the shape the extractors below unpack.
train_sessions, test_sessions = load_sessions(sc)
# Splits for the session-level purchase task.
# NOTE(review): the random_subset spec strings presumably select buckets 0-8
# (train) and bucket 9 (valid) of a 10-way split — verify in utils.
splits_purchase = {
    'train': train_sessions.filter(random_subset('ex2:10:0,1,2,3,4,5,6,7,8')),
    'valid': train_sessions.filter(random_subset('ex2:10:9')),
    'test': test_sessions,
}
# Splits for the per-item task. 'train'/'valid' are additionally filtered with
# positive_session (presumably: sessions with at least one buy — confirm);
# 'valid_full' keeps every session of the validation bucket.
splits_item = {
    'train': train_sessions.filter(random_subset('ex2:10:0,1,2,3,4,5,6,7,8')).filter(positive_session),
    'valid': train_sessions.filter(random_subset('ex2:10:9')).filter(positive_session),
    'valid_full': train_sessions.filter(random_subset('ex2:10:9')),
    'test': test_sessions,
}
def extract_buys_info(session):
    """Yield one ``Row`` per purchase event of ``session``.

    ``session`` is a ``(session_id, (clicks, buys))`` pair. Every buy is
    unpacked as ``(timestamp, item_id, qty, price)``; the timestamp is
    dropped and the clicks are ignored.
    """
    # NOTE(review): the qty-before-price field order is assumed here, not
    # verified — confirm against utils.load_sessions. Only item_id is read
    # downstream in this notebook (buy_events SQL).
    session_id, (_clicks, purchases) = session
    for _timestamp, item, quantity, amount in purchases:
        yield Row(session_id=session_id, item_id=item, qty=quantity, price=amount)
def extract_clicks_info(session):
    """Yield one ``Row`` per click event of ``session``.

    ``session`` is a ``(session_id, (clicks, buys))`` pair. Every click is
    unpacked as ``(timestamp, item_id, category)``; the timestamp is dropped
    and the buys are ignored.
    """
    session_id, (click_events, _buys) = session
    for _timestamp, item, category in click_events:
        yield Row(session_id=session_id, item_id=item, cat=category)
# Expose buy and click events as SQL temp tables (one row per event) so the
# popularity statistics below can be computed with Spark SQL.
# NOTE(review): both tables are built from all of train_sessions, which also
# contains the sessions used as the 'valid' splits above, so validation
# sessions influence which items/categories become dense features — confirm
# this is intended.
train_sessions.flatMap(extract_buys_info).toDF().registerTempTable('buy_events')
train_sessions.flatMap(extract_clicks_info).toDF().registerTempTable('click_events')
# Number of buy events (rows, not summed qty) per item, most-bought first,
# ties broken by item_id.
bought_item_stats = hive.sql("""
SELECT
item_id,
COUNT(1) as buys
FROM buy_events
GROUP BY item_id
ORDER BY buys DESC, item_id
""").toPandas()
# Number of distinct sessions that clicked each category, most popular first,
# ties broken by cat.
clicked_cat_stats = hive.sql("""
SELECT
cat,
COUNT(DISTINCT session_id) as n_sessions
FROM click_events
GROUP BY cat
ORDER BY n_sessions DESC, cat
""").toPandas()
# Item ids in popularity order; feature construction later takes a prefix.
items_of_interest = list(bought_item_stats['item_id'])
print 'Items of interest:', len(items_of_interest)
# Categories in popularity order; feature construction later takes a prefix.
cats_of_interest = list(clicked_cat_stats['cat'])
print 'Cats of interest:', len(cats_of_interest)
# Output:
# Items of interest: 19949
# Cats of interest: 339
def features_from_timestamp(ts):
    """Expand one timestamp into a list of ``(kind, value)`` feature pairs.

    ``kind`` is ``'categ'`` for categorical values and ``'num'`` for numeric
    ones. The categorical part consists of:

    * six prefixed ``strftime`` strings: month, day, month+day,
      month+day+hour, hour and minute;
    * the weekday as an int (Monday == 0).

    The numeric part consists of month, day, hour, minute, seconds elapsed
    since 2014-01-01 00:00, minute of the day and second of the day.

    ``ts`` is presumably a naive ``datetime.datetime``: it is subtracted from
    a naive ``datetime.datetime(2014, 1, 1)``.
    """
    formats = ('m%m', 'd%d', 'md%m%d', 'mdh%m%d%H', 'H%H', 'M%M')
    categorical = [ts.strftime(fmt) for fmt in formats] + [ts.weekday()]
    minute_of_day = ts.hour * 60 + ts.minute
    numeric = [
        ts.month,
        ts.day,
        ts.hour,
        ts.minute,
        (ts - datetime.datetime(2014, 1, 1)).total_seconds(),
        minute_of_day,
        minute_of_day * 60 + ts.second,
    ]
    return [('categ', v) for v in categorical] + [('num', v) for v in numeric]
def extract_session_features(session, items_of_interest=(), cats_of_interest=()):
    """Yield the session-level feature row used for purchase prediction.

    Args:
        session: ``(session_id, (clicks, buys))``. Each click is unpacked as
            ``(ts, item_id, cat)``. Each buy is a 4-tuple whose second field
            is the item id. Timestamps must support ``strftime``, ordering and
            subtraction (``datetime.datetime``-like). Item ids and categories
            must be strings because they are joined with ``'$'``.
        items_of_interest: item ids that each get a dense click-count column
            and a dense dwell-time column, in this order.
        cats_of_interest: categories that each get a dense click-count column
            and a dense dwell-time column, in this order.

    Yields:
        Exactly one ``(session_id, 0, target, features)`` tuple. ``target``
        is 1 when the session has at least one buy, else 0. ``features`` is a
        list of ``('categ' | 'num', value)`` pairs.

    Raises:
        ValueError: if the session has no clicks, so there are no timestamps
            to derive the time features from.
    """
    session_id, (clicks, buys) = session
    # Only the bought item ids matter for the target; price/qty are unused.
    bought_items = set(item_id for _, item_id, _, _ in buys)

    clicked_items = set()
    clicked_cats = set()
    clicked_itemcats = set()
    clicked_item_cnt = collections.Counter()
    clicked_cat_cnt = collections.Counter()
    for _, item_id, cat in clicks:
        clicked_items.add(item_id)
        clicked_cats.add(cat)
        clicked_itemcats.add(item_id + '$' + cat)
        clicked_item_cnt[item_id] += 1
        clicked_cat_cnt[cat] += 1

    clicks = sorted(clicks, key=operator.itemgetter(0))
    if not clicks:
        raise ValueError('session %r has no clicks' % (session_id,))
    session_start = clicks[0][0]
    session_last_click = clicks[-1][0]

    # Dwell time of a click = time until the next click. A gap longer than an
    # hour is treated as the end of the session and credited a flat 10 s
    # (same rule as extract_session_item_features).
    max_gap = datetime.timedelta(hours=1)
    item_duration = collections.Counter()
    cat_duration = collections.Counter()
    for (ts, item_id, cat), (ts_next, _, _) in zip(clicks[:-1], clicks[1:]):
        gap = ts_next - ts
        duration = 10.0 if gap > max_gap else gap.total_seconds()
        item_duration[item_id] += duration
        cat_duration[cat] += duration

    features = []
    # time features of the first and last click
    features += features_from_timestamp(session_start)
    features += features_from_timestamp(session_last_click)
    # session length in seconds
    features.append(('num', (session_last_click - session_start).total_seconds()))
    # click counts
    features += [
        ('num', len(clicks)),
        ('num', len(clicked_items)),
        ('num', len(clicked_cats)),
        ('num', len(clicked_itemcats)),
    ]
    # top-10 items / top-5 categories by clicks in this session, None-padded
    ranked_items = sorted(clicked_item_cnt.items(),
                          key=operator.itemgetter(1), reverse=True)
    features += [
        ('categ', ranked_items[i][0] if i < len(ranked_items) else None)
        for i in range(10)
    ]
    ranked_cats = sorted(clicked_cat_cnt.items(),
                         key=operator.itemgetter(1), reverse=True)
    features += [
        ('categ', ranked_cats[i][0] if i < len(ranked_cats) else None)
        for i in range(5)
    ]
    # first and last clicked item (time order) among items clicked >= k times
    for k in (1, 2, 3, 4, 5, 6):
        candidates = [item_id for _, item_id, _ in clicks
                      if clicked_item_cnt[item_id] >= k]
        if candidates:
            first_item, last_item = candidates[0], candidates[-1]
        else:
            first_item, last_item = None, None
        features += [('categ', first_item), ('categ', last_item)]
    # dense statistics for the items/categories of interest
    features += [('num', clicked_item_cnt.get(item_id, 0)) for item_id in items_of_interest]
    features += [('num', item_duration.get(item_id, 0)) for item_id in items_of_interest]
    features += [('num', clicked_cat_cnt.get(cat, 0)) for cat in cats_of_interest]
    features += [('num', cat_duration.get(cat, 0)) for cat in cats_of_interest]

    target = int(len(bought_items) > 0)
    yield session_id, 0, target, features
def extract_session_item_features(session, items_of_interest=(), cats_of_interest=()):
    """Yield one feature row per distinct clicked item of a session.

    Each row pairs the session with one clicked item. The target says
    whether that item is among the session's buys.

    Args:
        session: ``(session_id, (clicks, buys))``. Clicks are unpacked as
            ``(ts, item_id, cat)``. Buys are 4-tuples whose second field is
            the item id. Timestamps must support ``strftime``, ordering and
            subtraction. Item ids and categories must be strings because they
            are joined with ``'$'``.
        items_of_interest: item ids that get dense click-count and dwell-time
            columns, in this order.
        cats_of_interest: categories that get dense click-count and dwell-time
            columns, in this order.

    Yields:
        ``(session_id, item_id, target, features)`` for every distinct clicked
        item, in set-iteration order. ``target`` is 1 if the item was bought
        in the session, else 0. ``features`` is a fresh list of
        ``('categ' | 'num', value)`` pairs for every row. A session without
        clicks yields nothing.
    """
    session_id, (clicks, buys) = session
    # Only the bought item ids matter for the target; price/qty are unused.
    bought_items = set(item_id for _, item_id, _, _ in buys)

    clicked_items = set()
    clicked_cats = set()
    clicked_itemcats = set()
    clicked_item_cnt = collections.Counter()
    clicked_cat_cnt = collections.Counter()
    click_first_time = {}
    click_last_time = {}
    item_cats = collections.defaultdict(set)
    for ts, item_id, cat in clicks:
        clicked_items.add(item_id)
        clicked_cats.add(cat)
        clicked_itemcats.add(item_id + '$' + cat)
        clicked_item_cnt[item_id] += 1
        clicked_cat_cnt[cat] += 1
        click_first_time[item_id] = min(ts, click_first_time.get(item_id, ts))
        click_last_time[item_id] = max(ts, click_last_time.get(item_id, ts))
        item_cats[item_id].add(cat)

    clicks = sorted(clicks, key=operator.itemgetter(0))
    if not clicks:
        return  # no clicked items, hence no rows
    session_start = clicks[0][0]
    session_last_click = clicks[-1][0]

    # Dwell time of a click = time until the next click. A gap longer than an
    # hour is treated as the end of the session and credited a flat 10 s.
    max_gap = datetime.timedelta(hours=1)
    item_duration = collections.Counter()
    item_periods = collections.Counter()
    cat_duration = collections.Counter()
    cat_periods = collections.Counter()
    sum_duration = 0.0
    for (ts, item_id, cat), (ts_next, _, _) in zip(clicks[:-1], clicks[1:]):
        gap = ts_next - ts
        duration = 10.0 if gap > max_gap else gap.total_seconds()
        item_duration[item_id] += duration
        item_periods[item_id] += 1
        cat_duration[cat] += duration
        cat_periods[cat] += 1
        sum_duration += duration

    # Session-level features are identical for every candidate item, so they
    # are computed once rather than once per clicked item.
    session_time_features = (features_from_timestamp(session_start)
                             + features_from_timestamp(session_last_click))
    session_length = (session_last_click - session_start).total_seconds()
    n_clicks = len(clicks)

    # top-10 items / top-5 categories by clicks in this session, None-padded
    ranked_items = sorted(clicked_item_cnt.items(),
                          key=operator.itemgetter(1), reverse=True)
    ranked_cats = sorted(clicked_cat_cnt.items(),
                         key=operator.itemgetter(1), reverse=True)
    session_tail_features = [
        ('categ', ranked_items[i][0] if i < len(ranked_items) else None)
        for i in range(10)
    ]
    session_tail_features += [
        ('categ', ranked_cats[i][0] if i < len(ranked_cats) else None)
        for i in range(5)
    ]
    # first and last clicked item (time order) among items clicked >= k times
    for k in (1, 2, 3, 4, 5, 6):
        candidates = [item_id for _, item_id, _ in clicks
                      if clicked_item_cnt[item_id] >= k]
        if candidates:
            first_item, last_item = candidates[0], candidates[-1]
        else:
            first_item, last_item = None, None
        session_tail_features += [('categ', first_item), ('categ', last_item)]
    # dense statistics on items/cats of interest
    session_tail_features += [('num', clicked_item_cnt.get(item_id, 0)) for item_id in items_of_interest]
    session_tail_features += [('num', item_duration.get(item_id, 0)) for item_id in items_of_interest]
    session_tail_features += [('num', clicked_cat_cnt.get(cat, 0)) for cat in cats_of_interest]
    session_tail_features += [('num', cat_duration.get(cat, 0)) for cat in cats_of_interest]

    for cur_item_id in clicked_items:
        first_time = click_first_time[cur_item_id]
        last_time = click_last_time[cur_item_id]
        cur_cats = item_cats[cur_item_id]
        cur_cnt = clicked_item_cnt[cur_item_id]
        cur_duration = item_duration[cur_item_id]
        cats_duration = sum(cat_duration[cat] for cat in cur_cats)

        # target item id, then time features (session first/last click, item
        # first/last click)
        features = [('categ', cur_item_id)]
        features += session_time_features
        features += features_from_timestamp(first_time)
        features += features_from_timestamp(last_time)
        # durations in seconds; the 1e-8 terms only guard against division by 0
        features += [
            ('num', session_length),
            ('num', (last_time - first_time).total_seconds()),
            ('num', cur_duration),
            ('num', cur_duration / (sum_duration + 1e-8)),
            ('num', cur_duration / (item_periods[cur_item_id] + 1e-8)),
            ('num', cats_duration),
            ('num', cats_duration / (sum_duration + 1e-8)),
            ('num', sum(cat_duration[cat] / (cat_periods[cat] + 1e-8) for cat in cur_cats)),
        ]
        # click counts
        features += [
            ('num', cur_cnt),
            ('num', n_clicks),
            ('num', float(cur_cnt) / n_clicks),
            ('num', len(clicked_items)),
            ('num', len(clicked_cats)),
            ('num', len(clicked_itemcats)),
            ('num', len(cur_cats)),
        ]
        # rankings, first/last-k items and dense statistics (session-wide)
        features += session_tail_features

        target = int(cur_item_id in bought_items)
        yield session_id, cur_item_id, target, features
# Build both feature sets with construct_feature_set (from utils, star import).
# The dense per-item / per-category columns are restricted to a prefix of the
# popularity-ordered items_of_interest / cats_of_interest lists.
N_DENSE_ITEMS = 100  # most-bought items (bought_item_stats order)
N_DENSE_CATS = 50    # categories clicked in the most sessions
for feature_set_name, feature_splits, extractor in (
    ('features_purchase', splits_purchase, extract_session_features),
    ('features_item', splits_item, extract_session_item_features),
):
    construct_feature_set(
        sc,
        feature_set_name,
        feature_splits,
        extractor,
        items_of_interest=items_of_interest[:N_DENSE_ITEMS],
        cats_of_interest=cats_of_interest[:N_DENSE_CATS],
    )
# Output:
# Constructing feature sets: features_purchase test train valid 1 min
# Constructing feature sets: features_item test train valid_full valid 1 min