import numpy as np
import csv
def data_generate(fileName, w=(0, 0)):
    """Generate noisy samples of y = w[0]*x + w[1] and write them to a CSV.

    Writes `size` rows of (y, x) pairs, with x ~ Uniform(-4, 4) and
    Gaussian noise (sigma=2) added to y.  The RNG is seeded so the
    dataset is reproducible.

    Args:
        fileName: path of the CSV file to create (overwritten if present).
        w: (slope, intercept) of the true line; default (0, 0).
           (Tuple default — a mutable list default would be shared
           across calls.)

    Returns:
        True on success.
    """
    size = 100
    np.random.seed(0)  # reproducible dataset
    x = np.random.uniform(-4, 4, size)      # was hard-coded 100 despite `size`
    noise = np.random.normal(0, 2, size)
    y = x * w[0] + w[1] + noise
    # Python 3: csv needs text mode with newline='' (the original 'wb'
    # was the Python 2 idiom and raises TypeError on Python 3).
    with open(fileName, 'w', newline='') as f:
        csv.writer(f).writerows(zip(y, x))
    return True
The data are generated from the true model y = 8x - 2.
# True model parameters: slope 8, intercept -2 (y = 8x - 2).
w = [8,-2]
data_generate('data.csv', w)
True
%matplotlib inline
import matplotlib.pyplot as plt
def dataPlot(file, w):
    """Scatter-plot the (y, x) CSV data and overlay the line y = w[0]*x + w[1].

    Args:
        file: path to a CSV whose rows are (y, x) pairs, as written by
              data_generate.
        w: (slope, intercept) of the line to overlay.
    """
    xs, ys = [], []
    with open(file, 'r') as f:
        for row in csv.reader(f):
            ys.append(float(row[0]))
            xs.append(float(row[1]))
    # One plot() call for all points — the original issued one call per
    # point, creating 100 separate Line2D artists.
    plt.plot(xs, ys, 'or')
    plt.xlabel("x")
    plt.ylabel("y")
    lineX = [-4, 4]  # matches the x-range used by data_generate
    lineY = [(i * w[0] + w[1]) for i in lineX]
    plt.plot(lineX, lineY, linewidth=2.0)
    plt.grid()
    plt.show()
# Visualize the generated data together with the true line.
dataPlot('data.csv',w)
import os
import sys

# Respect an existing SPARK_HOME; fall back to the local install path.
# (The original unconditionally overwrote SPARK_HOME and then tested a
# value it had just assigned, making the guard dead code.)
spark_home = os.environ.get('SPARK_HOME') or '/Users/liang/Downloads/spark-1.4.1-bin-hadoop2.6/'
os.environ['SPARK_HOME'] = spark_home
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
# execfile() is Python-2-only; exec(compile(...)) works on 2 and 3 and
# keeps the file name in tracebacks.  Defines `sc` and `sqlContext`.
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(shell_path).read(), shell_path, 'exec'))
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Python version 2.7.10 (default, May 28 2015 17:04:42) SparkContext available as sc, HiveContext available as sqlContext.
from collections import namedtuple
import numpy as np

# A labeled sample: x is the feature list (bias appended), y is the label.
Point = namedtuple('Point', 'x y')

def readPoint(line):
    """Parse a CSV line 'label,f1,f2,...' into a Point.

    The feature vector gets a trailing constant 1.0 so the model's last
    weight acts as the bias/intercept term.
    """
    fields = line.split(',')
    features = [float(v) for v in fields[1:]] + [1.0]  # constant bias feature
    return Point(features, float(fields[0]))
def linearRegressionGD(data, wInitial=None, learningRate=0.05, iterations=50):
    """Fit least-squares linear regression by batch gradient descent.

    Args:
        data: RDD of Point(x, y) samples (bias feature already appended).
        wInitial: starting weight vector; random normal draw when None.
        learningRate: GD step size.
        iterations: number of full-batch GD steps.

    Returns:
        The final weight vector (last entry is the bias term).
    """
    numFeatures = len(data.take(1)[0].x)
    sampleCount = data.count()
    # w itself should also be broadcast if it grows large.
    w = np.random.normal(size=numFeatures) if wInitial is None else wInitial
    for _ in range(iterations):
        wB = sc.broadcast(w)  # ship current weights to the executors
        # Gradient of the squared error, summed over all samples.
        grad = data.map(
            lambda p: -2 * (p.y - np.dot(wB.value, p.x)) * np.array(p.x)
        ).reduce(lambda a, b: a + b)
        w = w - learningRate * grad / sampleCount
    return w
# Load the CSV into an RDD of Points (cached: reused every GD iteration)
# and fit the unregularized model.
data = sc.textFile('data.csv').map(readPoint).cache()
linearRegressionGD(data)
array([ 7.98418741, -1.61969498])
def ierationsPlot(fileName, truew):
    """Plot the true line and the GD fit after 0, 1, 2 and 3 iterations.

    Bug fix: the original ran GD for 1, then 2 MORE, then 3 MORE
    iterations (cumulative 1, 3, 6) while labeling the curves
    "After 1/2/3 Iterations".  Each step now runs exactly one additional
    iteration so the labels are accurate.

    Args:
        fileName: CSV of (y, x) rows produced by data_generate.
        truew: (slope, intercept) of the true generating line.
    """
    x = [-4, 4]
    plt.plot(x, [i * truew[0] + truew[1] for i in x], 'b',
             label="True line", linewidth=4.0)
    np.random.seed(400)  # reproducible random initial weights
    w = np.random.normal(0, 1, 2)
    plt.plot(x, [i * w[0] + w[1] for i in x], 'r--',
             label="After 0 Iterations", linewidth=2.0)
    data = sc.textFile(fileName).map(readPoint).cache()
    for k, style in zip((1, 2, 3), ('g--', 'm--', 'y--')):
        w = linearRegressionGD(data, w, iterations=1)  # one more GD step
        plt.plot(x, [i * w[0] + w[1] for i in x], style,
                 label="After %d Iterations" % k, linewidth=2.0)
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, fontsize=20, borderaxespad=0.)
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid()
    plt.show()
# Show how the fitted line approaches the true line over GD iterations.
ierationsPlot('data.csv',w)
def linearRegressionGDReg(data, wInitial=None, learningRate=0.05,
                          iterations=50, regParam=0.01, regType=None):
    """Batch GD for least squares with optional Ridge (L2) or Lasso (L1).

    The last entry of the weight vector is the bias term and is excluded
    from regularization in both modes.

    Bug fix: the original Lasso branch zeroed the bias entry BEFORE
    taking the sign, so `(0 > 0)*2 - 1 == -1` silently re-introduced a
    penalty on the bias term.  The sign is now taken first and the bias
    entry zeroed afterwards (np.sign also gives the valid subgradient 0
    at w == 0, where the original mapped 0 to -1).

    Args:
        data: RDD of Point(x, y) samples (bias feature already appended).
        wInitial: starting weights; random normal draw when None.
        learningRate: GD step size.
        iterations: number of full-batch GD steps.
        regParam: regularization strength.
        regType: "Ridge", "Lasso", or None for no regularization.

    Returns:
        The final weight vector.
    """
    featureLen = len(data.take(1)[0].x)
    n = data.count()
    if wInitial is None:
        w = np.random.normal(size=featureLen)  # w should be broadcast if it is large
    else:
        w = wInitial
    for i in range(iterations):
        wBroadcast = sc.broadcast(w)
        gradient = data.map(lambda p: -2 * (p.y - np.dot(wBroadcast.value, p.x)) * np.array(p.x)) \
                       .reduce(lambda a, b: a + b)
        if regType == "Ridge":
            wReg = w * 1          # copy; gradient of 0.5*||w||^2 is w
            wReg[-1] = 0          # bias term excluded from regularization
        elif regType == "Lasso":
            wReg = np.sign(w)     # subgradient of ||w||_1
            wReg[-1] = 0          # bias term excluded (zeroed AFTER sign)
        else:
            wReg = np.zeros(w.shape[0])
        # Total gradient: squared-error term plus regularization term.
        gradient = gradient + regParam * wReg
        w = w - learningRate * gradient / n
    return w
# Ridge-regularized fit; seed makes the random initial weights reproducible.
np.random.seed(400)
linearRegressionGDReg(data, iterations=50, regParam=0.1, regType="Ridge")
array([ 7.98398871, -1.60876051])
# Lasso-regularized fit with the same seed for a like-for-like comparison.
np.random.seed(400)
linearRegressionGDReg(data, iterations=50, regParam=0.1, regType="Lasso")
array([ 7.98466543, -1.60811709])