Spark extras

Generating Big Data

In [1]:
# import libraries
import csv, time
import numpy as np
import pandas as pd
from pyspark import StorageLevel
In [2]:
# Read mock data
mock_data = []
with open("data/MOCK_DATA.csv") as mock_file:
    mock_data = mock_file.readlines()

# sampling parameters: draw 100 random mock rows per iteration
num_range = len(mock_data) - 1
sample_items = 100
num_iterations = 100000

# generate Big Data
with open("data/big_data.csv", "w") as big_data:
    # write header
    writer = csv.writer(big_data)
    writer.writerow(mock_data[0].strip().split(","))
    
    # write rows by sampling 100 random mock rows per iteration (100,000 iterations, ~10M rows total)
    next_line = 1
    for i in range(0, num_iterations):
        lines = []
        choices = np.random.choice(num_range, sample_items) + 1  # shift by 1 so the header row (index 0) is never sampled
        for idx in choices:
            line = mock_data[idx].strip().split(",")
            line[0] = next_line
            lines.append(line)
            next_line += 1

        writer.writerows(lines)

print "Done generating."        
Done generating.
In [3]:
!ls data/
MOCK_DATA.csv              sample_kmeans_data.txt
big_data.csv               sample_lda_libsvm_data.txt
In [4]:
!du -hs data/big_data.csv
638M	data/big_data.csv

Storing in Parquet format

In [5]:
df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/big_data.csv")

df.write.parquet("data/big_data_parquet", mode="overwrite")
print "Store parquet completed."
Store parquet completed.
In [6]:
!du -hs data/big_data_parquet
 89M	data/big_data_parquet
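
The size drop from 638 MB of CSV to roughly 89 MB comes from Parquet's columnar layout plus compression. As a minimal sketch (the df_parquet name is just illustrative), reading it back and touching a single column only scans that column's chunks:

df_parquet = sqlContext.read.parquet("data/big_data_parquet")
df_parquet.printSchema()
# column pruning: only the "gender" column is read from disk
df_parquet.select("gender").distinct().show()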

Self-implemented CSV reader vs pandas

In [7]:
start = time.time()
with open("data/big_data.csv", "r") as big_data:
    # read csv file
    reader = csv.reader(big_data)
    header = next(reader)
    num_row = 0
    for row in reader:
        num_row += 1

print "Numer of rows:", num_row
done = time.time()
elapsed = done - start
print "Total running time:", elapsed, "seconds"
Number of rows: 10000000
Total running time: 11.6682560444 seconds
In [8]:
start = time.time()

pd_big_data = pd.read_csv("data/big_data.csv")
pd_big_data["id"].count()

done = time.time()
elapsed = done - start
print "Total running time:", elapsed, "seconds"
Total running time: 9.40140104294 seconds
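
For completeness, the same row count could also be timed with Spark on the identical CSV; this is only a sketch using the spark-csv reader from above (df_timed is an illustrative name, no timings are claimed here):

start = time.time()
df_timed = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load("data/big_data.csv")
print "Number of rows:", df_timed.count()
print "Total running time:", time.time() - start, "seconds"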

RDD vs DataFrame

Loading

In [9]:
# loading csv to RDD (sc.textFile keeps the header line, hence the count of 10,000,001 below)
lines = sc.textFile("data/big_data.csv")
parts = lines.map(lambda l: l.split(","))
rdd_people = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5].strip()))
rdd_people.count()
Out[9]:
10000001
In [10]:
# loading csv to Dataframe
df_people = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/big_data.csv")
df_people.count()
Out[10]:
10000000
In [11]:
# loading parquet to Dataframe
df_people = sqlContext.read.parquet("data/big_data_parquet")
df_people.count()
Out[11]:
10000000

Caching

  • DISK_ONLY = StorageLevel(True, False, False, False, 1)
  • DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
  • MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
  • MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
  • MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
  • MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
  • MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
  • MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
  • MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
  • MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
  • OFF_HEAP = StorageLevel(True, True, True, False, 1)
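
The tuples above follow the constructor StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication); since PySpark always serializes data on the Python side, each _SER level coincides with its non-_SER counterpart. A short sketch of persisting with an explicit level (equivalent to the named constant):

# pick a storage level by named constant ...
rdd_people.unpersist()
rdd_people.persist(StorageLevel.MEMORY_AND_DISK)
# ... or by the explicit tuple form used in the cells below:
# rdd_people.persist(StorageLevel(True, True, False, False, 1))
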
In [12]:
# RDD cache
rdd_people.cache()
rdd_people.count()
Out[12]:
10000001
In [13]:
# disk only
rdd_people.unpersist()
rdd_people.persist(storageLevel=StorageLevel(True, False, False, False, 1))
rdd_people.count()
Out[13]:
10000001
In [14]:
rdd_people.count()
Out[14]:
10000001
In [15]:
# Dataframe cache
df_people.cache()
df_people.count()
Out[15]:
10000000
In [16]:
df_people.count()
Out[16]:
10000000
In [17]:
# unpersist all
rdd_people.unpersist()
df_people.unpersist()
Out[17]:
DataFrame[id: int, first_name: string, last_name: string, email: string, gender: string, ip_address: string]

Processing

In [18]:
# RDD cache
rdd_people.cache()
rdd_people.count()
Out[18]:
10000001
In [19]:
# Dataframe cache
df_people.cache()
df_people.count()
Out[19]:
10000000
In [20]:
rdd_people.distinct().count()
Out[20]:
10000001
In [21]:
df_people.distinct().count()
Out[21]:
10000000
In [22]:
rdd_people.filter(lambda x: x[4] == "Male").count()
Out[22]:
5160545
In [23]:
df_people.filter(df_people["gender"] == "Male").count()
Out[23]:
5160545
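
To turn this comparison into numbers, each action can be wrapped in a timer; a rough sketch with a hypothetical timed() helper (results are not measured here):

def timed(label, action):
    # hypothetical helper: run the action once and report wall-clock time
    start = time.time()
    print label, "->", action(), "in", round(time.time() - start, 2), "seconds"

timed("RDD distinct().count()", lambda: rdd_people.distinct().count())
timed("DF distinct().count()", lambda: df_people.distinct().count())
timed("RDD filter Male", lambda: rdd_people.filter(lambda x: x[4] == "Male").count())
timed("DF filter Male", lambda: df_people.filter(df_people["gender"] == "Male").count())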

SQL vs Built-in functions

In [24]:
df_people.filter(df_people["gender"] == "Male") \
    .select(df_people["first_name"], df_people["email"].alias("mail")) \
    .show(5)
+----------+--------------------+
|first_name|                mail|
+----------+--------------------+
|    Linoel|    [email protected]|
|    Ripley|[email protected]|
|    Rooney|[email protected]|
|    Lovell|[email protected]|
|   Cordell|[email protected]|
+----------+--------------------+
only showing top 5 rows

In [25]:
sqlContext.registerDataFrameAsTable(df_people, "tbl_people")
sqlContext.sql("""
    SELECT first_name, email AS mail
    FROM tbl_people
    WHERE gender = 'Male'
""").show(5)
+----------+--------------------+
|first_name|                mail|
+----------+--------------------+
|    Linoel|    [email protected]|
|    Ripley|[email protected]|
|    Rooney|[email protected]|
|    Lovell|[email protected]|
|   Cordell|[email protected]|
+----------+--------------------+
only showing top 5 rows
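
Both forms go through the same Catalyst optimizer, so they should compile to equivalent physical plans; explain() can be used to verify this (a sketch, plan output omitted):

df_people.filter(df_people["gender"] == "Male") \
    .select("first_name", df_people["email"].alias("mail")) \
    .explain()

sqlContext.sql("""
    SELECT first_name, email AS mail
    FROM tbl_people
    WHERE gender = 'Male'
""").explain()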

Machine Learning

k-Means

In [26]:
from pyspark.ml.clustering import KMeans

# Loads data.
dataset = spark.read.format("libsvm").load("data/sample_kmeans_data.txt")
print dataset.take(2)

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
[Row(label=0.0, features=SparseVector(3, {})), Row(label=1.0, features=SparseVector(3, {0: 0.1, 1: 0.1, 2: 0.1}))]
Within Set Sum of Squared Errors = 0.12
Cluster Centers: 
[ 0.1  0.1  0.1]
[ 9.1  9.1  9.1]
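
The fitted model can also assign every row to its nearest center via transform(), which appends a prediction column; a short sketch (the predictions name is illustrative):

# cluster assignment for each row (adds a "prediction" column)
predictions = model.transform(dataset)
predictions.show(5)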

LDA

In [27]:
from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/sample_lda_libsvm_data.txt")
print dataset.take(1)

# Trains an LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
[Row(label=0.0, features=SparseVector(11, {0: 1.0, 1: 2.0, 2: 6.0, 4: 2.0, 5: 3.0, 6: 1.0, 7: 1.0, 10: 3.0}))]
The lower bound on the log likelihood of the entire corpus: -807.523779034
The upper bound on perplexity: 3.1058607098
The topics described by their top-weighted terms:
+-----+-----------+------------------------------------------+
|topic|termIndices|termWeights                               |
+-----+-----------+------------------------------------------+
|0    |[4, 7]     |[0.10782279117289076, 0.09748059781126188]|
|1    |[1, 6]     |[0.16755680545542245, 0.14746675160950057]|
|2    |[1, 3]     |[0.10064404940528088, 0.10044227953257671]|
|3    |[1, 3]     |[0.10157580719995081, 0.0997449393879735] |
|4    |[9, 10]    |[0.10479880814180582, 0.10207371063193371]|
|5    |[8, 5]     |[0.10843493258130431, 0.09701505371078402]|
|6    |[8, 5]     |[0.09874157104646761, 0.09654281855423051]|
|7    |[9, 4]     |[0.1125248473532763, 0.09755082892584456] |
|8    |[5, 4]     |[0.1548707489550002, 0.14842696182913703] |
|9    |[3, 10]    |[0.23809170139198915, 0.10412979928333384]|
+-----+-----------+------------------------------------------+

+-----+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                       |topicDistribution                                                                                                                                                                                                      |
+-----+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0  |(11,[0,1,2,4,5,6,7,10],[1.0,2.0,6.0,2.0,3.0,1.0,1.0,3.0])      |[0.004830687376549748,0.9563377837515517,0.004830654392031491,0.004830691997365709,0.004830667868178218,0.004830690208597917,0.0048307249800948774,0.004830673837555869,0.004922723634004233,0.004924701954070474]     |
|1.0  |(11,[0,1,3,4,7,10],[1.0,3.0,1.0,3.0,2.0,1.0])                  |[0.008057899175270963,0.008707406312071347,0.008058025956346778,0.008058095756917977,0.008057857450925326,0.00805788610035481,0.008057863295169719,0.008057922472956825,0.9266708229866527,0.008216220493333638]       |
|2.0  |(11,[0,1,2,5,6,8,9],[1.0,4.0,1.0,4.0,9.0,1.0,2.0])             |[0.004199740220179303,0.9620401573867432,0.004199832021596322,0.004199768006758077,0.0041998008830341094,0.0041998188779895085,0.004199829669347301,0.004199780879768142,0.004279810976748579,0.004281461077835567]    |
|3.0  |(11,[0,1,3,6,8,9,10],[2.0,1.0,3.0,5.0,2.0,3.0,9.0])            |[0.0037148977019415986,0.5171649236275588,0.0037149262092321953,0.003714916859012322,0.0037150043988300675,0.003714951767438243,0.0037149809617586303,0.003714886247016027,0.0037853064740571217,0.4530452057531551]   |
|4.0  |(11,[0,1,2,3,4,6,9,10],[3.0,1.0,1.0,9.0,3.0,2.0,1.0,3.0])      |[0.004024698590879546,0.00434854915127394,0.004024734924719693,0.004024704186015809,0.004024729103561261,0.004024699645915288,0.004024728350735192,0.004024732738832953,0.004101250338655413,0.9633771729694108]       |
|5.0  |(11,[0,1,3,4,5,6,7,8,9],[4.0,2.0,3.0,4.0,5.0,1.0,1.0,1.0,4.0]) |[0.0037149196791821672,0.004014144755782942,0.0037150238353502916,0.0037149350730528103,0.003714984985771291,0.0037149612383395674,0.003714972648035956,0.0037150076003473935,0.7875949774197026,0.18238607276443486]  |
|6.0  |(11,[0,1,3,6,8,9,10],[2.0,1.0,3.0,5.0,2.0,2.0,9.0])            |[0.0038636384437946217,0.4588911758399325,0.0038636648639211735,0.00386366180952517,0.0038637468045085977,0.0038636989406232065,0.003863727465055344,0.003863616485805724,0.00393686849521447,0.5101262008516194]      |
|7.0  |(11,[0,1,2,3,4,5,6,9,10],[1.0,1.0,1.0,9.0,2.0,1.0,2.0,1.0,3.0])|[0.004390951192466292,0.004744392472015055,0.004391008613455925,0.004390986297746747,0.004391002598536641,0.004390967203785097,0.004390995209555504,0.004390994508486955,0.00447440696608868,0.960044294937863]        |
|8.0  |(11,[0,1,3,4,5,6,7],[4.0,4.0,3.0,4.0,2.0,1.0,3.0])             |[0.004391055602376864,0.004744679184009837,0.00439115377848033,0.004391124194262692,0.0043910387568340484,0.0043910790167236875,0.004391072093598296,0.004391105932938527,0.7784515959242768,0.18606609551649894]      |
|9.0  |(11,[0,1,2,4,6,8,9,10],[2.0,8.0,2.0,3.0,2.0,2.0,7.0,2.0])      |[0.0033302159688284975,0.9698999591198246,0.0033302386018501102,0.0033302022818941343,0.0033302152189945153,0.003330227969565667,0.00333022302764557,0.0033302283560539235,0.0033935016881488007,0.0033949877671941944]|
|10.0 |(11,[0,1,2,3,5,6,9,10],[1.0,1.0,1.0,9.0,2.0,2.0,3.0,3.0])      |[0.004199865804144868,0.004538523687990608,0.004199964924904667,0.0041998976831187276,0.004199967375784788,0.004199909609085674,0.004199952755541441,0.004199953028483417,0.004279696097068255,0.9617822690338775]     |
|11.0 |(11,[0,1,4,5,6,7,9],[4.0,1.0,4.0,5.0,1.0,3.0,1.0])             |[0.004830550832388801,0.005219225116360582,0.004830587434608386,0.004830531305324705,0.004830557229897102,0.004830570451296251,0.004830581174355882,0.004830594302879788,0.9560423013264151,0.004924500826473449]      |
+-----+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

TF-IDF

In [28]:
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
print rescaledData.take(1)

# Trains an LDA model.
lda = LDA(k=2, maxIter=10)
model = lda.fit(rescaledData)

ll = model.logLikelihood(rescaledData)
lp = model.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(rescaledData)
transformed.show(1)
[Row(label=0.0, sentence=u'Hi I heard about Spark', words=[u'hi', u'i', u'heard', u'about', u'spark'], rawFeatures=SparseVector(20, {0: 1.0, 5: 1.0, 9: 1.0, 17: 2.0}), features=SparseVector(20, {0: 0.6931, 5: 0.6931, 9: 0.2877, 17: 1.3863}))]
The lower bound on the log likelihood of the entire corpus: -37.6225456281
The upper bound on perplexity: 4.40556309703
The topics described by their top-weighted terms:
+-----+-----------+-------------------------------------------+
|topic|termIndices|termWeights                                |
+-----+-----------+-------------------------------------------+
|0    |[17, 15]   |[0.05823196670532641, 0.05751424724780337] |
|1    |[0, 3]     |[0.062056681029749566, 0.05575172335627538]|
+-----+-----------+-------------------------------------------+

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|   topicDistribution|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[0,5,9,17],[1...|(20,[0,5,9,17],[0...|[0.27399693051219...|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row
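
Spark's LDA conceptually models term counts, so feeding it IDF-weighted vectors is debatable; the CountVectorizer alternative mentioned in the comment above would look roughly like this (a sketch, variable names and vocabSize chosen arbitrarily):

from pyspark.ml.feature import CountVectorizer

# raw term-count vectors instead of TF-IDF weights
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=20)
cvModel = cv.fit(wordsData)
countData = cvModel.transform(wordsData)

lda_cv = LDA(k=2, maxIter=10)
model_cv = lda_cv.fit(countData)
model_cv.describeTopics(2).show(truncate=False)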

Another LDA (from hand-built SparseVectors)

In [29]:
from pyspark.sql import Row
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import SparseVector

ls_row = [Row(label=1.0, features=SparseVector(20, {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 5: 0.5754, 6: 0.0, 7: 0.2877, 8: 0.2877, 9: 0.0, 10: 0.0, 11: 0.5754, 13: 0.0, 15: 0.0, 16: 0.0, 17: 0.0, 19: 0.0})),
 Row(label=1.0, features=SparseVector(20, {0: 1.1507, 1: 0.0, 2: 0.0, 3: 0.0, 5: 0.5754, 6: 0.0, 7: 0.5754, 8: 0.5754, 9: 0.0, 10: 0.0, 11: 0.2877, 12: 0.5754, 13: 0.0, 14: 0.5754, 15: 0.0, 16: 0.0, 17: 0.0, 18: 0.863, 19: 0.0})),
 Row(label=1.0, features=SparseVector(20, {0: 0.863, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 6: 0.0, 9: 0.0, 10: 0.0, 12: 0.2877, 13: 0.0, 14: 0.5754, 15: 0.0, 16: 0.0, 17: 0.0, 18: 0.863, 19: 0.0}))]

rescaledData = sc.parallelize(ls_row).toDF()

# Trains an LDA model.
lda = LDA(k=2, maxIter=10)
model = lda.fit(rescaledData)

ll = model.logLikelihood(rescaledData)
lp = model.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(rescaledData)
transformed.take(1)
The lower bound on the log likelihood of the entire corpus: -44.9571717581
The upper bound on perplexity: 4.22351179184
The topics described by their top-weighted terms:
+-----+-----------+-------------------------------------------+
|topic|termIndices|termWeights                                |
+-----+-----------+-------------------------------------------+
|0    |[17, 15]   |[0.05823196670532641, 0.05751424724780337] |
|1    |[0, 3]     |[0.062056681029749566, 0.05575172335627538]|
+-----+-----------+-------------------------------------------+

Out[29]:
[Row(features=SparseVector(20, {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 5: 0.5754, 6: 0.0, 7: 0.2877, 8: 0.2877, 9: 0.0, 10: 0.0, 11: 0.5754, 13: 0.0, 15: 0.0, 16: 0.0, 17: 0.0, 19: 0.0}), label=1.0, topicDistribution=DenseVector([0.3716, 0.6284]))]