# IPython/PySpark session transcript: prime-counting benchmark, then Titanic
# survival modeling with Spark RDDs.
# NOTE(review): this is a flattened Python 2 notebook dump — IPython magics
# (%%timeit, !head) and captured cell outputs appear inline, so the file is
# not directly runnable as a .py script.
from pyspark import SparkContext
# Local Spark context using 4 worker threads.
sc = SparkContext( 'local[4]')
def isprime(n):
    """Return True if the integer n is prime, False otherwise.

    The input is coerced with abs(int(n)), so negative values are
    tested via their absolute value.
    """
    n = abs(int(n))
    # 0 and 1 are not prime
    if n < 2:
        return False
    # 2 is the only even prime
    if n == 2:
        return True
    # every other even number is composite
    if n % 2 == 0:
        return False
    # trial division by odd candidates up to sqrt(n)
    candidate = 3
    while candidate * candidate <= n:
        if n % candidate == 0:
            return False
        candidate += 2
    return True
# Benchmark: count primes below 1e6 in plain Python/NumPy vs. Spark.
# %%timeit is an IPython cell magic; the bare "78498 ..." lines are captured
# cell output, not code.
%%timeit
import numpy as np
nums = xrange(1000000)
print np.sum([1 for x in nums if isprime(x)])
78498 78498 78498 78498 1 loops, best of 3: 4.81 s per loop
%%timeit
nums = sc.parallelize(xrange(1000000))
print nums.filter(isprime).count()
78498 78498 78498 78498 1 loops, best of 3: 2.71 s per loop
# Shell magic: read the CSV header line to get the variable names.
vname = !head -1 titanic.csv
vname = vname[0].split(',')
# One-off shell command that produced the header-less file (kept for the record).
#!sed 1d titanic.csv > titanic_noheader.csv
raw = sc.textFile('titanic_noheader.csv')
raw.first() # raw data
u'0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S'
# Extract the passenger's title from the name field
def extract_name(x):
    """Return the double-quoted name field from a raw CSV line."""
    import re
    # Greedy match spans from the first double quote to the last one on the line.
    match = re.search("\"(.*)\"", x)
    return match.group(1)
names = raw.map(extract_name)
names.take(4)
[u'Braund, Mr. Owen Harris', u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', u'Heikkinen, Miss. Laina', u'Futrelle, Mrs. Jacques Heath (Lily May Peel)']
import re
# The title is the token between ", " and ". " in the name,
# e.g. "Braund, Mr. Owen Harris" -> "Mr".
title = names.map(lambda x: re.search(r", (.*?)\. ", x).group(1))
# Title frequency table, most common first
# (Python 2 only: dict.iteritems and tuple-unpacking lambda).
sorted(title.countByValue().iteritems(),key=lambda (k,v): v,reverse=True)
[(u'Mr', 517), (u'Miss', 182), (u'Mrs', 125), (u'Master', 40), (u'Dr', 7), (u'Rev', 6), (u'Major', 2), (u'Mlle', 2), (u'Col', 2), (u'Sir', 1), (u'the Countess', 1), (u'Don', 1), (u'Capt', 1), (u'Lady', 1), (u'Jonkheer', 1), (u'Ms', 1), (u'Mme', 1)]
# Keep only the four most frequent titles.
top_title = [x[0] for x in sorted(title.countByValue().iteritems(),key=lambda (k,v): v,reverse=True)[:4]]
top_title
[u'Mr', u'Miss', u'Mrs', u'Master']
def assign_title(x):
    """Collapse rare titles: keep x if it is a frequent title, else u'other'.

    Relies on the module-level list ``top_title`` (the four most common titles).
    """
    return x if x in top_title else u'other'
# Collapse the title column to the four common titles plus 'other'.
title_less = title.map(assign_title)
title_less.take(4)
[u'Mr', u'Mrs', u'Miss', u'Mrs']
# Process the remaining (non-name) columns
def split_rest(x):
    """Drop the quoted name field (with its trailing comma) and split on commas."""
    import re
    # Remove the entire quoted section so embedded commas don't break the split.
    without_name = re.sub("\"(.*)\",", '', x)
    return without_name.split(',')
df = raw.map(split_rest)
df.first()
[u'0', u'3', u'male', u'22', u'1', u'0', u'A/5 21171', u'7.25', u'', u'S']
# Inspect the data
# The name column was stripped by split_rest, so remove it from the header too.
vname.remove('name')
# Number of distinct values per variable
m = len(df.first())
# NOTE(review): indentation of the loop bodies below was lost in the notebook dump.
for i in range(m):
print '%dth variable:%s distinct value: %s' %(i, vname[i],df.map(lambda row: row[i]).distinct().count())
0th variable:survived distinct value: 2 1th variable:pclass distinct value: 3 2th variable:sex distinct value: 2 3th variable:age distinct value: 89 4th variable:sibsp distinct value: 7 5th variable:parch distinct value: 7 6th variable:ticket distinct value: 681 7th variable:fare distinct value: 248 8th variable:cabin distinct value: 148 9th variable:embarked distinct value: 4
# Number of missing (empty-string) values per variable
for i in range(m):
print '%dth variable:%s miss value: %s' %(i, vname[i],df.map(lambda row: row[i]=='').sum())
0th variable:survived miss value: 0 1th variable:pclass miss value: 0 2th variable:sex miss value: 0 3th variable:age miss value: 177 4th variable:sibsp miss value: 0 5th variable:parch miss value: 0 6th variable:ticket miss value: 0 7th variable:fare miss value: 0 8th variable:cabin miss value: 687 9th variable:embarked miss value: 2
# Impute missing ages
age = df.map(lambda x: x[3])
# Pair each passenger's title with the age string.
title_age = title.zip(age)
# Parse ages to float; encode missing ('') with the sentinel -1.
title_age = title_age.mapValues(lambda x: float(x) if x!='' else -1)
import numpy as np
def miss_mean(data):
    """Mean of data, ignoring entries equal to the missing-value sentinel -1."""
    observed = [value for value in data if value != -1]
    return np.mean(observed)
# Mean observed age per title; miss_mean reads the grouped values via v.data
# (Python 2 tuple-unpacking lambda).
age_dict = dict(title_age.groupByKey().map(lambda (k,v): (k, miss_mean(v.data))).collect())
age_dict
{u'Capt': 70.0, u'Col': 58.0, u'Don': 40.0, u'Dr': 42.0, u'Jonkheer': 38.0, u'Lady': 48.0, u'Major': 48.5, u'Master': 4.5741666666666667, u'Miss': 21.773972602739725, u'Mlle': 24.0, u'Mme': 24.0, u'Mr': 32.368090452261306, u'Mrs': 35.898148148148145, u'Ms': 28.0, u'Rev': 43.166666666666664, u'Sir': 49.0, u'the Countess': 33.0}
def age_func(pair):
    """Replace a missing age (-1) with the mean age for the passenger's title.

    pair is a (title, age) tuple; returns a (title, age) tuple.
    Relies on the module-level ``age_dict`` of per-title mean ages.
    """
    title, age = pair
    if age == -1:
        return (title, age_dict[title])
    return (title, age)
# Replace sentinel ages with the per-title mean.
title_age = title_age.map(age_func)
age_imputed = title_age.values()
age_imputed.take(4)
[22.0, 38.0, 26.0, 35.0]
# Impute missing embarked values
# Column 9 is embarked; two rows are missing (empty string).
df.map(lambda record: record[9]).countByValue()
defaultdict(<type 'int'>, {u'Q': 77, u'': 2, u'S': 644, u'C': 168})
def embarked_func(record):
    """Return the embarked field (index 9), imputing missing values with u'S'.

    u'S' is the most frequent port in this dataset (644 of 891 rows).
    """
    port = record[9]
    return u'S' if port == '' else port
embarked= df.map(embarked_func)
# Convert the four categorical variables into 0-1 dummy variables
# Stable mapping from each distinct title to a column index.
title_dict = title_less.distinct().zipWithIndex().collectAsMap()
title_dict
{u'Master': 1, u'Miss': 0, u'Mr': 3, u'Mrs': 4, u'other': 2}
def create_vector(term, term_dict):
    """One-hot encode term against term_dict.

    Returns a list of len(term_dict) zeros with a single 1 at the index
    term_dict[term]. Raises KeyError if term is not in term_dict.
    """
    vector = [0 for _ in range(len(term_dict))]
    vector[term_dict[term]] = 1
    return vector
create_vector(u'Master',title_dict)
[0, 1, 0, 0, 0]
# One-hot encode the collapsed title column.
title_ind = title_less.map(lambda x: create_vector(x,title_dict))
title_ind.take(4)
[[0, 0, 0, 1, 0], [0, 0, 0, 0, 1], [1, 0, 0, 0, 0], [0, 0, 0, 0, 1]]
# One-hot encode passenger class (column 1).
pclass_dict = df.map(lambda x: x[1]).distinct().zipWithIndex().collectAsMap()
pclass_dict
{u'1': 0, u'2': 2, u'3': 1}
pclass_ind = df.map(lambda x: create_vector(x[1],pclass_dict))
pclass_ind.take(4)
[[0, 1, 0], [1, 0, 0], [0, 1, 0], [1, 0, 0]]
# One-hot encode the (imputed) embarked column.
embarked_dict = embarked.distinct().zipWithIndex().collectAsMap()
embarked_dict
{u'C': 2, u'Q': 0, u'S': 1}
embarked_ind = embarked.map(lambda x: create_vector(x,embarked_dict))
embarked_ind.take(4)
[[0, 1, 0], [0, 0, 1], [0, 1, 0], [0, 1, 0]]
# Sex needs no dictionary: encode male as 1, female as 0.
gender_ind = df.map(lambda x: 1 if x[2]==u'male' else 0)
# Merge the feature columns
# Key every per-column RDD by row index so the columns can be joined back
# into one feature list per row (Python 2 tuple-unpacking lambdas).
restdf = df.map(lambda x: [int(x[0]),int(x[4]), int(x[5]), float(x[7])]).zipWithIndex().map(lambda (v,k): (k,v))
restdf.take(4)
[(0, [0, 1, 0, 7.25]), (1, [1, 1, 0, 71.2833]), (2, [1, 0, 0, 7.925]), (3, [1, 1, 0, 53.1])]
title_ind = title_ind.zipWithIndex().map(lambda (v,k): (k,v))
title_ind.take(4)
[(0, [0, 0, 0, 1, 0]), (1, [0, 0, 0, 0, 1]), (2, [1, 0, 0, 0, 0]), (3, [0, 0, 0, 0, 1])]
pclass_ind = pclass_ind.zipWithIndex().map(lambda (v,k): (k,v))
pclass_ind.take(4)
[(0, [0, 1, 0]), (1, [1, 0, 0]), (2, [0, 1, 0]), (3, [1, 0, 0])]
embarked_ind = embarked_ind.zipWithIndex().map(lambda (v,k): (k,v))
embarked_ind.take(4)
[(0, [0, 1, 0]), (1, [0, 0, 1]), (2, [0, 1, 0]), (3, [0, 1, 0])]
# Scalars are wrapped in one-element lists so list '+' can concatenate them below.
gender_ind = gender_ind.zipWithIndex().map(lambda (v,k): (k,[v]))
gender_ind.take(4)
[(0, [1]), (1, [0]), (2, [0]), (3, [0])]
age_imputed = age_imputed.zipWithIndex().map(lambda (v,k): (k,[v]))
age_imputed.take(4)
[(0, [22.0]), (1, [38.0]), (2, [26.0]), (3, [35.0])]
# union + reduceByKey concatenates the per-row feature lists across the RDDs
# (each key appears once per RDD, so reduce just joins the two lists).
# NOTE(review): row order is not preserved after reduceByKey — see the keys in
# the take(4) output below.
finaldf = restdf.union(embarked_ind).reduceByKey(lambda x,y: x + y)
finaldf = finaldf.union(age_imputed).reduceByKey(lambda x,y: x + y)
finaldf = finaldf.union(gender_ind).reduceByKey(lambda x,y: x + y)
finaldf = finaldf.union(title_ind).reduceByKey(lambda x,y: x + y)
finaldf = finaldf.union(pclass_ind).reduceByKey(lambda x,y: x + y)
finaldf.take(4)
[(0, [0, 1, 0, 7.25, 0, 1, 0, 22.0, 1, 0, 0, 0, 1, 0, 0, 1, 0]), (384, [0, 0, 0, 7.8958, 0, 1, 0, 32.368090452261306, 1, 0, 0, 0, 1, 0, 0, 1, 0]), (132, [0, 1, 0, 14.5, 0, 1, 0, 47.0, 0, 0, 0, 0, 0, 1, 0, 1, 0]), (588, [0, 0, 0, 8.05, 0, 1, 0, 22.0, 1, 0, 0, 0, 1, 0, 0, 1, 0])]
# Prepare the format needed for modeling
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
def parsePoint(line):
    """Convert a (row_index, [target, feature, ...]) pair into a LabeledPoint.

    The first value in the merged feature list is the survival label;
    the remaining values are the features.
    """
    _, values = line
    return LabeledPoint(values[0], values[1:])
modeldata = finaldf.map(parsePoint)
modeldata.first()
LabeledPoint(0.0, [1.0,0.0,7.25,0.0,1.0,0.0,22.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0])
# Train/test split
train, test = modeldata.randomSplit([0.75,0.25])
# Fit the model
model = LogisticRegressionWithSGD.train(train,iterations =1000,regType='l2')
# Evaluate
labelsAndPreds = test.map(lambda p: (p.label, model.predict(p.features)))
# Fraction of misclassified rows (Python 2 tuple-unpacking lambda).
testErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(test.count())
# NOTE(review): the message says "Training Error" but testErr is computed on
# the held-out test split — the label is misleading.
print("Training Error = " + str(testErr))
Training Error = 0.308056872038