#!/usr/bin/env python
# coding: utf-8

# #Data Generation

# In[1]:

import numpy as np
import csv

def data_generate(fileName, w=[0, 0]):
    size = 100
    np.random.seed(0)
    x = np.random.uniform(-4, 4, size)
    noise = np.random.normal(0, 2, size)
    y = x * w[0] + w[1] + noise
    data = zip(y, x)
    with open(fileName, 'wb') as f:
        writer = csv.writer(f)
        for row in data:
            writer.writerow(row)
    return True


# The true relationship is y = 8x - 2.

# In[2]:

w = [8, -2]
data_generate('data.csv', w)


# #Data Visualization

# In[3]:

get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt

def dataPlot(file, w):
    with open(file, 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            plt.plot(float(row[1]), float(row[0]), 'or')
    plt.xlabel("x")
    plt.ylabel("y")
    x = [-4, 4]
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, linewidth=2.0)
    plt.grid()
    plt.show()


# In[4]:

dataPlot('data.csv', w)


# #Start Spark

# In[5]:

import os
import sys

spark_home = os.environ['SPARK_HOME'] = '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/'
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))


# #Gradient descent (no regularization)

# In[6]:

from collections import namedtuple
import numpy as np

Point = namedtuple('Point', 'x y')

def readPoint(line):
    d = line.split(',')
    x = [float(i) for i in d[1:]]
    x.append(1.0)  # bias term
    return Point(x, float(d[0]))

def linearRegressionGD(data, wInitial=None, learningRate=0.05, iterations=50):
    featureLen = len(data.take(1)[0].x)
    n = data.count()
    if wInitial is None:
        w = np.random.normal(size=featureLen)  # w should be broadcast if it is large
    else:
        w = wInitial
    for i in range(iterations):
        wBroadcast = sc.broadcast(w)
        gradient = data.map(lambda p: -2 * (p.y - np.dot(wBroadcast.value, p.x)) * np.array(p.x)) \
                       .reduce(lambda a, b: a + b)
        w = w - learningRate * gradient / n
    return w


# In[7]:

data = sc.textFile('data.csv').map(readPoint).cache()
linearRegressionGD(data)


# ##Plot w in iterations

# In[8]:

def iterationsPlot(fileName, truew):
    x = [-4, 4]
    w = truew
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, 'b', label="True line", linewidth=4.0)

    np.random.seed(400)
    w = np.random.normal(0, 1, 2)
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, 'r--', label="After 0 Iterations", linewidth=2.0)

    data = sc.textFile(fileName).map(readPoint).cache()
    w = linearRegressionGD(data, w, iterations=1)
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, 'g--', label="After 1 Iteration", linewidth=2.0)

    w = linearRegressionGD(data, w, iterations=1)  # one more pass: 2 iterations in total
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, 'm--', label="After 2 Iterations", linewidth=2.0)

    w = linearRegressionGD(data, w, iterations=1)  # one more pass: 3 iterations in total
    y = [(i * w[0] + w[1]) for i in x]
    plt.plot(x, y, 'y--', label="After 3 Iterations", linewidth=2.0)

    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, fontsize=20, borderaxespad=0.)
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid()
    plt.show()


# In[9]:

iterationsPlot('data.csv', w)
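

# The cell below is an added sanity check, not part of the original notebook: it compares
# the gradient-descent estimate with NumPy's closed-form least-squares fit (np.polyfit)
# and reports the training mean squared error for both. It assumes the `data` RDD and
# `linearRegressionGD` defined above; the helper `meanSquaredError` is introduced here
# purely for illustration.

# In[ ]:

def meanSquaredError(data, w):
    # average squared residual over the RDD; w is tiny here, so closing over it is fine
    n = data.count()
    sse = data.map(lambda p: (p.y - np.dot(w, p.x)) ** 2).reduce(lambda a, b: a + b)
    return sse / n

np.random.seed(400)
wGD = linearRegressionGD(data, iterations=50)

points = data.collect()                  # only 100 points, safe to collect locally
xs = np.array([p.x[0] for p in points])
ys = np.array([p.y for p in points])
wLS = np.polyfit(xs, ys, 1)              # closed-form least squares: [slope, intercept]

{'gradient descent': (wGD, meanSquaredError(data, wGD)),
 'least squares': (wLS, meanSquaredError(data, wLS))}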
plt.xlabel("x") plt.ylabel("y") plt.grid() plt.show() # In[9]: ierationsPlot('data.csv',w) # #Gradient descent (regularization) # In[10]: def linearRegressionGDReg(data, wInitial=None, learningRate=0.05, iterations=50, regParam=0.01, regType=None): featureLen = len(data.take(1)[0].x) n = data.count() if wInitial is None: w = np.random.normal(size=featureLen) # w should be broadcasted if it is large else: w = wInitial for i in range(iterations): wBroadcast = sc.broadcast(w) gradient = data.map(lambda p: -2 * (p.y-np.dot(wBroadcast.value,p.x)) * np.array(p.x)) \ .reduce(lambda a, b: a + b) if regType == "Ridge": wReg = w * 1 wReg[-1] = 0 #last value of weight vector is bias term, ignored in regularization elif regType == "Lasso": wReg = w * 1 wReg[-1] = 0 #last value of weight vector is bias term, ignored in regularization wReg = (wReg>0).astype(int) * 2-1 else: wReg = np.zeros(w.shape[0]) gradient = gradient + regParam * wReg #gradient: GD of Sqaured Error+ GD of regularized term w = w - learningRate * gradient / n return w # ##Ridge Regression # In[11]: np.random.seed(400) linearRegressionGDReg(data, iterations=50, regParam=0.1, regType="Ridge") # ##Lasso Regression # In[12]: np.random.seed(400) linearRegressionGDReg(data, iterations=50, regParam=0.1, regType="Lasso")