# import libraries
import csv, time
import numpy as np
import pandas as pd
from pyspark import StorageLevel
# Read mock data
mock_data = []
with open("data/MOCK_DATA.csv") as mock_file:
mock_data = mock_file.readlines()
# sampling parameters (index 0 is the header, so num_range covers the data rows)
num_range = len(mock_data) - 1
sample_items = 100
num_iterations = 100000  # 100 samples x 100,000 iterations = 10,000,000 rows
# generate Big Data
with open("data/big_data.csv", "w") as big_data:
# write header
writer = csv.writer(big_data)
writer.writerow(mock_data[0].strip().split(","))
# write lines by random sampling 100 items over 1000 lines
next_line = 1
for i in range(0, num_iterations):
lines = []
choices = np.random.choice(num_range, sample_items)
for idx in choices:
line = mock_data[idx].strip().split(",")
line[0] = next_line
lines.append(line)
next_line += 1
writer.writerows(lines)
print "Done generating."
Done generating.
!ls data/
MOCK_DATA.csv sample_kmeans_data.txt big_data.csv sample_lda_libsvm_data.txt
!du -hs data/big_data.csv
638M data/big_data.csv
df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/big_data.csv")
df.write.parquet("data/big_data_parquet", mode="overwrite")
print "Store parquet completed."
Store parquet completed.
!du -hs data/big_data_parquet
89M data/big_data_parquet
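# Parquet's columnar layout compresses the same rows to roughly one seventh of
# the CSV size. An illustrative sketch (not part of the original run) timing a
# full-scan count on each format in the same session; absolute numbers are
# machine-dependent:
start = time.time()
sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load("data/big_data.csv").count()
print("CSV scan: " + str(time.time() - start) + " seconds")
start = time.time()
sqlContext.read.parquet("data/big_data_parquet").count()
print("Parquet scan: " + str(time.time() - start) + " seconds")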
start = time.time()
with open("data/big_data.csv", "r") as big_data:
# read csv file
reader = csv.reader(big_data)
header = reader.next()
num_row = 0
for row in reader:
num_row += 1
print "Numer of rows:", num_row
done = time.time()
elapsed = done - start
print "Total running time:", elapsed, "seconds"
Number of rows: 10000000
Total running time: 11.6682560444 seconds
start = time.time()
pd_big_data = pd.read_csv("data/big_data.csv")  # parses the entire file into memory
pd_big_data["id"].count()
done = time.time()
elapsed = done - start
print "Total running time:", elapsed, "seconds"
Total running time: 9.40140104294 seconds
# loading csv to RDD (textFile keeps the header line, hence the extra row in the count)
lines = sc.textFile("data/big_data.csv")
parts = lines.map(lambda l: l.split(","))
rdd_people = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5].strip()))
rdd_people.count()
10000001
# loading csv to Dataframe
df_people = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/big_data.csv")
df_people.count()
10000000
# loading parquet to Dataframe
df_people = sqlContext.read.parquet("data/big_data_parquet")
df_people.count()
10000000
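# Parquet stores its schema with the data, so the column types come back
# without an inference pass over the file. A quick check (illustrative):
df_people.printSchema()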
# RDD cache
rdd_people.cache()
rdd_people.count()
10000001
# disk only (StorageLevel.DISK_ONLY is equivalent to StorageLevel(True, False, False, False, 1))
rdd_people.unpersist()
rdd_people.persist(StorageLevel.DISK_ONLY)
rdd_people.count()
10000001
rdd_people.count()
10000001
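# The active persistence level can be verified directly (illustrative):
print(rdd_people.getStorageLevel())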
# Dataframe cache
df_people.cache()
df_people.count()
10000000
df_people.count()
10000000
# unpersist all
rdd_people.unpersist()
df_people.unpersist()
DataFrame[id: int, first_name: string, last_name: string, email: string, gender: string, ip_address: string]
# RDD cache
rdd_people.cache()
rdd_people.count()
10000001
# Dataframe cache
df_people.cache()
df_people.count()
10000000
rdd_people.distinct().count()
10000001
df_people.distinct().count()
10000000
rdd_people.filter(lambda x: x[4] == "Male").count()
5160545
df_people.filter(df_people["gender"] == "Male").count()
5160545
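# The breakdown for every gender value can also be computed in a single
# aggregation over the cached df_people (illustrative):
df_people.groupBy("gender").count().show()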
df_people.filter(df_people["gender"] == "Male") \
    .select(df_people["first_name"],
            df_people["email"].alias("mail")).show(5)
+----------+--------------------+
|first_name|                mail|
+----------+--------------------+
|    Linoel|    lcobden7g@hp.com|
|    Ripley|rchiplenhp@barnes...|
|    Rooney|rchesworth7e@utex...|
|    Lovell|lfellgatep8@umich...|
|   Cordell|cduplain4a@artist...|
+----------+--------------------+
only showing top 5 rows
sqlContext.registerDataFrameAsTable(df_people, "tbl_people")
sqlContext.sql("""
    SELECT first_name, email AS mail
    FROM tbl_people
    WHERE gender = 'Male'
""").show(5)
+----------+--------------------+
|first_name|                mail|
+----------+--------------------+
|    Linoel|    lcobden7g@hp.com|
|    Ripley|rchiplenhp@barnes...|
|    Rooney|rchesworth7e@utex...|
|    Lovell|lfellgatep8@umich...|
|   Cordell|cduplain4a@artist...|
+----------+--------------------+
only showing top 5 rows
from pyspark.ml.clustering import KMeans
# Loads data.
dataset = spark.read.format("libsvm").load("data/sample_kmeans_data.txt")
print dataset.take(2)
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
[Row(label=0.0, features=SparseVector(3, {})), Row(label=1.0, features=SparseVector(3, {0: 0.1, 1: 0.1, 2: 0.1}))]
Within Set Sum of Squared Errors = 0.12
Cluster Centers:
[ 0.1  0.1  0.1]
[ 9.1  9.1  9.1]
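# The fitted model can also assign each point to a cluster; transform adds a
# "prediction" column to the input DataFrame (illustrative):
model.transform(dataset).show()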
from pyspark.ml.clustering import LDA
# Loads data.
dataset = spark.read.format("libsvm").load("data/sample_lda_libsvm_data.txt")
print dataset.take(1)
# Trains an LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
[Row(label=0.0, features=SparseVector(11, {0: 1.0, 1: 2.0, 2: 6.0, 4: 2.0, 5: 3.0, 6: 1.0, 7: 1.0, 10: 3.0}))]
The lower bound on the log likelihood of the entire corpus: -807.523779034
The upper bound on perplexity: 3.1058607098
The topics described by their top-weighted terms:
+-----+-----------+------------------------------------------+
|topic|termIndices|termWeights                               |
+-----+-----------+------------------------------------------+
|0    |[4, 7]     |[0.10782279117289076, 0.09748059781126188]|
|1    |[1, 6]     |[0.16755680545542245, 0.14746675160950057]|
|2    |[1, 3]     |[0.10064404940528088, 0.10044227953257671]|
|3    |[1, 3]     |[0.10157580719995081, 0.0997449393879735] |
|4    |[9, 10]    |[0.10479880814180582, 0.10207371063193371]|
|5    |[8, 5]     |[0.10843493258130431, 0.09701505371078402]|
|6    |[8, 5]     |[0.09874157104646761, 0.09654281855423051]|
|7    |[9, 4]     |[0.1125248473532763, 0.09755082892584456] |
|8    |[5, 4]     |[0.1548707489550002, 0.14842696182913703] |
|9    |[3, 10]    |[0.23809170139198915, 0.10412979928333384]|
+-----+-----------+------------------------------------------+
+-----+---------------------------------------------------------------+------------------+
|label|features                                                       |topicDistribution |
+-----+---------------------------------------------------------------+------------------+
|0.0  |(11,[0,1,2,4,5,6,7,10],[1.0,2.0,6.0,2.0,3.0,1.0,1.0,3.0])      |[0.004830687376549748,0.9563377837515517,0.004830654392031491,0.004830691997365709,0.004830667868178218,0.004830690208597917,0.0048307249800948774,0.004830673837555869,0.004922723634004233,0.004924701954070474]|
|1.0  |(11,[0,1,3,4,7,10],[1.0,3.0,1.0,3.0,2.0,1.0])                  |[0.008057899175270963,0.008707406312071347,0.008058025956346778,0.008058095756917977,0.008057857450925326,0.00805788610035481,0.008057863295169719,0.008057922472956825,0.9266708229866527,0.008216220493333638]|
|2.0  |(11,[0,1,2,5,6,8,9],[1.0,4.0,1.0,4.0,9.0,1.0,2.0])             |[0.004199740220179303,0.9620401573867432,0.004199832021596322,0.004199768006758077,0.0041998008830341094,0.0041998188779895085,0.004199829669347301,0.004199780879768142,0.004279810976748579,0.004281461077835567]|
|3.0  |(11,[0,1,3,6,8,9,10],[2.0,1.0,3.0,5.0,2.0,3.0,9.0])            |[0.0037148977019415986,0.5171649236275588,0.0037149262092321953,0.003714916859012322,0.0037150043988300675,0.003714951767438243,0.0037149809617586303,0.003714886247016027,0.0037853064740571217,0.4530452057531551]|
|4.0  |(11,[0,1,2,3,4,6,9,10],[3.0,1.0,1.0,9.0,3.0,2.0,1.0,3.0])      |[0.004024698590879546,0.00434854915127394,0.004024734924719693,0.004024704186015809,0.004024729103561261,0.004024699645915288,0.004024728350735192,0.004024732738832953,0.004101250338655413,0.9633771729694108]|
|5.0  |(11,[0,1,3,4,5,6,7,8,9],[4.0,2.0,3.0,4.0,5.0,1.0,1.0,1.0,4.0]) |[0.0037149196791821672,0.004014144755782942,0.0037150238353502916,0.0037149350730528103,0.003714984985771291,0.0037149612383395674,0.003714972648035956,0.0037150076003473935,0.7875949774197026,0.18238607276443486]|
|6.0  |(11,[0,1,3,6,8,9,10],[2.0,1.0,3.0,5.0,2.0,2.0,9.0])            |[0.0038636384437946217,0.4588911758399325,0.0038636648639211735,0.00386366180952517,0.0038637468045085977,0.0038636989406232065,0.003863727465055344,0.003863616485805724,0.00393686849521447,0.5101262008516194]|
|7.0  |(11,[0,1,2,3,4,5,6,9,10],[1.0,1.0,1.0,9.0,2.0,1.0,2.0,1.0,3.0])|[0.004390951192466292,0.004744392472015055,0.004391008613455925,0.004390986297746747,0.004391002598536641,0.004390967203785097,0.004390995209555504,0.004390994508486955,0.00447440696608868,0.960044294937863]|
|8.0  |(11,[0,1,3,4,5,6,7],[4.0,4.0,3.0,4.0,2.0,1.0,3.0])             |[0.004391055602376864,0.004744679184009837,0.00439115377848033,0.004391124194262692,0.0043910387568340484,0.0043910790167236875,0.004391072093598296,0.004391105932938527,0.7784515959242768,0.18606609551649894]|
|9.0  |(11,[0,1,2,4,6,8,9,10],[2.0,8.0,2.0,3.0,2.0,2.0,7.0,2.0])      |[0.0033302159688284975,0.9698999591198246,0.0033302386018501102,0.0033302022818941343,0.0033302152189945153,0.003330227969565667,0.00333022302764557,0.0033302283560539235,0.0033935016881488007,0.0033949877671941944]|
|10.0 |(11,[0,1,2,3,5,6,9,10],[1.0,1.0,1.0,9.0,2.0,2.0,3.0,3.0])      |[0.004199865804144868,0.004538523687990608,0.004199964924904667,0.0041998976831187276,0.004199967375784788,0.004199909609085674,0.004199952755541441,0.004199953028483417,0.004279696097068255,0.9617822690338775]|
|11.0 |(11,[0,1,4,5,6,7,9],[4.0,1.0,4.0,5.0,1.0,3.0,1.0])             |[0.004830550832388801,0.005219225116360582,0.004830587434608386,0.004830531305324705,0.004830557229897102,0.004830570451296251,0.004830581174355882,0.004830594302879788,0.9560423013264151,0.004924500826473449]|
+-----+---------------------------------------------------------------+------------------+
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
print rescaledData.take(1)
# Trains an LDA model.
lda = LDA(k=2, maxIter=10)
model = lda.fit(rescaledData)
ll = model.logLikelihood(rescaledData)
lp = model.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = model.transform(rescaledData)
transformed.show(1)
[Row(label=0.0, sentence=u'Hi I heard about Spark', words=[u'hi', u'i', u'heard', u'about', u'spark'], rawFeatures=SparseVector(20, {0: 1.0, 5: 1.0, 9: 1.0, 17: 2.0}), features=SparseVector(20, {0: 0.6931, 5: 0.6931, 9: 0.2877, 17: 1.3863}))]
The lower bound on the log likelihood of the entire corpus: -37.6225456281
The upper bound on perplexity: 4.40556309703
The topics described by their top-weighted terms:
+-----+-----------+-------------------------------------------+
|topic|termIndices|termWeights                                |
+-----+-----------+-------------------------------------------+
|0    |[17, 15]   |[0.05823196670532641, 0.05751424724780337] |
|1    |[0, 3]     |[0.062056681029749566, 0.05575172335627538]|
+-----+-----------+-------------------------------------------+
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|   topicDistribution|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[0,5,9,17],[1...|(20,[0,5,9,17],[0...|[0.27399693051219...|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row
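# As the comment above notes, CountVectorizer is an alternative to HashingTF:
# it learns an explicit, invertible vocabulary instead of hashing terms into a
# fixed space. An illustrative sketch reusing wordsData from the tokenizer step:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize=20)
cvModel = cv.fit(wordsData)
featurizedData = cvModel.transform(wordsData)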
from pyspark.sql import Row
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import SparseVector
ls_row = [
    Row(label=1.0, features=SparseVector(20, {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 5: 0.5754, 6: 0.0, 7: 0.2877, 8: 0.2877, 9: 0.0, 10: 0.0, 11: 0.5754, 13: 0.0, 15: 0.0, 16: 0.0, 17: 0.0, 19: 0.0})),
    Row(label=1.0, features=SparseVector(20, {0: 1.1507, 1: 0.0, 2: 0.0, 3: 0.0, 5: 0.5754, 6: 0.0, 7: 0.5754, 8: 0.5754, 9: 0.0, 10: 0.0, 11: 0.2877, 12: 0.5754, 13: 0.0, 14: 0.5754, 15: 0.0, 16: 0.0, 17: 0.0, 18: 0.863, 19: 0.0})),
    Row(label=1.0, features=SparseVector(20, {0: 0.863, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 6: 0.0, 9: 0.0, 10: 0.0, 12: 0.2877, 13: 0.0, 14: 0.5754, 15: 0.0, 16: 0.0, 17: 0.0, 18: 0.863, 19: 0.0}))
]
rescaledData = sc.parallelize(ls_row).toDF()
# Trains an LDA model.
lda = LDA(k=2, maxIter=10)
model = lda.fit(rescaledData)
ll = model.logLikelihood(rescaledData)
lp = model.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(2)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = model.transform(rescaledData)
transformed.take(1)
The lower bound on the log likelihood of the entire corpus: -44.9571717581
The upper bound on perplexity: 4.22351179184
The topics described by their top-weighted terms:
+-----+-----------+-------------------------------------------+
|topic|termIndices|termWeights                                |
+-----+-----------+-------------------------------------------+
|0    |[17, 15]   |[0.05823196670532641, 0.05751424724780337] |
|1    |[0, 3]     |[0.062056681029749566, 0.05575172335627538]|
+-----+-----------+-------------------------------------------+
[Row(features=SparseVector(20, {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.5754, 5: 0.5754, 6: 0.0, 7: 0.2877, 8: 0.2877, 9: 0.0, 10: 0.0, 11: 0.5754, 13: 0.0, 15: 0.0, 16: 0.0, 17: 0.0, 19: 0.0}), label=1.0, topicDistribution=DenseVector([0.3716, 0.6284]))]
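# A fitted model can be saved and reloaded for later scoring (assumes the
# Spark >= 2.0 ML persistence API and the default "online" optimizer, which
# yields a LocalLDAModel; the path is illustrative):
from pyspark.ml.clustering import LocalLDAModel
model.save("data/lda_model")
loaded_model = LocalLDAModel.load("data/lda_model")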