The Split-Apply-Combine Pattern in Data Science and Python

Tobias Brandt

"data science" vs "data analysis"

Examples

Chinook Database

Example 1 - Total Revenue

Ever written SQL like this?

In [1]:
db = "split-apply-combine_resources/Chinook_Sqlite.sqlite"
import sqlite3
conn = sqlite3.connect(db)
curs = conn.cursor()

print curs.execute("SELECT SUM(UnitPrice*Quantity) FROM InvoiceLine;").fetchone()
(2328.599999999957,)

Or Python code like this?

In [2]:
total_revenue = 0
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    total_revenue += UnitPrice*Quantity
print total_revenue
2328.6

Example 2 - Revenue per track

Or how about some SQL with a GROUP BY?

In [3]:
sql = ("SELECT TrackId, sum(UnitPrice*Quantity) "
        "FROM InvoiceLine "
        "GROUP BY TrackId "
        "ORDER BY SUM(UnitPrice*Quantity) DESC, TrackId DESC limit 3;")
for row in curs.execute(sql):
    print row
(3250, 3.98)
(3223, 3.98)
(3214, 3.98)

Or Python where you use a dictionary to group items?

In [4]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    invoice_revenue = UnitPrice*Quantity
    track_revenues[TrackId] = track_revenues.get(TrackId, 0) + invoice_revenue
    
for track, revenue in sorted(track_revenues.iteritems(),
                             key=lambda item: (item[1], item[0]), reverse=True)[:3]:
    print track, revenue
3250 3.98
3223 3.98
3214 3.98

The Split-Apply-Combine Pattern

Hadley Wickham

Hadley Wickham, the man who revolutionized R

If you don’t spend much of your time coding in the open-source statistical programming language R, his name is likely not familiar to you -- but the statistician Hadley Wickham is, in his own words, “nerd famous.” The kind of famous where people at statistics conferences line up for selfies, ask him for autographs, and are generally in awe of him. “It’s utterly, utterly bizarre,” he admits. “To be famous for writing R programs? It’s just crazy.”

In [5]:
from IPython.display import HTML
HTML('<iframe src="http://www.jstatsoft.org/v40/i01/paper" width=800 height=400></iframe>')
Out[5]:

The Basic Pattern

  1. Split the data by some grouping variable
  2. Apply some function to each group independently
  3. Combine the data into some output dataset
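
In plain Python the three steps can be made explicit (a schematic sketch with made-up data, not part of the Chinook examples):

rows = [('a', 1), ('b', 2), ('a', 3)]    # made-up (group, value) pairs

# 1. Split: partition the rows by their grouping key
groups = {}
for key, value in rows:
    groups.setdefault(key, []).append(value)

# 2. Apply: aggregate each group independently
# 3. Combine: collect the per-group results into one output dict
result = dict((key, sum(values)) for key, values in groups.iteritems())
print result    # {'a': 4, 'b': 2}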

Example 2 - revisited

In [6]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    invoice_revenue = UnitPrice*Quantity
    track_revenues[TrackId] = track_revenues.get(TrackId, 0) + invoice_revenue
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [7]:
def calc_revenue(row):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    return UnitPrice*Quantity
In [8]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

Pandas - Python Data Analysis Library

  • Provides high-performance, easy-to-use data structures and data analysis tools.
  • My default tool for interactive data analysis.
  • Provides the core data structures Series, DataFrame and Panel (although the latter is largely obviated by MultiIndexed DataFrames)

pandas.Series

  • Basically "labelled arrays"
  • Combines dict and numpy.array interfaces
  • numpy.array performance
In [9]:
import pandas as pd
s1 = pd.Series(dict(apples=1, potatoes=2))
s1
Out[9]:
apples      1
potatoes    2
dtype: int64
In [10]:
s2 = pd.Series(dict(oranges=3, potatoes=4))
print s1+s2
apples     NaN
oranges    NaN
potatoes     6
dtype: float64
In [11]:
print s1.add(s2, fill_value=0)
apples      1
oranges     3
potatoes    6
dtype: float64
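
The dict and numpy.array faces of a Series can be seen side by side (a quick sketch using s1 from above):

print s1['apples']       # dict-style lookup: 1
print 'oranges' in s1    # dict-style membership test on the index: False
print s1.values          # the underlying numpy array: [1 2]
print s1 * 2             # numpy-style vectorised arithmetic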

pandas.DataFrame

  • Basically in-memory database tables
  • Can have columns of different dtypes
  • Indexed rows and columns
  • Hierarchical indexing allows for representing Panel data (pandas.MultiIndex)
In [12]:
df = pd.DataFrame(dict(s1=s1, s2=s2))
df
Out[12]:
          s1  s2
apples     1 NaN
oranges  NaN   3
potatoes   2   4
In [13]:
print df.index
print df.columns
Index([u'apples', u'oranges', u'potatoes'], dtype='object')
Index([u's1', u's2'], dtype='object')

Pandas Data Analysis

In [14]:
df
Out[14]:
          s1  s2
apples     1 NaN
oranges  NaN   3
potatoes   2   4
In [15]:
df.sum()
Out[15]:
s1    3
s2    7
dtype: float64
In [16]:
df.sum(axis=1)
Out[16]:
apples      1
oranges     3
potatoes    6
dtype: float64
In [17]:
df.stack()
Out[17]:
apples    s1    1
oranges   s2    3
potatoes  s1    2
          s2    4
dtype: float64
In [18]:
print df.stack().index
MultiIndex(levels=[[u'apples', u'oranges', u'potatoes'], [u's1', u's2']],
           labels=[[0, 1, 2, 2], [0, 1, 0, 1]])
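
Each level of the MultiIndex can be selected on its own, which is why a stacked DataFrame can stand in for a Panel (a small sketch using df from above):

stacked = df.stack()
print stacked['potatoes']    # outer level: both columns' values for potatoes
print stacked[:, 's1']       # inner level: all s1 values (apples 1, potatoes 2)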

Split-Apply-Combine in Pandas

  • Uses groupby to
    • split the data into groups based on some criteria
    • apply a function to each group independently
    • combine the results into a data structure
  • The apply step is usually one of (each sketched below)
    • aggregate
    • transform
    • or filter
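
The examples that follow only aggregate; for contrast, here is how all three flavours look on a toy DataFrame (a minimal sketch with made-up data; np.sum is used since it works with all three):

import numpy as np
import pandas as pd

tiny = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 30]})

print tiny.groupby('key')['val'].aggregate(np.sum)    # aggregate: one value per group
print tiny.groupby('key')['val'].transform(np.sum)    # transform: result has the shape of the input
print tiny.groupby('key').filter(lambda g: g['val'].sum() > 5)    # filter: keep or drop whole groups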
In [19]:
HTML('<iframe src="http://pandas.pydata.org/pandas-docs/version/0.16.2/groupby.html" width=800 height=300></iframe>')
Out[19]:

Example 2 - using Pandas

In [20]:
df = pd.read_sql('SELECT * FROM InvoiceLine', conn)
In [21]:
df['Revenue'] = df['UnitPrice']*df['Quantity']
track_revenues = df.groupby('TrackId')['Revenue'].sum()
track_revenues.sort(ascending=False)
print track_revenues[:3]
TrackId
3250    3.98
3223    3.98
3177    3.98
Name: Revenue, dtype: float64
Note the tie at 3.98: sorting on revenue alone leaves the order among tied tracks arbitrary (hence 3177 above), so matching the SQL's ordering requires an explicit tie-break on TrackId:

In [22]:
print track_revenues.reset_index().sort(columns=['Revenue', 'TrackId'], ascending=False).set_index('TrackId')[:3]
         Revenue
TrackId         
3250        3.98
3223        3.98
3214        3.98

Great for interactive work:

  • tab-completion!
  • df.head(), df.tail()
  • df.describe()

However ...

Pandas currently only handles in-memory datasets!

Does my data look big in this?

MapReduce

  • If you want to process Big Data, you need a MapReduce framework such as Hadoop or Spark

The key to these frameworks is adopting a functional programming mindset. In Python this means: think iterators!

See The Structure and Interpretation of Computer Programs (the "Wizard book")

Luckily, the Split-Apply-Combine pattern is well suited to this!

Example 1 - revisited

In [23]:
total_revenue = 0
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    total_revenue += UnitPrice*Quantity
total_revenue
Out[23]:
2328.599999999957
In [24]:
reduce(lambda x,y: x+y, map(calc_revenue, curs.execute("SELECT * FROM InvoiceLine;")))
Out[24]:
2328.599999999957
In [25]:
sum(calc_revenue(row) for row in curs.execute("SELECT * FROM InvoiceLine;"))
Out[25]:
2328.599999999957

What about group by operations?

There is an itertools.groupby function in the standard library.

However

  • it requires the data to be sorted by the grouping key,
  • the group iterators it returns share the underlying iterable, so each group must be consumed before advancing to the next.

Hence I find that I usually need to consult the documentation to use it correctly.
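
For example, on unsorted data itertools.groupby silently starts a new group every time the key changes (a minimal sketch):

from itertools import groupby

rows = [('a', 1), ('b', 2), ('a', 3)]    # note: not sorted by key
print [(key, list(group)) for key, group in groupby(rows, key=lambda r: r[0])]
# [('a', [('a', 1)]), ('b', [('b', 2)]), ('a', [('a', 3)])] -- 'a' appears twice!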

Use the toolz library rather!

In [26]:
HTML('<iframe src="https://docs.python.org/2/library/itertools.html#itertools.groupby" width=800 height=200></iframe>')
Out[26]:

PyToolz

In [27]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/index.html" width=800 height=400></iframe>')
Out[27]:

Example 2 - revisited

In [28]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [29]:
from toolz import groupby, valmap
# split: groupby on TrackId; apply: map to revenues, then sum per group; combine: sort the items
sorted(valmap(sum,
              valmap(lambda lst: map(calc_revenue, lst),
                     groupby(lambda row: row[2],
                             curs.execute("SELECT * FROM InvoiceLine")))
             ).iteritems()
       , key=lambda t: (t[1], t[0]), reverse=True)[:3]
Out[29]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [30]:
from toolz.curried import pipe, groupby, valmap, map, get
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     groupby(get(2)),                # split on TrackId (column 2)
     valmap(map(calc_revenue)),      # apply: revenue per invoice line...
     valmap(sum),                    # ...summed per track
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )                               # combine: top 3 tracks
Out[30]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
In [31]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/streaming-analytics.html#streaming-split-apply-combine" width=800 height=300></iframe>')
Out[31]:
In [32]:
from toolz.curried import reduceby
# reduceby fuses the split and apply steps into a single streaming pass -- no per-group lists are materialized
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     reduceby(get(2),
              lambda track_revenue, row: track_revenue + calc_revenue(row),
              init=0
             ),
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )
Out[32]:
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

toolz example - multiprocessing

In [33]:
import glob
files = glob.glob('C:/ARGO/ARGO/notebooks/articles/github_archive/*')
print len(files), files[:3]
N = len(files)    # reduce this (e.g. to 10) for a quick test run
745 ['C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-0.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-1.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-10.json.gz']
In [34]:
def count_types(filename):
    # imports live inside the function so it can also run on remote IPython.parallel engines
    import gzip
    import json
    from collections import Counter
    try:
        with gzip.open(filename) as f:
            return dict(Counter(json.loads(line)['type'] for line in f))
    except Exception as e:
        print "Error in {!r}: {}".format(filename, e)
        return {}
print count_types(files[0])
{u'ReleaseEvent': 24, u'PublicEvent': 2, u'PullRequestReviewCommentEvent': 85, u'ForkEvent': 213, u'MemberEvent': 16, u'PullRequestEvent': 315, u'IssueCommentEvent': 650, u'PushEvent': 4280, u'DeleteEvent': 141, u'CommitCommentEvent': 56, u'WatchEvent': 642, u'IssuesEvent': 373, u'CreateEvent': 815, u'GollumEvent': 90}
In [35]:
from collections import Counter
def update_counts(total_counts, file_counts):
    total_counts.update(file_counts)    # Counter.update adds counts, unlike dict.update
    return total_counts
In [36]:
%%time
pmap = map    # serial baseline: the built-in map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())
Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-07-5.json.gz': CRC check failed 0xc9b8b241L != 0x3fdb8691L
Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp': [Errno 13] Permission denied: 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp'
Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 17min 54s
In [37]:
%%time
from IPython.parallel import Client
p = Client()[:]        # assumes an ipcluster with worker engines is already running
pmap = p.map_sync      # parallel drop-in replacement for map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())
Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 4min 14s
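
The same one-line swap works with the standard library's multiprocessing.Pool (a sketch, not run here; note that on Windows, Pool can only pickle functions importable from a module, so count_types would have to live in a .py file rather than in the notebook):

from multiprocessing import Pool

pool = Pool(processes=4)    # four worker processes
pmap = pool.map             # drop-in replacement for the built-in map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())
pool.close()
pool.join()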

Next time

Blaze

Dask

In [38]:
HTML('<iframe src="http://spark.apache.org/docs/latest/api/python/pyspark.html" width=800 height=400></iframe>')
Out[38]:

Thank you!

If this stuff interests you, let's chat!