# Examples

## Example 1 - Total Revenue

Ever written SQL like this?

In [1]:
db = "split-apply-combine_resources/Chinook_Sqlite.sqlite"
import sqlite3
conn = sqlite3.connect(db)
curs = conn.cursor()

print curs.execute("SELECT SUM(UnitPrice*Quantity) FROM InvoiceLine;").fetchone()

(2328.599999999957,)


Or Python code like this?

In [2]:
total_revenue = 0
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    total_revenue += UnitPrice*Quantity
print total_revenue

2328.6


## Example 2 - Revenue per track

Or how about some SQL with a GROUP BY?

In [3]:
sql = ("SELECT TrackId, sum(UnitPrice*Quantity) "
       "FROM InvoiceLine "
       "GROUP BY TrackId "
       "ORDER BY SUM(UnitPrice*Quantity) DESC, TrackId DESC limit 3;")
for row in curs.execute(sql):
    print row

(3250, 3.98)
(3223, 3.98)
(3214, 3.98)


Or Python where you use a dictionary to group items?

In [4]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    invoice_revenue = UnitPrice*Quantity
    track_revenues[TrackId] = track_revenues.get(TrackId, 0) + invoice_revenue

for track, revenue in sorted(track_revenues.iteritems(),
                             key=lambda item: (item[1], item[0]), reverse=True)[:3]:
    print track, revenue

3250 3.98
3223 3.98
3214 3.98


# The Split-Apply-Combine Pattern

Hadley Wickham, the man who revolutionized R

If you don’t spend much of your time coding in the open-source statistical programming language R, his name is likely not familiar to you -- but the statistician Hadley Wickham is, in his own words, “nerd famous.” The kind of famous where people at statistics conferences line up for selfies, ask him for autographs, and are generally in awe of him. “It’s utterly utterly bizarre,” he admits. “To be famous for writing R programs? It’s just crazy.”

In [5]:
from IPython.display import HTML
HTML('<iframe src="http://www.jstatsoft.org/v40/i01/paper" width=800 height=400></iframe>')

Out[5]:

## The Basic Pattern

1. Split the data by some grouping variable
2. Apply some function to each group independently
3. Combine the data into some output dataset
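The three steps above can be sketched in plain Python. This is only a minimal illustration, assuming a small made-up list of `(track_id, unit_price, quantity)` tuples instead of the Chinook `InvoiceLine` table; the helper name `split_apply_combine` and the sample rows are hypothetical. It is written for Python 3, whereas the notebook cells use Python 2.

```python
from collections import defaultdict


def split_apply_combine(rows, key, func):
    """Group rows by key(row), apply func to each group, return a dict."""
    # 1. Split: collect the rows of each group in a list.
    groups = defaultdict(list)
    for row in rows:
        groups[key(row)].append(row)
    # 2. Apply and 3. Combine: call func on each group independently and
    #    gather the per-group results into one dict.
    return {k: func(group) for k, group in groups.items()}


# Hypothetical sample data: (track_id, unit_price, quantity)
rows = [(1, 0.99, 1), (2, 1.99, 2), (1, 0.99, 3)]

revenue_per_track = split_apply_combine(
    rows,
    key=lambda row: row[0],
    func=lambda group: sum(price * qty for _, price, qty in group),
)
```

Keeping the key function and the per-group function as separate arguments is what lets the same skeleton serve for sums, counts, or any other per-group computation.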

### Example 2 - revisited

In [6]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    invoice_revenue = UnitPrice*Quantity
    track_revenues[TrackId] = track_revenues.get(TrackId, 0) + invoice_revenue

print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

In [7]:
def calc_revenue(row):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    return UnitPrice*Quantity

In [8]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)

print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]


# Pandas - Python Data Analysis Library

• Provides high-performance, easy-to-use data structures and data analysis tools.
• My default tool for interactive data analysis.
• Provides the core data structures Series, DataFrame and Panel (although the latter is largely obviated by MultiIndexed DataFrames)

### pandas.Series

• Basically "labelled arrays"
• Combines dict and numpy.array interfaces
• numpy.array performance
In [9]:
import pandas as pd
s1 = pd.Series(dict(apples=1, potatoes=2))
s1

Out[9]:
apples      1
potatoes    2
dtype: int64
In [10]:
s2 = pd.Series(dict(oranges=3, potatoes=4))
print s1+s2

apples     NaN
oranges    NaN
potatoes     6
dtype: float64

In [11]:
print s1.add(s2, fill_value=0)

apples      1
oranges     3
potatoes    6
dtype: float64


### pandas.DataFrame

• Basically in-memory database tables
• Can have columns of different dtypes
• Indexed rows and columns
• Hierarchical indexing allows for representing Panel data (pandas.MultiIndex)
In [12]:
df = pd.DataFrame(dict(s1=s1, s2=s2))
df

Out[12]:
           s1   s2
apples      1  NaN
oranges   NaN    3
potatoes    2    4
In [13]:
print df.index
print df.columns

Index([u'apples', u'oranges', u'potatoes'], dtype='object')
Index([u's1', u's2'], dtype='object')


### Pandas Data Analysis

In [14]:
df

Out[14]:
           s1   s2
apples      1  NaN
oranges   NaN    3
potatoes    2    4
In [15]:
df.sum()

Out[15]:
s1    3
s2    7
dtype: float64
In [16]:
df.sum(axis=1)

Out[16]:
apples      1
oranges     3
potatoes    6
dtype: float64
In [17]:
df.stack()

Out[17]:
apples    s1    1
oranges   s2    3
potatoes  s1    2
          s2    4
dtype: float64
In [18]:
print df.stack().index

MultiIndex(levels=[[u'apples', u'oranges', u'potatoes'], [u's1', u's2']],
           labels=[[0, 1, 2, 2], [0, 1, 0, 1]])


### Split-Apply-Combine in Pandas

• Uses groupby to
  • split the data into groups based on some criteria
  • apply a function to each group independently
  • combine the results into a data structure
• The apply step is usually one of
  • aggregate
  • transform
  • filter
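The three kinds of apply step differ in the shape of what comes back. The sketch below mimics their semantics in plain Python 3 on a hypothetical dict of groups; it is not how pandas implements them, where the corresponding GroupBy methods are `aggregate` (or `agg`), `transform` and `filter`.

```python
# Hypothetical groups, as the split step might produce them.
groups = {"a": [1, 2, 3], "b": [10, 20]}

# aggregate: each group collapses to a single value.
aggregated = {k: sum(v) for k, v in groups.items()}

# transform: each group keeps its shape; values are rewritten using a
# group-level statistic (here: subtract the group mean).
transformed = {
    k: [x - sum(v) / len(v) for x in v] for k, v in groups.items()
}

# filter: whole groups are kept or dropped based on a group-level test.
filtered = {k: v for k, v in groups.items() if sum(v) > 10}
```

Here `aggregated` has one number per group, `transformed` has as many values as the input, and `filtered` keeps only group `"b"`, whose sum exceeds the threshold.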
In [19]:
HTML('<iframe src="http://pandas.pydata.org/pandas-docs/version/0.16.2/groupby.html" width=800 height=300></iframe>')

Out[19]:

### Example 2 - using Pandas

In [20]:
df = pd.read_sql('SELECT * FROM InvoiceLine', conn)

In [21]:
df['Revenue'] = df['UnitPrice']*df['Quantity']
track_revenues = df.groupby('TrackId')['Revenue'].sum()
track_revenues.sort(ascending=False)
print track_revenues[:3]

TrackId
3250    3.98
3223    3.98
3177    3.98
Name: Revenue, dtype: float64

In [22]:
print track_revenues.reset_index().sort(columns=['Revenue', 'TrackId'], ascending=False).set_index('TrackId')[:3]

         Revenue
TrackId
3250        3.98
3223        3.98
3214        3.98


Great for interactive work:

• tab-completion!
• df.head(), df.tail()
• df.describe()

However ...

# MapReduce

• If you want to process Big Data, you need some MapReduce framework like one of the following

The key to these frameworks is adopting a functional [programming] mindset. In Python this means, think iterators!

See Structure and Interpretation of Computer Programs (the "Wizard book")

Luckily, the Split-Apply-Combine pattern is well suited to this!

## Example 1 - revisited

In [23]:
total_revenue = 0
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    total_revenue += UnitPrice*Quantity
total_revenue

Out[23]:
2328.599999999957
In [24]:
reduce(lambda x,y: x+y, map(calc_revenue, curs.execute("SELECT * FROM InvoiceLine;")))

Out[24]:
2328.599999999957
In [25]:
sum(calc_revenue(row) for row in curs.execute("SELECT * FROM InvoiceLine;"))

Out[25]:
2328.599999999957

There is an itertools.groupby function in the standard library.

However

• it requires the data to be sorted,
• it returns group iterators that share the underlying iterable with the groupby object itself.

Hence I find that I usually need to consult the documentation to use it correctly.
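Both caveats can be seen in a few lines. This is a minimal Python 3 sketch on hypothetical `(track_id, revenue)` pairs; the variable names are made up for illustration.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (track_id, revenue) pairs, deliberately unsorted.
pairs = [(1, 0.99), (2, 1.99), (1, 0.99)]
track_id = itemgetter(0)

# Pitfall 1: without sorting, equal keys that are not adjacent end up in
# separate groups, so track 1 shows up twice.
unsorted_keys = [k for k, _ in groupby(pairs, key=track_id)]

# Sorting by the same key first gives one group per key.
sorted_keys = [k for k, _ in groupby(sorted(pairs, key=track_id), key=track_id)]

# Pitfall 2: each group reads from the shared underlying iterator.
# Materialising the groupby object first advances that iterator, so a
# group saved earlier comes back empty when it is read afterwards.
saved = list(groupby(sorted(pairs, key=track_id), key=track_id))
stale = [(k, list(g)) for k, g in saved]

# Consuming each group before moving on to the next one works as intended.
fresh = {k: [rev for _, rev in g]
         for k, g in groupby(sorted(pairs, key=track_id), key=track_id)}
```

The safe habit is to sort by the grouping key and to consume each group inside the loop body, before asking for the next one.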

Use the toolz library instead!

In [26]:
HTML('<iframe src="https://docs.python.org/2/library/itertools.html#itertools.groupby" width=800 height=200></iframe>')

Out[26]:

# PyToolz

In [27]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/index.html" width=800 height=400></iframe>')

Out[27]:

## Example 2 - revisited

In [28]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)

print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

In [29]:
from toolz import groupby, valmap
sorted(valmap(sum,
              valmap(lambda lst: map(calc_revenue, lst),
                     groupby(lambda row: row[2],
                             curs.execute("SELECT * FROM InvoiceLine")))
              ).iteritems(),
       key=lambda t: (t[1], t[0]), reverse=True)[:3]

Out[29]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [30]:
from toolz.curried import pipe, groupby, valmap, map, get
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     groupby(get(2)),
     valmap(map(calc_revenue)),
     valmap(sum),
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
)

Out[30]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [31]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/streaming-analytics.html#streaming-split-apply-combine" width=800 height=300></iframe>')

Out[31]:
In [32]:
from toolz.curried import reduceby
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     reduceby(get(2),
              lambda track_revenue, row: track_revenue + calc_revenue(row),
              init=0
     ),
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
)

Out[32]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
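What `reduceby` buys over `groupby` followed by `valmap` is that it never holds a whole group in memory: it keeps one running value per key while streaming through the rows. A rough plain-Python equivalent might look like the sketch below. The function `reduceby_sketch` and the sample rows are hypothetical, it is not toolz's actual implementation, and it is written for Python 3.

```python
def reduceby_sketch(key, binop, seq, init):
    """Fold each group of seq into one running value, in a single pass."""
    totals = {}
    for item in seq:
        k = key(item)
        # Only the running value per key is kept, never the full group,
        # so memory use grows with the number of keys, not rows.
        totals[k] = binop(totals.get(k, init), item)
    return totals


# Hypothetical rows: (invoice_line_id, invoice_id, track_id, unit_price, quantity)
rows = iter([(1, 1, 7, 0.99, 1), (2, 1, 8, 1.99, 1), (3, 2, 7, 0.99, 2)])

track_revenues = reduceby_sketch(
    key=lambda row: row[2],
    binop=lambda total, row: total + row[3] * row[4],
    seq=rows,
    init=0,
)
top = sorted(track_revenues.items(), key=lambda t: (t[1], t[0]), reverse=True)
```

Because the input is consumed as a plain iterator, the same function works on a database cursor or any other stream of rows.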

## toolz example - multiprocessing

In [33]:
import glob
files = glob.glob('C:/ARGO/ARGO/notebooks/articles/github_archive/*')
print len(files), files[:3]
N = len(files)    # 10

745 ['C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-0.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-1.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-10.json.gz']

In [34]:
def count_types(filename):
    import gzip
    import json
    from collections import Counter
    try:
        with gzip.open(filename) as f:
            return dict(Counter(json.loads(line)['type'] for line in f))
    except Exception as e:
        print "Error in {!r}: {}".format(filename, e)
        return {}
print count_types(files[0])

{u'ReleaseEvent': 24, u'PublicEvent': 2, u'PullRequestReviewCommentEvent': 85, u'ForkEvent': 213, u'MemberEvent': 16, u'PullRequestEvent': 315, u'IssueCommentEvent': 650, u'PushEvent': 4280, u'DeleteEvent': 141, u'CommitCommentEvent': 56, u'WatchEvent': 642, u'IssuesEvent': 373, u'CreateEvent': 815, u'GollumEvent': 90}

In [35]:
from collections import Counter
def update_counts(total_counts, file_counts):
    total_counts.update(file_counts)
    return total_counts  # reduce() passes the return value on as the accumulator

In [36]:
%%time
pmap = map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())

Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-07-5.json.gz': CRC check failed 0xc9b8b241L != 0x3fdb8691L
Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp': [Errno 13] Permission denied: 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp'
Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 17min 54s

In [37]:
%%time
from IPython.parallel import Client
p = Client()[:]
pmap = p.map_sync
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())

Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 4min 14s
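The two runs above differ only in which `pmap` is plugged in, which works because the map step is a pure function of one input and the reduce step merely merges partial results. A self-contained Python 3 sketch of that shape follows, using hypothetical in-memory strings and a made-up `count_words` in place of the archive files and `count_types`.

```python
from collections import Counter
from functools import reduce


def merge_counts(total_counts, part_counts):
    """Reduce step: fold one partial count into the running total."""
    total_counts.update(part_counts)  # Counter.update adds counts
    return total_counts               # reduce needs the accumulator back


def count_words(text):
    """Map step (hypothetical stand-in for count_types): count words."""
    return dict(Counter(text.split()))


def map_reduce(items, pmap=map):
    """Run the map step with any map-like callable, then merge."""
    return reduce(merge_counts, pmap(count_words, items), Counter())


# Hypothetical inputs standing in for the archive files.
chunks = ["push create push", "fork push", "create"]
serial_total = map_reduce(chunks)

# A process pool can be swapped in without touching the rest, e.g.
#     from multiprocessing import Pool
#     with Pool() as pool:
#         parallel_total = map_reduce(chunks, pmap=pool.map)
# (run under `if __name__ == "__main__":` where processes are spawned).
```

Any callable with the signature of `map` can be passed as `pmap`, so the standard library's `multiprocessing.Pool.map` is one possible substitute for the IPython parallel view used above.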


# Next time

## Blaze

HTML('<iframe src="http://spark.apache.org/docs/latest/api/python/pyspark.html" width=800 height=400></iframe>')