Apache Spark™ is a fast and general engine for large-scale data processing.
It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Sum the squares of the integers from 1 to 10.
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()
385
# Most common words in "THE DEVELOPMENT OF EMBROIDERY IN AMERICA"
# (assumes a stopwords set of common English words was defined earlier)
rdd = sc.textFile("example.txt")
rdd \
.flatMap(lambda line: line.split()) \
.map(lambda word: word.strip().lower()) \
.filter(lambda word: word not in stopwords) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.map(lambda (key, cnt): (cnt, key)) \
.top(10)
[(117, u'upon'), (105, u'embroidery'), (103, u'one'), (101, u'art'), (94, u'work'), (86, u'made'), (84, u'new'), (82, u'project'), (70, u'embroidered'), (62, u'worked')]
These examples are running locally on my laptop.
The data file (example.txt) is loaded into a local Resilient Distributed Dataset (RDD).
If my SparkContext (sc) were created on a Spark cluster, the data would have been partitioned across the worker nodes.
%%time
rdd = sc.parallelize(xrange(10**8)).map(lambda x: float(x) ** 2)
CPU times: user 856 µs, sys: 654 µs, total: 1.51 ms Wall time: 2.33 ms
%%time
_ = rdd.count()
CPU times: user 4.85 ms, sys: 1.98 ms, total: 6.83 ms Wall time: 8.87 s
(Note that defining the map transformation returned almost instantly; Spark is lazy, and the work only happened when the count() action forced evaluation.)
Key difference: with PySpark you write plain Python rather than Scala.
(There's also a Java API in case you really hate life.)
It must be slower, right?
Spark’s core developers have worked extensively to bridge the performance gap between JVM languages and Python.
In particular, PySpark can now run on PyPy to leverage its just-in-time compiler (up to a 50x speedup).
The way Python processes communicate with the main Spark JVM program has also been redesigned to enable worker reuse.
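For example (a hypothetical invocation, assuming a pypy binary is on your PATH), you can point PySpark at PyPy through the PYSPARK_PYTHON environment variable:
$ PYSPARK_PYTHON=pypy pyspark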
Major difference:
Spark keeps data in worker memory, while Hadoop MapReduce tends to keep data on disk.
According to the Spark website, it can run "100x faster than Hadoop by exploiting in-memory computing and other optimizations".
Spark officially sets a new record in large-scale sorting
Using Spark on 206 EC2 machines, we sorted 100 TB of data on disk in 23 minutes. In comparison, the previous world record set by Hadoop MapReduce used 2100 machines and took 72 minutes. This means that Spark sorted the same data 3X faster using 10X fewer machines.
Also:
RDDs are a key development: they provide an "immutable resilient distributed collection of records".
Unlike existing storage abstractions for clusters, which require data replication for fault tolerance, RDDs offer an API based on coarse-grained transformations that lets them recover data efficiently using lineage.
See: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, Spark: Cluster Computing with Working Sets, Spark Research
Also:
Spark provides 80+ high(er)-level, functional-style operators beyond simple "map" and "reduce". (Not to mention the higher-level tools Spark Streaming, Spark SQL, MLlib, and GraphX.)
For example: native Python code, no Java, high(er)-level operators, and a functional style.
Spark imitates Scala’s collections API and functional style, which is a boon to Java and Scala developers, but also somewhat familiar to developers coming from Python. source
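As a minimal sketch of that functional style (made-up data, using a handful of the higher-level operators mentioned above):
nums = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5])

# Chain higher-level operators directly, no explicit loops
nums.distinct().sortBy(lambda x: x).collect()
# [1, 2, 3, 4, 5, 6, 9]

nums.filter(lambda x: x % 2 == 1).takeOrdered(3, key=lambda x: -x)
# [9, 5, 5]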
pyspark-pictures is a handy visual guide to the Spark API:
rdd1.cartesian(rdd2)
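For instance, cartesian pairs every element of one RDD with every element of another (a tiny made-up example; output order may vary):
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]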
$ IPYTHON=1 pyspark
You should see:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.1
/_/
Using Python version 2.7.6 (default, Sep 9 2014 15:04:36)
SparkContext available as sc.
>>>
$ IPYTHON_OPTS="notebook --matplotlib inline" pyspark
This creates a special IPython notebook that is initialized with a SparkContext object called sc:
sc
<pyspark.context.SparkContext at 0x107fd1b50>
(You can also create IPython profiles to automate some of this.)
These commands start PySpark in local mode, as opposed to cluster mode.
The exact same code can be run in local and cluster modes! It just depends on how you initialize your SparkContext.
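A minimal sketch of that difference (the master URL is a hypothetical standalone cluster; only one SparkContext can be active at a time, so the cluster variant is commented out):
from pyspark import SparkConf, SparkContext

# Local mode: use all cores on this machine
sc = SparkContext(master="local[*]", appName="demo")

# Cluster mode: point at a Spark master instead (hypothetical host)
# conf = SparkConf().setMaster("spark://master-host:7077").setAppName("demo")
# sc = SparkContext(conf=conf)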
# Load a Python iterable into an RDD
sc.parallelize(range(10))
ParallelCollectionRDD[16] at parallelize at PythonRDD.scala:364
# Load a text file
sc.textFile("example.txt") # Each line is a separate element in the RDD
example.txt MappedRDD[18] at textFile at NativeMethodAccessorImpl.java:-2
# Load text files
sc.textFile("example.txt,example2.txt").collect()[-1001:-991]
[u'least surpass the work sent by the Decorative Art societies of most of', u'our American cities.', u'', u'', u'', u'', u'CHAPTER VII -- AMERICAN TAPESTRY', u'', u'', u'The Society of Decorative Art, has proved itself a means for the']
These can be used to load text files from Amazon S3.
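For example (a hypothetical bucket, with AWS credentials already configured in the Hadoop settings):
rdd = sc.textFile("s3n://my-bucket/path/to/example.txt")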
SparkContext.wholeTextFiles
lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
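A quick sketch of the difference, against a hypothetical directory of text files:
# One element per file: (filename, content) pairs
pairs = sc.wholeTextFiles("texts/")
pairs.map(lambda (name, content): (name, len(content))).collect()

# One element per line across the same files
sc.textFile("texts/").count()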
SparkContext.newAPIHadoopRDD
PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs.
For example, Cassandra.
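A rough sketch of the call (hypothetical HDFS path, and using the standard Hadoop TextInputFormat rather than a Cassandra InputFormat):
# Keys are byte offsets (LongWritable), values are lines of text (Text)
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"mapreduce.input.fileinputformat.inputdir": "hdfs:///path/to/data"})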
rdd.collect()
converts an RDD into a Python list on the driver (host) machine.
rdd.saveAsTextFile()
saves an RDD to a text file (a directory of part files), with each element written as its string representation. See also rdd.saveAsPickleFile().
rdd.saveAsNewAPIHadoopDataset()
saves an RDD object to a Hadoop data source (e.g. HDFS, Cassandra).
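A tiny sketch with hypothetical output paths (Spark writes each as a directory of part files):
words = sc.parallelize(["embroidery", "tapestry", "needlework"])
words.saveAsTextFile("output/words_text")      # one element per line, as text
words.saveAsPickleFile("output/words_pickle")  # pickled Python objects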
Sort last three presidents by last name
rdd = sc.parallelize(["Barack Hussein Obama", "George Walker Bush", "William Jefferson Clinton"])
rdd.sortBy(keyfunc=lambda k: k.split(" ")[-1]).collect()
['George Walker Bush', 'William Jefferson Clinton', 'Barack Hussein Obama']
Join Datasets
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 6), ("b", 7), ("b", 8), ("d", 9)])
rdd1.join(rdd2).collect()
[('a', (1, 6)), ('b', (2, 7)), ('b', (2, 8))]
rdd1.fullOuterJoin(rdd2).collect()
[('a', (1, 6)), ('c', (3, None)), ('b', (2, 7)), ('b', (2, 8)), ('d', (None, 9))]
Let's load in the Wake County Real Estate Data.
raw_real_estate = sc.textFile("all.txt")
print raw_real_estate.take(1)[0][:250]
FISHER, GEORGE NORMAN 2720 BEDFORD AVE RALEIGH NC 27607-7114 000000101 01 001506 WAKE FOREST RD RA 01 0
wake_county_real_estate = raw_real_estate.map(lambda row:
dict(
owner = row[0:35].strip().title(),
last_name = row[0:35].strip().title().split(",")[0],
address = row[70:105].strip().title(),
sale_price = int(row[273:(284)].strip() or -1),
value = int(row[305:(316)].strip() or -1),
use = int(row[653:656].strip() or -1),
heated_area = int(row[471:482].strip() or -1),
year_built = int(row[455:459].strip() or -1),
height = row[509:510].strip(),
))
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)
schemaWake = sqlContext.inferSchema(wake_county_real_estate.map(lambda d: Row(**d)))
schemaWake.registerTempTable("wake")
Who owns the most expensive church buildings in Raleigh?
import pandas as pd

pd.DataFrame.from_records(
sqlContext.sql("""SELECT DISTINCT owner, address,
year_built, value
FROM wake
WHERE value > 4000000 AND
use = 66 AND
owner LIKE '%Church%'
""").collect(),
columns=["Name","Street","Year Built","Value"])
| | Name | Street | Year Built | Value ($) |
|---|---|---|---|---|
| 0 | Crosspointe Church At Cary | 6911 Carpenter Fire Station Rd | 2003 | 6853333 |
| 1 | Edenton St Methodist Church | 228 W Edenton St | 2002 | 6207300 |
| 2 | Edenton St Methodist Church | 228 W Edenton St | 1959 | 6207300 |
| 3 | First Presbyterian Church | 111 W Morgan St | 2013 | 4617350 |
| 4 | Providence Baptist Church | 6339 Glenwood Ave Ste 451 | 1972 | 5540832 |
| 5 | First Presbyterian Church | 111 W Morgan St | 1987 | 4617350 |
| 6 | Tabernacle Baptist Church Of Ral | 8304 Leesville Rd | 2001 | 4600500 |
What is the 43rd richest American's house worth?
sqlContext.sql("""SELECT MAX(value) as price
FROM wake
WHERE owner
LIKE 'Goodnight, James H% & Ann B%'
GROUP BY last_name
""").collect()[0].price
3996360
(We could have done these same queries with the 'native' Spark functional method chaining.)
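For instance, the church query above might look roughly like this with RDD method chaining (a sketch against the dicts built earlier, not a character-for-character equivalent):
(wake_county_real_estate
    .filter(lambda d: d["value"] > 4000000
                      and d["use"] == 66
                      and "Church" in d["owner"])
    .map(lambda d: (d["owner"], d["address"], d["year_built"], d["value"]))
    .distinct()
    .collect())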
Again, if you wanted to load terabytes of real estate data from HDFS or S3 (for example), you could run this exact same code on a Spark cluster.
Constructs a DataFrame from the users table in Hive.
users = sqlContext.table("users")
Create a new DataFrame that contains “young users” only
young = users.filter(users.age < 21)
Count the number of young users by gender
young.groupBy("gender").count()
From JSON files in S3
logs = sqlContext.load("s3n://path/to/data.json", "json")
Join young users with another DataFrame called logs
young.join(logs, logs.userId == users.userId, "left_outer")
Convert Spark DataFrame to Pandas
pandas_df = young.toPandas()
Create a Spark DataFrame from Pandas
spark_df = sqlContext.createDataFrame(pandas_df)
Subset to Apartment Buildings and Office Buildings
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

subset = wake_county_real_estate.filter(lambda d:
    d["use"] in [7, 34])
subset = subset.filter(lambda d: d["heated_area"] > 0
and d["year_built"] > 1900) \
.map(lambda d: LabeledPoint(
1 if d["use"] == 7 else 0,
[d["year_built"],
d["heated_area"]]))
subset.take(2)
[LabeledPoint(0.0, [1989.0,4008.0]), LabeledPoint(0.0, [1976.0,5426.0])]
(trainingData, testData) = subset.randomSplit([0.7, 0.3])
tree = DecisionTree.trainClassifier(trainingData, 2, categoricalFeaturesInfo={})
predictions = tree.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
Test Error = 0.141468682505
# This is much better performance than a random classifier
from random import choice
labelsAndPredictions = testData.map(lambda lp: (lp.label, choice([0, 1])))
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
Test Error = 0.497840172786
Spark 1.2 brought MLlib pipelines, inspired by the scikit-learn project, to the Scala API.
(For more, see scikit-learn pipelines.)
Scala Pipeline:
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(trainingDataset)
(Python Pipelines are coming in Spark 1.3.)
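Here is a sketch of what the Python equivalent looks like in the pyspark.ml API, mirroring the Scala example above (trainingDataset is assumed to be a DataFrame with 'text' and 'label' columns):
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(numFeatures=1000,
                      inputCol=tokenizer.getOutputCol(),
                      outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(trainingDataset)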
O'Reilly also has Advanced Analytics with Spark and Learning Spark. They're Scala-focused, but still valuable.