!python --version
! date
Python 3.4.5 :: Continuum Analytics, Inc.
Thu Nov 3 15:00:16 CST 2016
sc
<pyspark.context.SparkContext at 0x10490e978>
sc.master
'local[*]'
import datetime as dt
import time
import pandas as pd, numpy as np
import pprint
import matplotlib.pyplot as plt
import seaborn as sns
%pylab inline
Populating the interactive namespace from numpy and matplotlib
# create RDD
intRDD = sc.parallelize([6,7,1,2,0])
intRDD2 = sc.parallelize(["apple", "car", "pan"])
type(intRDD)
pyspark.rdd.RDD
intRDD.collect()
[6, 7, 1, 2, 0]
intRDD2.collect()
['apple', 'car', 'pan']
def addone(x):
    return x + 1
# set the env variable (export PYSPARK_PYTHON=python3) so Spark workers run under Python 3
intRDD.map(addone).collect()
[7, 8, 2, 3, 1]
# or via a lambda
intRDD.map(lambda x : x + 1 ).collect()
[7, 8, 2, 3, 1]
# work with words RDD
intRDD2.map(lambda x : "object: "+ x ).collect()
['object: apple', 'object: car', 'object: pan']
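# a small sketch (not from the original run): transformations are lazy and chain freely,
# so two maps can be composed and nothing executes until an action such as collect()
intRDD2.map(str.upper).map(lambda x : "object: " + x).collect()
# expected: ['object: APPLE', 'object: CAR', 'object: PAN']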
intRDD.filter(lambda x : x >5).collect()
[6, 7]
# keep elements strictly between 1 and 5 (use `and`, not `&`, which binds tighter than the comparisons)
intRDD.filter(lambda x : x > 1 and x < 5).collect()
[2]
intRDD2.filter(lambda x : "ar" in x ).collect()
['car']
intRDD3 = sc.parallelize([1,1,2,2,3])
intRDD4 = sc.parallelize(["apple","apple","car", "car", "pan"])
intRDD3.distinct().collect()
[1, 2, 3]
intRDD4.distinct().collect()
['car', 'pan', 'apple']
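# a sketch (not from the original run): distinct() also accepts numPartitions to control
# how many partitions the de-duplication shuffle writes to
intRDD4.distinct(numPartitions=2).collect()
# expected (element order may vary): ['car', 'pan', 'apple']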
sRDD = intRDD.randomSplit([0.4,0.6])
sRDD[0].collect()
[7, 1, 2]
sRDD[1].collect()
[6, 0]
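# note: randomSplit is non-deterministic; a sketch (not from the original run) that passes
# a seed so the split is reproducible between reruns
sRDD = intRDD.randomSplit([0.4, 0.6], seed=42)
[part.collect() for part in sRDD]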
gRDD = intRDD.groupBy(lambda x : "even" if x % 2 == 0 else "odd").collect()
type(gRDD)
list
print (gRDD)
[('even', <pyspark.resultiterable.ResultIterable object at 0x109bd0e48>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x109bd00b8>)]
gRDD[0][0] , sorted(gRDD[0][1])
('even', [0, 2, 6])
gRDD[1][0] , sorted(gRDD[1][1])
('odd', [1, 7])
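# a sketch (not from the original run): mapValues(list) turns each ResultIterable into a
# plain list, so the grouped result prints readably without indexing into it
intRDD.groupBy(lambda x : "even" if x % 2 == 0 else "odd").mapValues(list).collect()
# expected: something like [('even', [6, 2, 0]), ('odd', [7, 1])]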
intRDD5 = sc.parallelize([3,1,2,2,5])
intRDD6 = sc.parallelize([1,0])
intRDD7 = sc.parallelize([4,5,6])
intRDD5.union(intRDD6).union(intRDD7).collect()
[3, 1, 2, 2, 5, 1, 0, 4, 5, 6]
intRDD5.intersection(intRDD6).collect()
[1]
intRDD5.subtract(intRDD7).collect()
[1, 2, 2, 3]
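# a sketch (not from the original run): cartesian() pairs every element of one RDD with
# every element of the other; handy on tiny RDDs, expensive on large ones
intRDD6.cartesian(intRDD7).collect()
# expected (pair order may vary): [(1, 4), (1, 5), (1, 6), (0, 4), (0, 5), (0, 6)]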
intRDD = sc.parallelize([6,7,1,2,0])
intRDD.first()
6
intRDD.take(3)
[6, 7, 1]
# take the 3 smallest elements (natural ascending order)
intRDD.takeOrdered(3)
[0, 1, 2]
# sort descending via the key function, then take the 3 largest elements
intRDD.takeOrdered(3, key = lambda x: -x)
[7, 6, 2]
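# a sketch (not from the original run): top(n) returns the n largest elements directly,
# equivalent here to takeOrdered(3, key=lambda x: -x)
intRDD.top(3)
# expected: [7, 6, 2]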
print (intRDD.stats())
print (intRDD.min())
print (intRDD.max())
print (intRDD.stdev())
print (intRDD.sum())
print (intRDD.mean())
(count: 5, mean: 3.2, stdev: 2.78567765544, max: 7.0, min: 0.0)
0
7
2.78567765544
16
3.2
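# a sketch (not from the original run): histogram() buckets a numeric RDD, here into two
# equal-width bins spanning the min and max
intRDD.histogram(2)
# expected roughly: ([0.0, 3.5, 7], [3, 2])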
kvRDD1 = sc.parallelize([(1,2),(3,6),(5,6),(0,9)])
kvRDD1.collect()
[(1, 2), (3, 6), (5, 6), (0, 9)]
# get keys
kvRDD1.keys().collect()
[1, 3, 5, 0]
# get values
kvRDD1.values().collect()
[2, 6, 6, 9]
kvRDD1.filter(lambda keyvalue : keyvalue[0] < 3 ).collect()
[(1, 2), (0, 9)]
kvRDD1.filter(lambda keyvalue : keyvalue[1] < 3 ).collect()
[(1, 2)]
kvRDD1.sortByKey(ascending=True).collect()
[(0, 9), (1, 2), (3, 6), (5, 6)]
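# a sketch (not from the original run): sortBy() sorts by any computed key, here by the
# value instead of the key
kvRDD1.sortBy(lambda kv : kv[1]).collect()
# expected (ties may swap): [(1, 2), (3, 6), (5, 6), (0, 9)]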
kvRDD2 = sc.parallelize([(1,2),(1,3),(5,6),(3,9),(3,1)])
kvRDD2.collect()
[(1, 2), (1, 3), (5, 6), (3, 9), (3, 1)]
# (1, 2+3) , (5,6), (3, 9+1)
# sum up values of (key,value) with same key
kvRDD2.reduceByKey(lambda x,y : x+y).collect()
[(1, 5), (5, 6), (3, 10)]
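# a sketch (not from the original run): a per-key average, built by pairing each value
# with a count, reducing both, then dividing in mapValues
kvRDD2.mapValues(lambda v : (v, 1)) \
      .reduceByKey(lambda a, b : (a[0] + b[0], a[1] + b[1])) \
      .mapValues(lambda s : s[0] / s[1]) \
      .collect()
# expected (key order may vary): [(1, 2.5), (5, 6.0), (3, 5.0)]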
kvRDD3 = sc.parallelize([(1,2),(1,3),(5,6),(3,9),(3,1)])
kvRDD4 = sc.parallelize([(3,8)])
# key-value inner join
kvRDD3.join(kvRDD4).collect()
[(3, (9, 8)), (3, (1, 8))]
# key-value left outer join
kvRDD3.leftOuterJoin(kvRDD4).collect()
[(1, (2, None)), (1, (3, None)), (3, (9, 8)), (3, (1, 8)), (5, (6, None))]
# key-value right outer join
kvRDD3.rightOuterJoin(kvRDD4).collect()
[(3, (9, 8)), (3, (1, 8))]
# key - value subtract by key
kvRDD3.subtractByKey(kvRDD4).collect()
[(1, 2), (1, 3), (5, 6)]
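# a sketch (not from the original run): fullOuterJoin keeps keys from both sides and fills
# the missing side with None; with these two RDDs it matches the left outer join above
kvRDD3.fullOuterJoin(kvRDD4).collect()
# expected (key order may vary): [(1, (2, None)), (1, (3, None)), (3, (9, 8)), (3, (1, 8)), (5, (6, None))]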
kvRDD3.first()
(1, 2)
kvRDD3.first()[0]
1
kvRDD3.first()[1]
2
kvRDD3.take(2)
[(1, 2), (1, 3)]
kvRDD3.countByKey()
defaultdict(int, {1: 2, 3: 2, 5: 1})
# collectAsMap keeps only one value per key: later pairs overwrite earlier ones
KV = kvRDD3.collectAsMap()
KV
{1: 3, 3: 1, 5: 6}
type(KV)
dict
# key - value lookup
kvRDD3.lookup(5)
[6]
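# a sketch (not from the original run): lookup() returns every value for a repeated key,
# and an empty list when the key is absent
kvRDD3.lookup(1)
# expected: [2, 3]
kvRDD3.lookup(7)
# expected: []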
kvFruit = sc.parallelize([(1,"apple"),(2,"banana"),(3,"peach"),(4,"grape")])
fruitMap = kvFruit.collectAsMap()
fruitMap
{1: 'apple', 2: 'banana', 3: 'peach', 4: 'grape'}
# broadcast fruitMap to every worker as bcFruitMap
bcFruitMap = sc.broadcast(fruitMap)
fruitIds = sc.parallelize([2,4,1,3])
fruitIds.collect()
[2, 4, 1, 3]
# using broadcast dict
fruitNames = fruitIds.map(lambda x : bcFruitMap.value[x]).collect()
fruitNames
['banana', 'grape', 'apple', 'peach']
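# a sketch (not from the original run): an accumulator is the other Spark shared variable;
# workers may only add to it, and the driver reads the total after an action
missing = sc.accumulator(0)

def nameOrCount(fruit_id):
    if fruit_id in bcFruitMap.value:
        return bcFruitMap.value[fruit_id]
    missing.add(1)          # count ids with no broadcast entry
    return None

sc.parallelize([2, 4, 9]).map(nameOrCount).collect(), missing.value
# expected: (['banana', 'grape', None], 1)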
# fruit.txt is a text file whose contents look like this:
"""
apple apple orange
banana grape grape
"""
textfile = sc.textFile("fruit.txt")
textfile.collect()
['apple apple orange', 'banana grape grape']
# map
stringRDD = textfile.map(lambda line : line.split(" "))
stringRDD.collect()
[['apple', 'apple', 'orange'], ['banana', 'grape', 'grape']]
# flatmap
stringRDD = textfile.flatMap(lambda line : line.split(" "))
stringRDD.collect()
['apple', 'apple', 'orange', 'banana', 'grape', 'grape']
# key-value pair by Map
stringRDD.map(lambda word : (word,1)).collect()
[('apple', 1), ('apple', 1), ('orange', 1), ('banana', 1), ('grape', 1), ('grape', 1)]
# word count by reduceByKey
stringRDD.map(lambda word : (word,1)).reduceByKey(lambda x,y : x+y).collect()
[('orange', 1), ('banana', 1), ('apple', 2), ('grape', 2)]
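# a sketch (not from the original run): the same word count, sorted so the most frequent
# words come first
stringRDD.map(lambda word : (word, 1)) \
         .reduceByKey(lambda x, y : x + y) \
         .sortBy(lambda kv : -kv[1]) \
         .collect()
# expected (ties may swap): [('apple', 2), ('grape', 2), ('orange', 1), ('banana', 1)]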