import pandas as pd
import numpy as np
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns
Here's the script to download the data, download.py:
# %load download.py
"""
Download taxi data from S3 to local
"""
from pathlib import Path
import sys
import argparse
import s3fs
from distributed import Client, wait
def parse_args(args=None):
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('-s', '--scheduler', default=None,
                        help='Scheduler address')
    return parser.parse_args(args)


def fetch(key):
    fs = s3fs.S3FileSystem(anon=True)
    dest = Path('data').joinpath(Path(key).name)
    dest.parent.mkdir(exist_ok=True)
    fs.get(key, str(dest))
    return key


def main(args=None):
    args = parse_args(args)
    client = Client(args.scheduler)
    keys = [
        f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'
        for m in range(1, 13)
    ]
    results = client.map(fetch, keys)
    wait(results)


if __name__ == '__main__':
    sys.exit(main())
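If you just want a single month and don't have a Dask cluster running, the fetch function above works on its own. A minimal sketch; it writes into the same data/ directory and assumes anonymous access to the bucket still works:

# Grab just January synchronously, without a scheduler.
fetch('nyc-tlc/trip data/yellow_tripdata_2009-01.csv')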
pd.options.display.max_rows = 10
%matplotlib inline
ls -lh data/*.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 06:14 data/yellow_tripdata_2009-01.csv
-rw-r--r-- 1 taugspurger staff 2.2G Sep 9 10:56 data/yellow_tripdata_2009-02.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:03 data/yellow_tripdata_2009-03.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:10 data/yellow_tripdata_2009-04.csv
-rw-r--r-- 1 taugspurger staff 2.5G Sep 9 11:17 data/yellow_tripdata_2009-05.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:23 data/yellow_tripdata_2009-06.csv
-rw-r--r-- 1 taugspurger staff 2.3G Sep 9 11:30 data/yellow_tripdata_2009-07.csv
-rw-r--r-- 1 taugspurger staff 2.3G Sep 9 11:36 data/yellow_tripdata_2009-08.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:44 data/yellow_tripdata_2009-09.csv
-rw-r--r-- 1 taugspurger staff 2.6G Sep 9 11:52 data/yellow_tripdata_2009-10.csv
-rw-r--r-- 1 taugspurger staff 2.4G Sep 9 11:59 data/yellow_tripdata_2009-11.csv
-rw-r--r-- 1 taugspurger staff 2.5G Sep 9 12:07 data/yellow_tripdata_2009-12.csv
Load the first month of data into memory as a pandas DataFrame.
%%time
dtype = {
    'vendor_name': 'category',
    'Payment_Type': 'category',
}
df = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'])
CPU times: user 1min 6s, sys: 9.81 s, total: 1min 16s
Wall time: 1min 17s
df.head()
 | vendor_name | Trip_Pickup_DateTime | Trip_Dropoff_DateTime | Passenger_Count | Trip_Distance | Start_Lon | Start_Lat | Rate_Code | store_and_forward | End_Lon | End_Lat | Payment_Type | Fare_Amt | surcharge | mta_tax | Tip_Amt | Tolls_Amt | Total_Amt |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | VTS | 2009-01-04 02:52:00 | 2009-01-04 03:02:00 | 1 | 2.63 | -73.991957 | 40.721567 | NaN | NaN | -73.993803 | 40.695922 | CASH | 8.9 | 0.5 | NaN | 0.00 | 0.0 | 9.40 |
1 | VTS | 2009-01-04 03:31:00 | 2009-01-04 03:38:00 | 3 | 4.55 | -73.982102 | 40.736290 | NaN | NaN | -73.955850 | 40.768030 | Credit | 12.1 | 0.5 | NaN | 2.00 | 0.0 | 14.60 |
2 | VTS | 2009-01-03 15:43:00 | 2009-01-03 15:57:00 | 5 | 10.35 | -74.002587 | 40.739748 | NaN | NaN | -73.869983 | 40.770225 | Credit | 23.7 | 0.0 | NaN | 4.74 | 0.0 | 28.44 |
3 | DDS | 2009-01-01 20:52:58 | 2009-01-01 21:14:00 | 1 | 5.00 | -73.974267 | 40.790955 | NaN | NaN | -73.996558 | 40.731849 | CREDIT | 14.9 | 0.5 | NaN | 3.05 | 0.0 | 18.45 |
4 | DDS | 2009-01-24 16:18:23 | 2009-01-24 16:24:56 | 1 | 0.40 | -74.001580 | 40.719382 | NaN | NaN | -74.008378 | 40.720350 | CASH | 3.7 | 0.0 | NaN | 0.00 | 0.0 | 3.70 |
Let's predict whether or not the person tips.
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0
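Before modeling, it's worth a quick look at the class balance. A one-line sketch; the exact fraction depends on the month you loaded:

# Fraction of rides with a recorded tip.
y.mean()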
Everything fits in memory, so this is all the normal scikit-learn workflow.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
len(X_train)
10569309
len(X_test)
3523104
I notice that there are some minor differences in the spelling of the "Payment_Type" values:
df.Payment_Type.cat.categories
Index(['CASH', 'CREDIT', 'Cash', 'Credit', 'Dispute', 'No Charge'], dtype='object')
We'll consolidate those by just lower-casing them:
df.Payment_Type.str.lower()
0             cash
1           credit
2           credit
3           credit
4             cash
             ...
14092408      cash
14092409    credit
14092410      cash
14092411      cash
14092412    credit
Name: Payment_Type, Length: 14092413, dtype: object
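If you just wanted to clean the column up front, outside of a pipeline, one way is to lower-case and re-encode it as a categorical. A sketch, not what we actually do below:

# One-off cleanup: lower-case and re-encode as a categorical.
cleaned = df.Payment_Type.str.lower().astype('category')
cleaned.cat.categories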
And since we're good sci-kittens, we'll package all this up in a pipeline.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
class ColumnSelector(TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]
class HourExtractor(TransformerMixin):
    "Transform each datetime64 column in `columns` to integer hours"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Bind `col` as a default argument so each lambda keeps its own column,
        # rather than whatever `col` ends up as after the comprehension finishes.
        return X.assign(**{col: (lambda x, col=col: x[col].dt.hour)
                           for col in self.columns})
def payment_lowerer(X):
    """Lowercase all the Payment_Type values"""
    return X.assign(Payment_Type=X.Payment_Type.str.lower())
class CategoricalEncoder(TransformerMixin):
    """Convert to Categorical with specific `categories`"""
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X
class StandardScaler(TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X
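As a quick sanity check that this scaler does what we expect, here's a toy example (made-up numbers, not taxi data):

toy = pd.DataFrame({'a': [1.0, 2.0, 3.0], 'b': [10.0, 10.0, 10.0]})
scaler = StandardScaler(columns=['a'])
scaler.fit(toy)
scaler.transform(toy)['a'].tolist()  # [-1.0, 0.0, 1.0]; 'b' is left untouched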
# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime',
           'Passenger_Count', 'Trip_Distance',
           'Payment_Type', 'Fare_Amt', 'surcharge']

# The mapping of {column: set of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}
scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']
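Pinning the categories ahead of time matters because pd.get_dummies emits one indicator column per category. With fixed categories, every chunk of data produces the same dummy columns in the same order, even if some payment types never appear in that chunk. A tiny illustration with hypothetical values:

s = pd.Series(['cash', 'cash'], dtype='category')
list(pd.get_dummies(s).columns)   # just ['cash']
s = s.cat.set_categories(['cash', 'credit', 'dispute', 'no charge'])
list(pd.get_dummies(s).columns)   # all four payment types, even the unused ones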
pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    LogisticRegression(),
)
pipe
Pipeline(memory=None, steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False, func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None, inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])
pipe.steps
[('columnselector', <__main__.ColumnSelector at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False, func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None, inverse_func=None, kw_args=None, pass_y='deprecated', validate=False)), ('categoricalencoder', <__main__.CategoricalEncoder at 0x1561b2668>), ('functiontransformer-2', FunctionTransformer(accept_sparse=False, func=<function get_dummies at 0x111a19e18>, inv_kw_args=None, inverse_func=None, kw_args=None, pass_y='deprecated', validate=False)), ('standardscaler', <__main__.StandardScaler at 0x1561b2198>), ('logisticregression', LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1, penalty='l2', random_state=None, solver='liblinear', tol=0.0001, verbose=0, warm_start=False))]
%time pipe.fit(X_train, y_train)
CPU times: user 59.6 s, sys: 5.81 s, total: 1min 5s
Wall time: 1min 7s
Pipeline(memory=None, steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False, func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None, inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])
pipe.score(X_train, y_train)
0.99314373342666018
pipe.score(X_test, y_test)
0.99316284730737436
We'll also wrap the pipeline construction in a small helper, so we can always get a fresh, unfit copy:

def mkpipe():
    pipe = make_pipeline(
        ColumnSelector(columns),
        HourExtractor(['Trip_Pickup_DateTime']),
        FunctionTransformer(payment_lowerer, validate=False),
        CategoricalEncoder(categories),
        FunctionTransformer(pd.get_dummies, validate=False),
        StandardScaler(scale),
        LogisticRegression(),
    )
    return pipe
Now let's scale this out to the full year of data with dask.dataframe.

import dask.dataframe as dd

%%time
df = dd.read_csv("data/*.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'])
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0
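Nothing has been read yet; dd.read_csv only builds the task graph. A quick way to see how much work dask has laid out (a sketch; the count depends on the blocksize and the file sizes):

df.npartitions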
Since the scikit-learn world isn't really "dask-aware" at the moment, we'll use dask.dataframe's map_partitions method to apply the fitted pipeline to each partition. This is a good escape hatch for dealing with non-daskified code.
yhat = X.map_partitions(lambda x: pd.Series(pipe.predict_proba(x)[:, 1], name='yhat'),
                        meta=('yhat', 'f8'))
%time yhat.to_frame().to_parquet("data/predictions.parq")
CPU times: user 17min 52s, sys: 2min 35s, total: 20min 27s
Wall time: 8min 49s
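To spot-check the output, we can read the Parquet file back with dask (a sketch; it assumes a Parquet engine like pyarrow or fastparquet is installed):

preds = dd.read_parquet("data/predictions.parq")
preds.head()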