import startspark
import pyspark
sc = startspark.create_spark_instance() # run once
The key abstraction of Spark is the *Resilient Distributed Dataset (RDD)*, which represents a distributed collection of items. An RDD can be acted on to calculate values, or transformed into new RDDs.
textfile = sc.textFile("startspark.py") ## construction
print textfile.count() ## action - number of items (lines)
print textfile.first() ## action - first item
22
import os, sys
## filter to transform the RDD to a new RDD
lineswithspark = textfile.filter(lambda line: "spark" in line)
print lineswithspark.first()
print lineswithspark.count()
SPARK_HOME = path.abspath("/home/dola/opt/spark-1.1.1/")
9
## combining transformations and actions can do a lot of things
## e.g., find the line with most words in a file - use reduce to find max
## map is a transform, reduce is an action
textfile.map(lambda line: len(line.split())).reduce(lambda a, b: a if a>=b else b)
7
## word count - "hello world" map reduce example
## unlike reduce, reduceByKey generates another RDD
word_counts = sc.textFile("README.md").flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
print word_counts.collect()
[(u'and', 2), (u'useful', 1), (u'is', 1), (u'am', 2), (u'not', 1), (u'But', 1), (u'learning', 2), (u'go', 1), (u'plan', 1), (u'spark', 1), (u'are', 1), (u'for', 1), (u'how', 1), (u'with', 1), (u'least', 1), (u'machine', 1), (u'to', 2), (u'collections', 1), (u'tutorials', 1), (u'tasks.', 1), (u'include', 1), (u'(impyla).', 1), (u'sure', 1), (u'that', 1), (u'I', 3), (u'some', 2), (u'here', 1), (u'framework', 1), (u'preparing', 1), (u'across.', 1), (u'The', 1), (u'Impala', 1), (u'a', 1), (u'articles', 1), (u'about', 1), (u'this', 1), (u'of', 1), (u'yet.', 1), (u'at', 1), (u'came', 1), (u'Blaze', 1)]
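The collected pairs come back in no particular order; as a small follow-up sketch on the same word_counts RDD, takeOrdered can pull out the most frequent words without bringing everything to the driver:
## the 5 most frequent words, sorted by descending count
print word_counts.takeOrdered(5, key=lambda pair: -pair[1])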
Cluster-wide in-memory caching is one of Spark's biggest selling points. It is generally needed for hot data that will be accessed repeatedly in an iterative algorithm.
## it is easy to cache an RDD
textfile.cache() ## side effect instead of returning a new RDD
textfile.count()
22
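A minimal sketch of the kind of repeated access that benefits from caching (the keywords and loop below are illustrative, not part of the example above): after the first action materializes the cache, later passes read the RDD from memory instead of re-reading the file.
## each pass over the cached RDD is served from memory after the first action
for keyword in ["import", "spark", "def"]:
    print keyword, textfile.filter(lambda line, kw=keyword: kw in line).count()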
When working with Spark, we can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). For example,
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Submit the standalone Python application to Spark:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, lines with b: 23
As the example above shows, in practice you will not want to hardcode the master in the program when running on a cluster; instead, launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local" to run Spark in-process.
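As an aside (not part of the quickstart above), dependencies can also be shipped programmatically: SparkContext.addPyFile distributes a .py or .zip file to the workers, the in-code counterpart of the --py-files flag. A minimal sketch, where helpers.py and helpers.transform are made up for illustration:
## hypothetical: ship helpers.py (or a .zip of modules) to every worker
sc.addPyFile("helpers.py")
def apply_helper(x):
    import helpers                  ## imported on the worker, where the shipped file is available
    return helpers.transform(x)     ## helpers.transform is a made-up function
print sc.parallelize(range(4)).map(apply_helper).collect()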
## parallelize an object in driver program to form an RDD
import numpy as np
data = np.arange(10) ## parallelize accepts a NumPy array, not just a Python list
para_data = sc.parallelize(data)
para_data.count()
!rm -fR data/temp-data/
para_data.saveAsPickleFile("data/temp-data")
print sc.pickleFile("data/temp-data/").collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
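For comparison (a minimal sketch, not in the original run), the same RDD can be written and read back as plain text; textFile returns strings, so the values need converting back to int:
!rm -fR data/temp-text/
para_data.saveAsTextFile("data/temp-text")
print sc.textFile("data/temp-text/").map(int).collect()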
## several ways to load files
!cat data/a.txt
!cat data/b.txt
allInOne = sc.textFile("data/*.txt")
print allInOne.count()
nameToFiles = sc.wholeTextFiles("data/*.txt")
print nameToFiles.count()
print nameToFiles.collect()
hello
this is a
hello
this is b
4
2
[(u'/home/dola/workspace/dola/tutorials/learn-spark/data/a.txt', u'hello\nthis is a\n'), (u'/home/dola/workspace/dola/tutorials/learn-spark/data/b.txt', u'hello\nthis is b\n')]
Shared Variables
Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
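A minimal sketch of both kinds of shared variables (the lookup table, the file, and the counting logic are illustrative, not from the text above):
## broadcast: ship a read-only lookup table to every worker once
lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})
print sc.parallelize(["a", "b", "c", "a"]).map(lambda k: lookup.value[k]).sum()
## accumulator: workers can only add to it; the driver reads the final value
bad_records = sc.accumulator(0)
def check(line):
    if "spark" not in line:
        bad_records.add(1)
sc.textFile("startspark.py").foreach(check)
print bad_records.value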
The following example of estimating pi with Monte Carlo sampling is based on a discussion on Stack Overflow.
import random
import time
N = 12500000
def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0
## The following code will probably run on a single core, because Spark implicitly partitions
## it into one partition
tic = time.time()
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / N)
print 'time: %g s' % (time.time() - tic)
Pi is roughly 3.141940
time: 9.78296 s
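To check how the range was actually split, the RDD can report its partition count (assuming getNumPartitions is available in your PySpark version):
## how many partitions did parallelize create by default?
print sc.parallelize(xrange(0, N)).getNumPartitions()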
## It can be made better by explicitly partitioning the data into multiple parts.
## But the driver thread will still have to generate the huge range on a single core
ncores = 16
tic = time.time()
count = sc.parallelize(xrange(0, N), ncores).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / N)
print 'time: %g s' % (time.time() - tic)
Pi is roughly 3.141770
time: 9.82372 s
## so a better solution is to generate the data inside each partition instead of in the driver
N = 12500000
part = 16
tic = time.time()
count = ( sc.parallelize([None] * part, part)
          .flatMap(lambda _: [sample(p) for p in xrange(N / part)])
          .reduce(lambda a, b: a + b)
        )
print "Pi is roughly %f" % (4.0 * count / N)
print 'time: %g s' % (time.time() - tic)
Pi is roughly 3.141796
time: 6.60225 s
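An equivalent way to express this last version is mapPartitions, which hands each partition a generator instead of building an intermediate list (a minimal sketch, not timed here):
## same idea with mapPartitions: each of the `part` partitions draws N/part samples
count = ( sc.parallelize([None] * part, part)
          .mapPartitions(lambda _: (sample(None) for _ in xrange(N / part)))
          .reduce(lambda a, b: a + b)
        )
print "Pi is roughly %f" % (4.0 * count / N)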