Ever written SQL like this?
db = "split-apply-combine_resources/Chinook_Sqlite.sqlite"
import sqlite3
conn = sqlite3.connect(db)
curs = conn.cursor()
print curs.execute("SELECT SUM(UnitPrice*Quantity) FROM InvoiceLine;").fetchone()
(2328.599999999957,)
Or Python code like this?
# Accumulate UnitPrice * Quantity one row at a time in Python.
total_revenue = 0
invoice_lines = curs.execute("SELECT * FROM InvoiceLine;")
for _line_id, _invoice_id, _track_id, unit_price, quantity in invoice_lines:
    total_revenue += unit_price * quantity
print(total_revenue)
2328.6
Or how about some SQL with a GROUP BY?
# Revenue per track; ORDER BY revenue then TrackId (both descending), so ties
# on revenue are broken by the higher TrackId. Only the top three are kept.
sql = (
    "SELECT TrackId, sum(UnitPrice*Quantity) "
    "FROM InvoiceLine "
    "GROUP BY TrackId "
    "ORDER BY SUM(UnitPrice*Quantity) DESC, TrackId DESC limit 3;"
)
for row in curs.execute(sql):
    print(row)
(3250, 3.98) (3223, 3.98) (3214, 3.98)
Or Python where you use a dictionary to group items?
# Group by hand: a dict keyed on TrackId holds the running revenue per track.
track_revenues = {}
invoice_lines = curs.execute("SELECT * FROM InvoiceLine;")
for _line_id, _invoice_id, track_id, unit_price, quantity in invoice_lines:
    track_revenues[track_id] = track_revenues.get(track_id, 0) + unit_price * quantity

# Rank on (revenue, track id), both descending, and keep the first three.
top_three = sorted(track_revenues.items(),
                   key=lambda item: (item[1], item[0]),
                   reverse=True)[:3]
for track, revenue in top_three:
    print("%s %s" % (track, revenue))
3250 3.98 3223 3.98 3214 3.98
Hadley Wickham, the man who revolutionized R
If you don’t spend much of your time coding in the open-source statistical programming language R, his name is likely not familiar to you -- but the statistician Hadley Wickham is, in his own words, “nerd famous.” The kind of famous where people at statistics conferences line up for selfies, ask him for autographs, and are generally in awe of him. “It’s utterly utterly bizarre,” he admits. “To be famous for writing R programs? It’s just crazy.”
from IPython.display import HTML
HTML('<iframe src="http://www.jstatsoft.org/v40/i01/paper" width=800 height=400></iframe>')
# Split: walk every invoice line. Apply: revenue = UnitPrice * Quantity.
# Combine: add it to the running total stored under the line's TrackId.
track_revenues = {}
for _line_id, _invoice_id, track_id, unit_price, quantity in curs.execute(
        "SELECT * FROM InvoiceLine;"):
    track_revenues[track_id] = track_revenues.get(track_id, 0) + unit_price * quantity

# Sort on (revenue, track id), both descending, then show the first three.
ranked = sorted(track_revenues.items(),
                key=lambda item: (item[1], item[0]),
                reverse=True)
print(ranked[:3])
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
def calc_revenue(row):
    """Return the revenue of a single InvoiceLine row.

    ``row`` must unpack into exactly five values, in the order
    (InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity). The result is
    UnitPrice multiplied by Quantity.
    """
    # Unpack all five fields so a row of the wrong width fails right here.
    _line_id, _invoice_id, _track_id, unit_price, quantity = row
    revenue = unit_price * quantity
    return revenue
# Same dictionary-based grouping, with the "apply" step factored out into
# calc_revenue(); row[2] is the TrackId column used as the grouping key.
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_id = row[2]
    track_revenues[track_id] = track_revenues.get(track_id, 0) + calc_revenue(row)

ranked = sorted(track_revenues.items(),
                key=lambda item: (item[1], item[0]),
                reverse=True)
print(ranked[:3])
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
import pandas as pd
s1 = pd.Series(dict(apples=1, potatoes=2))
s1
apples 1 potatoes 2 dtype: int64
s2 = pd.Series(dict(oranges=3, potatoes=4))
print s1+s2
apples NaN oranges NaN potatoes 6 dtype: float64
print s1.add(s2, fill_value=0)
apples 1 oranges 3 potatoes 6 dtype: float64
df = pd.DataFrame(dict(s1=s1, s2=s2))
df
s1 | s2 | |
---|---|---|
apples | 1 | NaN |
oranges | NaN | 3 |
potatoes | 2 | 4 |
print df.index
print df.columns
Index([u'apples', u'oranges', u'potatoes'], dtype='object') Index([u's1', u's2'], dtype='object')
df
s1 | s2 | |
---|---|---|
apples | 1 | NaN |
oranges | NaN | 3 |
potatoes | 2 | 4 |
df.sum()
s1 3 s2 7 dtype: float64
df.sum(axis=1)
apples 1 oranges 3 potatoes 6 dtype: float64
df.stack()
apples s1 1 oranges s2 3 potatoes s1 2 s2 4 dtype: float64
print df.stack().index
MultiIndex(levels=[[u'apples', u'oranges', u'potatoes'], [u's1', u's2']], labels=[[0, 1, 2, 2], [0, 1, 0, 1]])
HTML('<iframe src="http://pandas.pydata.org/pandas-docs/version/0.16.2/groupby.html" width=800 height=300></iframe>')
# Load every invoice line into a DataFrame and add a per-line Revenue column.
df = pd.read_sql('SELECT * FROM InvoiceLine', conn)
df['Revenue'] = df['UnitPrice']*df['Quantity']
# Split on TrackId, apply sum() to Revenue, combine into one Series indexed by TrackId.
track_revenues = df.groupby('TrackId')['Revenue'].sum()
# Sorts the Series in place on revenue only, so the order among tracks with
# equal revenue is not controlled here.
# NOTE(review): Series.sort() belongs to the older pandas API this notebook
# targets; newer pandas releases use sort_values() instead - confirm before upgrading.
track_revenues.sort(ascending=False)
print track_revenues[:3]
TrackId 3250 3.98 3223 3.98 3177 3.98 Name: Revenue, dtype: float64
print track_revenues.reset_index().sort(columns=['Revenue', 'TrackId'], ascending=False).set_index('TrackId')[:3]
Revenue TrackId 3250 3.98 3223 3.98 3214 3.98
Great for interactive work: `df.head()`, `df.tail()`, `df.describe()`.
However ...
The key to these frameworks is adopting a functional [programming] mindset. In Python, this means: think iterators!
See *Structure and Interpretation of Computer Programs* (the "Wizard book").
Luckily, the Split-Apply-Combine pattern is well suited to this!
# Imperative baseline: accumulate the revenue of every invoice line in a loop.
total_revenue = 0
for _line_id, _invoice_id, _track_id, unit_price, quantity in curs.execute(
        "SELECT * FROM InvoiceLine;"):
    total_revenue += unit_price * quantity
# Bare expression so the notebook displays the unrounded float.
total_revenue
2328.599999999957
reduce(lambda x,y: x+y, map(calc_revenue, curs.execute("SELECT * FROM InvoiceLine;")))
2328.599999999957
sum(calc_revenue(row) for row in curs.execute("SELECT * FROM InvoiceLine;"))
2328.599999999957
What about group by operations?
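There is an `itertools.groupby` function in the standard library.
However, it only groups *consecutive* items, so the input must already be sorted by the grouping key, and each group is a lazy iterator that becomes invalid as soon as the outer iterator advances.
Hence I find that I usually need to consult the documentation to use it correctly.
Use the `toolz` library instead!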
HTML('<iframe src="https://docs.python.org/2/library/itertools.html#itertools.groupby" width=800 height=200></iframe>')
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/index.html" width=800 height=400></iframe>')
# Dictionary-based baseline for the group-by: key each row on its TrackId
# (row[2]) and add calc_revenue(row) to that track's running total.
track_revenues = {}
for invoice_line in curs.execute("SELECT * FROM InvoiceLine;"):
    key = invoice_line[2]
    track_revenues[key] = track_revenues.get(key, 0) + calc_revenue(invoice_line)

# Highest revenue first; ties broken by the higher track id.
top_three = sorted(track_revenues.items(),
                   key=lambda item: (item[1], item[0]),
                   reverse=True)[:3]
print(top_three)
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
from toolz import groupby, valmap
# Split-apply-combine written as nested calls; read from the innermost call outwards:
#   1. groupby keys every row on row[2] (the TrackId column)
#   2. the inner valmap maps calc_revenue over each group's rows
#   3. the outer valmap sums each group's revenues
#   4. sorted ranks the (TrackId, revenue) pairs on (revenue, TrackId), descending, top three kept
sorted(valmap(sum,
              valmap(lambda lst: map(calc_revenue, lst),
                     groupby(lambda row: row[2],
                             curs.execute("SELECT * FROM InvoiceLine")))
              ).iteritems()
       , key=lambda t: (t[1], t[0]), reverse=True)[:3]
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
# NOTE: this import rebinds `groupby`, `valmap` and the builtin `map` to the
# versions exported by toolz.curried for every cell that runs afterwards.
from toolz.curried import pipe, groupby, valmap, map, get
# The same computation as a pipeline: pipe() threads the cursor through each
# step in order, so the code reads top-to-bottom instead of inside-out.
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     groupby(get(2)),            # split: key each row on column 2 (TrackId)
     valmap(map(calc_revenue)),  # apply: revenue of every row in each group
     valmap(sum),                # combine: total revenue per track
     # rank on (revenue, TrackId), both descending, and keep the top three
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/streaming-analytics.html#streaming-split-apply-combine" width=800 height=300></iframe>')
from toolz.curried import reduceby
# reduceby merges the group and reduce steps: each row is folded straight into
# the running total for its key (column 2, TrackId), starting from init=0.
# NOTE(review): per the toolz streaming-analytics docs this avoids building a
# per-key list of rows - confirm against the toolz version in use.
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     reduceby(get(2),
              lambda track_revenue, row: track_revenue + calc_revenue(row),
              init=0
              ),
     # rank on (revenue, TrackId), both descending, and keep the top three
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )
[(3250, 3.98), (3223, 3.98), (3214, 3.98)]
import glob
files = glob.glob('C:/ARGO/ARGO/notebooks/articles/github_archive/*')
print len(files), files[:3]
N = len(files) # 10
745 ['C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-0.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-1.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-10.json.gz']
def count_types(filename):
    """Count events by type in one gzipped, line-delimited JSON file.

    Every line of ``filename`` is parsed as a JSON object and the value under
    its ``'type'`` key is tallied.

    Returns:
        A plain ``dict`` mapping each event type to the number of lines that
        carried it. If anything goes wrong while opening, decompressing or
        parsing the file, the error is printed and an empty dict is returned;
        counts gathered before the failure are discarded.
    """
    # Imports are local so the function carries its own dependencies.
    # NOTE(review): presumably required because the function is also handed to
    # IPython.parallel's map_sync and executed on remote engines - confirm.
    import gzip
    import json
    from collections import Counter
    try:
        with gzip.open(filename) as f:
            return dict(Counter(json.loads(line)['type'] for line in f))
    # Deliberately broad: this is a best-effort per-file step, and one corrupt
    # or unreadable archive must not abort the whole batch.
    # `except ... as e` and single-argument print() are valid on Python 2.6+
    # and Python 3, unlike the Python-2-only `except Exception, e` / `print x`.
    except Exception as e:
        print("Error in {!r}: {}".format(filename, e))
        return {}
print count_types(files[0])
{u'ReleaseEvent': 24, u'PublicEvent': 2, u'PullRequestReviewCommentEvent': 85, u'ForkEvent': 213, u'MemberEvent': 16, u'PullRequestEvent': 315, u'IssueCommentEvent': 650, u'PushEvent': 4280, u'DeleteEvent': 141, u'CommitCommentEvent': 56, u'WatchEvent': 642, u'IssuesEvent': 373, u'CreateEvent': 815, u'GollumEvent': 90}
from collections import Counter
def update_counts(total_counts, file_counts):
    """Merge one file's counts into the running totals and return the totals.

    ``total_counts`` is modified in place through its own ``update()`` method
    and the same object is returned, which lets this function serve as the
    step function of a ``reduce`` whose initial value is a ``Counter``.
    """
    accumulator = total_counts
    # With a Counter accumulator, update() adds counts instead of replacing them.
    accumulator.update(file_counts)
    return accumulator
%%time
# Serial baseline: count_types runs on each file in turn and update_counts
# folds the per-file dicts into a single Counter.
# NOTE(review): if the earlier `from toolz.curried import ... map` cell has
# been run, `map` here is the toolz version rather than the builtin.
pmap = map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())
Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-07-5.json.gz': CRC check failed 0xc9b8b241L != 0x3fdb8691L Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp': [Errno 13] Permission denied: 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp' Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596}) Wall time: 17min 54s
%%time
from IPython.parallel import Client
# NOTE(review): Client()[:] is presumably a view spanning every running
# engine - confirm against the IPython.parallel documentation.
p = Client()[:]
# Only the map step changes relative to the serial version; the reduce over
# update_counts is identical and still runs locally.
pmap = p.map_sync
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())
Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596}) Wall time: 4min 14s
HTML('<iframe src="http://spark.apache.org/docs/latest/api/python/pyspark.html" width=800 height=400></iframe>')