
Introduction to (Py)Spark

![](http://ak.picdn.net/shutterstock/videos/5081630/preview/stock-footage-electrical-spark-and-smoke-between-two-insulated-copper-wires-looped.jpg)

Introduction to (Py)Spark

Apache Spark™ is a fast and general engine for large-scale data processing.

It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

source

Introduction to (Py)Spark

  • Originally developed at Berkeley's AMPLab in 2009.
  • Open-sourced under the BSD license in 2010.
  • Donated to Apache in 2013.
  • Apache Top-Level Project in 2014.
  • 1.0.0 released in May 2014.
  • Currently on 1.2.0 (released December 2014).
  • Backed by Databricks (databricks.com).

Example

Sum the squares of the integers from 1 to 10.

In [1]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()
Out[1]:
385

Example

In [3]:
# Most common words in "THE DEVELOPMENT OF EMBROIDERY IN AMERICA"

rdd = sc.textFile("example.txt")
rdd \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: word.strip().lower()) \
    .filter(lambda word: word not in stopwords) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda (key, cnt): (cnt, key)) \
    .top(10)
Out[3]:
[(117, u'upon'),
 (105, u'embroidery'),
 (103, u'one'),
 (101, u'art'),
 (94, u'work'),
 (86, u'made'),
 (84, u'new'),
 (82, u'project'),
 (70, u'embroidered'),
 (62, u'worked')]

Example

These examples are running locally on my laptop.

The data file (example.txt) is loaded into a local Resilient Distributed Dataset (RDD).

If my Spark Context (sc) were created on a Spark cluster, the data would have been partitioned across the worker nodes.

Example

(Py)spark evaluates expressions lazily. "The transformations are only computed when an action requires a result to be returned to the driver program." source

In [8]:
%%time
rdd = sc.parallelize(xrange(10**8)).map(lambda x: float(x) ** 2)
CPU times: user 856 µs, sys: 654 µs, total: 1.51 ms
Wall time: 2.33 ms
In [9]:
%%time 
_ = rdd.count()
CPU times: user 4.85 ms, sys: 1.98 ms, total: 6.83 ms
Wall time: 8.87 s

Spark vs Pyspark?

Spark is written in Scala. The 'native' API is in Scala.

Pyspark is a very lightweight wrapper around the native API. (You can see its implementation here.)

Spark vs Pyspark?

Key differences:

  • Python (unlike Scala) is dynamically typed. (RDDs can hold objects of multiple types!)
  • Pyspark sometimes lags behind Spark in feature releases.

(There's also a Java API in case you really hate life.)

Spark vs Pyspark?

It must be slower, right?

Spark’s core developers have worked extensively to bridge the performance gap between JVM languages and Python.

In particular, PySpark can now run on PyPy to leverage the just-in-time compiler. (Up to 50x speedup)

The way Python worker processes communicate with the main Spark JVM program has also been redesigned to enable worker reuse.

source

How is this better than Hadoop?

Major difference:

Spark keeps data in worker memory, while Hadoop MapReduce tends to keep data on disk.

According to the Spark webpage, it can run "100x faster than Hadoop by exploiting in memory computing and other optimizations".

How is this better than Hadoop?

Spark officially sets a new record in large-scale sorting

Using Spark on 206 EC2 machines, we sorted 100 TB of data on disk in 23 minutes. In comparison, the previous world record set by Hadoop MapReduce used 2100 machines and took 72 minutes. This means that Spark sorted the same data 3X faster using 10X fewer machines.

source

How is this better than Hadoop?

Also:

RDDs are a key innovation: an RDD is an "immutable resilient distributed collection of records".

Unlike existing storage abstractions for clusters, which require data replication for fault tolerance, RDDs offer an API based on coarse-grained transformations that lets them recover data efficiently using lineage.

See: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, Spark: Cluster Computing with Working Sets, Spark Research
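
To see what "lineage" means in practice, here is a minimal sketch (assuming an active SparkContext sc and the example.txt file from earlier): every RDD carries the chain of transformations needed to recompute its partitions, which you can inspect with toDebugString().

counts = sc.textFile("example.txt") \
           .flatMap(lambda line: line.split()) \
           .map(lambda word: (word.lower(), 1)) \
           .reduceByKey(lambda a, b: a + b)

print counts.toDebugString()  # the lineage graph Spark would replay to rebuild lost partitions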

How is this better than Hadoop?

Also:

Spark provides 80+ high(er)-level, functional-style operators beyond simple "map" and "reduce". (Not to mention the higher-level tools Spark Streaming, Spark SQL, MLlib, and GraphX.)

For example (a few of these are chained in the short sketch after this list):

  • count
  • countApprox
  • filter
  • flatMap
  • groupBy
  • map
  • reduce
  • reduceByKey
  • sample
  • sortBy
  • union
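
A minimal sketch (assuming an active SparkContext sc) chaining a few of these operators:

nums = sc.parallelize(range(20))
evens = nums.filter(lambda x: x % 2 == 0)
odds = nums.filter(lambda x: x % 2 == 1)

evens.union(odds).sortBy(lambda x: x).take(5)            # [0, 1, 2, 3, 4]
nums.groupBy(lambda x: x % 3).mapValues(list).collect()  # three (key, list-of-values) pairs
nums.sample(False, 0.25).count()                         # roughly 5; varies with the sample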

How is this better than Hadoop?

Native Python Code:

  • Unlike Hive/Pig

No Java:

  • Unlike native Hadoop

High(er)-level operators:

  • Unlike mrjob

Functional style:

Spark imitates Scala’s collections API and functional style, which is a boon to Java and Scala developers, but also somewhat familiar to developers coming from Python. source

pyspark-pictures is a handy visual reference for the Spark API:

rdd1.cartesian(rdd2)
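
For instance, cartesian() pairs every element of one RDD with every element of another (a tiny sketch, assuming sc exists; output order may vary):

rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()  # [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]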

Installing (Py)Spark locally

For Mac users using Homebrew:

$ brew install apache-spark

You will also need a Java SDK installed.

Launching the Pyspark REPL

$ IPYTHON=1 pyspark

You should see:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc.
>>>

Launching Pyspark in an IPython notebook

$ IPYTHON_OPTS="notebook --matplotlib inline" pyspark

This creates a special IPython notebook that is initialized with a SparkContext object called sc:

In [9]:
sc
Out[9]:
<pyspark.context.SparkContext at 0x107fd1b50>

These commands will start Pyspark in local mode, as opposed to cluster mode.

The exact same code can be run in local and cluster modes! It just depends on how you initialize your Spark session.
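
As a minimal sketch (the master URLs here are placeholders), the difference comes down to the master you pass when building the SparkContext yourself:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("demo").setMaster("local[4]")
# conf = SparkConf().setAppName("demo").setMaster("spark://some-master:7077")  # hypothetical cluster
sc = SparkContext(conf=conf)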

Getting Data

In [10]:
# Load a Python iterable into an RDD

sc.parallelize(range(10))
Out[10]:
ParallelCollectionRDD[16] at parallelize at PythonRDD.scala:364
In [11]:
# Load a text file

sc.textFile("example.txt") # Each line is a separate element in the RDD
Out[11]:
example.txt MappedRDD[18] at textFile at NativeMethodAccessorImpl.java:-2
In [12]:
# Load text files

sc.textFile("example.txt,example2.txt").collect()[-1001:-991]
Out[12]:
[u'least surpass the work sent by the Decorative Art societies of most of',
 u'our American cities.',
 u'',
 u'',
 u'',
 u'',
 u'CHAPTER VII -- AMERICAN TAPESTRY',
 u'',
 u'',
 u'The Society of Decorative Art, has proved itself a means for the']

These methods can also be used to load text files from Amazon S3.

SparkContext.wholeTextFiles...

...lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file
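
A minimal sketch ("some_directory/" is a hypothetical path):

pairs = sc.wholeTextFiles("some_directory/")
pairs.map(lambda (fname, content): (fname, len(content))).collect()  # one (filename, length) pair per file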

SparkContext.newAPIHadoopRDD

PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs.

For example, Cassandra.

Saving Data

rdd.collect() converts an RDD into a Python list on the driver (host) machine.

rdd.saveAsTextFile() saves an RDD as a text file, writing the string representation of each element. See also rdd.saveAsPickleFile().

rdd.saveAsNewAPIHadoopDataset() saves an RDD object to a Hadoop data source (e.g. HDFS, Cassandra).
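
A minimal sketch of the simpler calls (output paths are hypothetical and must not already exist):

rdd = sc.parallelize(range(10))

local_list = rdd.collect()               # pulls the data back to the driver as a Python list
rdd.saveAsTextFile("out/numbers_txt")    # one string representation per element, one part-file per partition
rdd.saveAsPickleFile("out/numbers_pkl")  # pickled Python objects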

Manipulating Data

Sort last three presidents by last name

In [70]:
rdd = sc.parallelize(["Barack Hussein Obama", "George Walker Bush", "William Jefferson Clinton"])

rdd.sortBy(keyfunc=lambda k: k.split(" ")[-1]).collect()
Out[70]:
['George Walker Bush', 'William Jefferson Clinton', 'Barack Hussein Obama']

Manipulating Data

Join Datasets

In [73]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 6), ("b", 7), ("b", 8), ("d", 9)])
In [74]:
rdd1.join(rdd2).collect()
Out[74]:
[('a', (1, 6)), ('b', (2, 7)), ('b', (2, 8))]
In [75]:
rdd1.fullOuterJoin(rdd2).collect()
Out[75]:
[('a', (1, 6)),
 ('c', (3, None)),
 ('b', (2, 7)),
 ('b', (2, 8)),
 ('d', (None, 9))]

Manipulating Data

Let's load in the Wake County Real Estate Data.

In [80]:
raw_real_estate = sc.textFile("all.txt")
print raw_real_estate.take(1)[0][:250]
FISHER, GEORGE NORMAN                                                 2720 BEDFORD AVE                   RALEIGH NC 27607-7114                                                 000000101 01 001506  WAKE FOREST              RD    RA     01             0

Manipulating Data

In [29]:
wake_county_real_estate = raw_real_estate.map(lambda row: 
        dict(
         owner = row[0:35].strip().title(),
         last_name = row[0:35].strip().title().split(",")[0],
         address = row[70:105].strip().title(), 
         sale_price = int(row[273:(284)].strip() or -1),
         value = int(row[305:(316)].strip() or -1),
         use = int(row[653:656].strip() or -1),
         heated_area = int(row[471:482].strip() or -1),
         year_built = int(row[455:459].strip() or -1),
         height = row[509:510].strip(),
        ))

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)
schemaWake = sqlContext.inferSchema(wake_county_real_estate.map(lambda d: Row(**d)))
schemaWake.registerTempTable("wake")

Manipulating Data

Who owns the most expensive church buildings in Raleigh?

In [81]:
import pandas as pd

pd.DataFrame.from_records(
    sqlContext.sql("""SELECT DISTINCT owner, address, 
                                      year_built, value
        FROM wake 
        WHERE value > 4000000 AND 
              use = 66 AND 
              owner LIKE '%Church%'
        """).collect(),
columns=["Name","Street","Year Built","Value"])
Out[81]:
   Name                              Street                          Year Built  Value
0  Crosspointe Church At Cary        6911 Carpenter Fire Station Rd  2003        6853333
1  Edenton St Methodist Church       228 W Edenton St                2002        6207300
2  Edenton St Methodist Church       228 W Edenton St                1959        6207300
3  First Presbyterian Church         111 W Morgan St                 2013        4617350
4  Providence Baptist Church         6339 Glenwood Ave Ste 451       1972        5540832
5  First Presbyterian Church         111 W Morgan St                 1987        4617350
6  Tabernacle Baptist Church Of Ral  8304 Leesville Rd               2001        4600500

Manipulating Data

What is the 43rd richest American's house worth?

In [27]:
sqlContext.sql("""SELECT MAX(value) as price 
                  FROM wake 
                  WHERE owner 
                  LIKE 'Goodnight, James H% & Ann B%'
                  GROUP BY last_name
                  """).collect()[0].price
Out[27]:
3996360

(We could have done these same queries with the 'native' Spark functional method chaining.)
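
For example, a rough sketch of the last query as RDD method chaining (reusing wake_county_real_estate from the earlier cell, with startswith() standing in for the SQL LIKE pattern):

wake_county_real_estate \
    .filter(lambda d: d["owner"].startswith("Goodnight, James H")) \
    .map(lambda d: d["value"]) \
    .max()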

Manipulating Data

Again, if you wanted to load terabytes of real estate data from HDFS or S3 (for example), you could run this exact same code on a Spark cluster.

Data Frames! (Coming Soon in 1.3)

Constructs a DataFrame from the users table in Hive.

users = sqlContext.table("users")

Create a new DataFrame that contains “young users” only

young = users.filter(users.age < 21)

Count the number of young users by gender

young.groupBy("gender").count()

Data Frames! (Coming Soon in 1.3)

From JSON files in S3

logs = sqlContext.load("s3n://path/to/data.json", "json")

Join young users with another DataFrame called logs

young.join(logs, logs.userId == users.userId, "left_outer")

source

Data Frames! (Coming Soon in 1.3)

Convert Spark DataFrame to Pandas

pandas_df = young.toPandas()

Create a Spark DataFrame from Pandas

spark_df = context.createDataFrame(pandas_df)

Machine Learning with (Py)Spark

Subset to Apartment Buildings and Office Buildings

In [85]:
from pyspark.mllib.regression import LabeledPoint

subset = wake_county_real_estate.filter(lambda d: 
                                        d["use"] in [7, 34])
subset = subset.filter(lambda d: d["heated_area"] > 0 
              and d["year_built"] > 1900) \
              .map(lambda d: LabeledPoint(
                                1 if d["use"] == 7 else 0, 
                                [d["year_built"], 
                                 d["heated_area"]]))
    
subset.take(2)
Out[85]:
[LabeledPoint(0.0, [1989.0,4008.0]), LabeledPoint(0.0, [1976.0,5426.0])]

Machine Learning with (Py)Spark

In [86]:
(trainingData, testData) = subset.randomSplit([0.7, 0.3])
In [87]:
from pyspark.mllib.tree import DecisionTree

tree = DecisionTree.trainClassifier(trainingData, 2, categoricalFeaturesInfo={})
In [88]:
predictions = tree.predict(testData.map(lambda x: x.features)) 
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
Test Error = 0.141468682505
In [92]:
# This is much better performance than a random classifier

from random import choice

labelsAndPredictions = testData.map(lambda lp: (lp.label, choice([0, 1])))
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print 'Test Error = ' + str(testErr)
Test Error = 0.497840172786

Machine Learning with (Py)Spark

Spark 1.2 brought MLlib pipelines to the Scala API.

inspired by the scikit-learn project

(For more, see the scikit-learn documentation on pipelines.)

Machine Learning with (Py)Spark

Scala Pipeline:

val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(trainingDataset)

(Python pipelines are coming in 1.3.)
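
For reference, a sketch of what the equivalent Python pipeline should look like once the pyspark.ml API lands (mirroring the Scala example above; trainingDataset is assumed to be a DataFrame with "text" and "label" columns):

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(numFeatures=1000,
                      inputCol=tokenizer.getOutputCol(),
                      outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(trainingDataset)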

Future of Spark

Learning More

O'Reilly also has Advanced Analytics with Spark and Learning Spark. They're Scala-focused, but still valuable.

Thank you!

Questions?