In [4]:
import pandas as pd
import numpy as np
import dask.dataframe as dd

import matplotlib.pyplot as plt
import seaborn as sns

Script to download the data: download.py

In [ ]:
# %load download.py
"""
Download taxi data from S3 to local
"""
from pathlib import Path
import sys
import argparse
import s3fs
from distributed import Client, wait


def parse_args(args=None):
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('-s', '--scheduler', default=None,
                        help='Scheduler address')
    return parser.parse_args(args)


def fetch(key):
    # Runs on the dask workers, so each file lands on that worker's local disk
    fs = s3fs.S3FileSystem(anon=True)
    dest = Path('data').joinpath(Path(key).name)
    dest.parent.mkdir(exist_ok=True)
    fs.get(key, str(dest))
    return key


def main(args=None):
    args = parse_args(args)
    client = Client(args.scheduler)
    keys = [
        f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'
        for m in range(1, 13)
    ]
    results = client.map(fetch, keys)
    wait(results)


if __name__ == '__main__':
    sys.exit(main())
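
To run it, start a scheduler and some workers, then hand the script the scheduler's address (the address below is illustrative; leaving off --scheduler makes Client(None) start a local cluster instead):

#   $ dask-scheduler                        # prints its tcp:// address
#   $ dask-worker tcp://192.168.1.10:8786   # one per worker machine
#   $ python download.py -s tcp://192.168.1.10:8786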
In [5]:
pd.options.display.max_rows = 10
In [6]:
%matplotlib inline
In [4]:
ls -lh data/*.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 06:14 data/yellow_tripdata_2009-01.csv
-rw-r--r--  1 taugspurger  staff   2.2G Sep  9 10:56 data/yellow_tripdata_2009-02.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 11:03 data/yellow_tripdata_2009-03.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 11:10 data/yellow_tripdata_2009-04.csv
-rw-r--r--  1 taugspurger  staff   2.5G Sep  9 11:17 data/yellow_tripdata_2009-05.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 11:23 data/yellow_tripdata_2009-06.csv
-rw-r--r--  1 taugspurger  staff   2.3G Sep  9 11:30 data/yellow_tripdata_2009-07.csv
-rw-r--r--  1 taugspurger  staff   2.3G Sep  9 11:36 data/yellow_tripdata_2009-08.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 11:44 data/yellow_tripdata_2009-09.csv
-rw-r--r--  1 taugspurger  staff   2.6G Sep  9 11:52 data/yellow_tripdata_2009-10.csv
-rw-r--r--  1 taugspurger  staff   2.4G Sep  9 11:59 data/yellow_tripdata_2009-11.csv
-rw-r--r--  1 taugspurger  staff   2.5G Sep  9 12:07 data/yellow_tripdata_2009-12.csv

Each month is roughly 2.4 GB, almost 29 GB for the full year, which is more than most workstations can comfortably hold in memory. We'll start by loading just the first month into memory.

In [5]:
%%time
dtype = {
    'vendor_name': 'category',
    'Payment_Type': 'category',
}

df = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)
CPU times: user 1min 6s, sys: 9.81 s, total: 1min 16s
Wall time: 1min 17s
In [6]:
df.head()
Out[6]:
vendor_name Trip_Pickup_DateTime Trip_Dropoff_DateTime Passenger_Count Trip_Distance Start_Lon Start_Lat Rate_Code store_and_forward End_Lon End_Lat Payment_Type Fare_Amt surcharge mta_tax Tip_Amt Tolls_Amt Total_Amt
0 VTS 2009-01-04 02:52:00 2009-01-04 03:02:00 1 2.63 -73.991957 40.721567 NaN NaN -73.993803 40.695922 CASH 8.9 0.5 NaN 0.00 0.0 9.40
1 VTS 2009-01-04 03:31:00 2009-01-04 03:38:00 3 4.55 -73.982102 40.736290 NaN NaN -73.955850 40.768030 Credit 12.1 0.5 NaN 2.00 0.0 14.60
2 VTS 2009-01-03 15:43:00 2009-01-03 15:57:00 5 10.35 -74.002587 40.739748 NaN NaN -73.869983 40.770225 Credit 23.7 0.0 NaN 4.74 0.0 28.44
3 DDS 2009-01-01 20:52:58 2009-01-01 21:14:00 1 5.00 -73.974267 40.790955 NaN NaN -73.996558 40.731849 CREDIT 14.9 0.5 NaN 3.05 0.0 18.45
4 DDS 2009-01-24 16:18:23 2009-01-24 16:24:56 1 0.40 -74.001580 40.719382 NaN NaN -74.008378 40.720350 CASH 3.7 0.0 NaN 0.00 0.0 3.70

Let's predict whether or not the person tips.

In [7]:
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0

We're working with an in-memory pandas DataFrame here, so the usual scikit-learn workflow applies.

In [8]:
from sklearn.model_selection import train_test_split
In [9]:
X_train, X_test, y_train, y_test = train_test_split(X, y)
In [12]:
len(X_train)
Out[12]:
10569309
In [13]:
len(X_test)
Out[13]:
3523104

I notice that there are some minor differences in the spelling of Payment_Type:

In [10]:
df.Payment_Type.cat.categories
Out[10]:
Index(['CASH', 'CREDIT', 'Cash', 'Credit', 'Dispute', 'No Charge'], dtype='object')

We'll consolidate those by just lower-casing them:

In [11]:
df.Payment_Type.str.lower()
Out[11]:
0             cash
1           credit
2           credit
3           credit
4             cash
             ...  
14092408      cash
14092409    credit
14092410      cash
14092411      cash
14092412    credit
Name: Payment_Type, Length: 14092413, dtype: object

And since we're good sci-kittens, we'll package all this up in a pipeline.

In [18]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
In [19]:
class ColumnSelector(TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]
    

class HourExtractor(TransformerMixin):
    "Transform each datetime64 column in `columns` to integer hours"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Bind col as a default argument; a plain closure would capture only the last column
        return X.assign(**{col: lambda x, col=col: x[col].dt.hour for col in self.columns})


def payment_lowerer(X):
    """Lowercase all the Payment_Type values"""
    return X.assign(Payment_Type=X.Payment_Type.str.lower())


class CategoricalEncoder(TransformerMixin):
    """Convert to Categorical with specific `categories`"""
    def __init__(self, categories):
        self.categories = categories
        
    def fit(self, X, y=None):
        return self
        
    def transform(self, X, y=None):
        X = X.copy()  # don't mutate the caller's frame
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X
    
class StandardScaler(TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns
    
    def fit(self, X, y=None):
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X
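
As a quick smoke test, the pieces compose on a tiny made-up frame (a sketch with invented values, not the taxi data):

toy = pd.DataFrame({
    'Trip_Pickup_DateTime': pd.to_datetime(['2009-01-04 02:52:00',
                                            '2009-01-04 15:31:00']),
    'Payment_Type': ['CASH', 'Credit'],
})

# Datetimes become integer hours (2 and 15 here)...
HourExtractor(['Trip_Pickup_DateTime']).transform(toy)

# ...and lower-cased payments land in a fixed category set.
CategoricalEncoder({'Payment_Type': ['cash', 'credit']}).transform(payment_lowerer(toy))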
In [20]:
# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime',
           'Passenger_Count', 'Trip_Distance',
           'Payment_Type', 'Fare_Amt', 'surcharge']

# The mapping of {column: list of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}

scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']

pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    LogisticRegression(),
)
pipe
Out[20]:
Pipeline(memory=None,
     steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False,
          func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None,
          inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])
In [21]:
pipe.steps
Out[21]:
[('columnselector', <__main__.ColumnSelector at 0x1561b2dd8>),
 ('hourextractor', <__main__.HourExtractor at 0x1561b2278>),
 ('functiontransformer-1', FunctionTransformer(accept_sparse=False,
            func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None,
            inverse_func=None, kw_args=None, pass_y='deprecated',
            validate=False)),
 ('categoricalencoder', <__main__.CategoricalEncoder at 0x1561b2668>),
 ('functiontransformer-2', FunctionTransformer(accept_sparse=False,
            func=<function get_dummies at 0x111a19e18>, inv_kw_args=None,
            inverse_func=None, kw_args=None, pass_y='deprecated',
            validate=False)),
 ('standardscaler', <__main__.StandardScaler at 0x1561b2198>),
 ('logisticregression',
  LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
            intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
            penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
            verbose=0, warm_start=False))]
In [22]:
%time pipe.fit(X_train, y_train)
CPU times: user 59.6 s, sys: 5.81 s, total: 1min 5s
Wall time: 1min 7s
Out[22]:
Pipeline(memory=None,
     steps=[('columnselector', <__main__.ColumnSelector object at 0x1561b2dd8>), ('hourextractor', <__main__.HourExtractor object at 0x1561b2278>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False,
          func=<function payment_lowerer at 0x1f3508e18>, inv_kw_args=None,
          inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])
In [23]:
pipe.score(X_train, y_train)
Out[23]:
0.99314373342666018
In [25]:
pipe.score(X_test, y_test)
Out[25]:
0.99316284730737436
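
Scores that high deserve a quick sanity check against the class balance. One way to eyeball it on the training labels:

y_train.mean()  # fraction of trips with a recorded tip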
In [26]:
def mkpipe():
    # Factory for a fresh, unfitted copy of the pipeline above
    pipe = make_pipeline(
        ColumnSelector(columns),
        HourExtractor(['Trip_Pickup_DateTime']),
        FunctionTransformer(payment_lowerer, validate=False),
        CategoricalEncoder(categories),
        FunctionTransformer(pd.get_dummies, validate=False),
        StandardScaler(scale),
        LogisticRegression(),
    )
    return pipe

Scaling it Out

Now for the full year. dask.dataframe's read_csv accepts a glob pattern, so the same code targets all twelve CSVs at once.

In [30]:
import dask.dataframe as dd
In [31]:
%%time
df = dd.read_csv("data/*.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)

X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0
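
Nothing has been read yet; dask just builds a task graph over blocks of the twelve CSVs. A couple of cheap checks on the lazy frame:

df.npartitions  # number of blocks dask will process in parallel
df.head()       # reads only enough of the first file to show five rows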

Since the scikit-learn world isn't really "dask-aware" at the moment, we'll use the map_partitions method. This is a good escape hatch for dealing with non-daskified code: each partition is a regular pandas DataFrame, so the pipeline we fit on January can score it directly. It's also why CategoricalEncoder fixes the categories up front: every partition then produces the same dummy columns from pd.get_dummies, so the inputs to the model always line up.

In [35]:
yhat = X.map_partitions(lambda x: pd.Series(pipe.predict_proba(x)[:, 1],
                                            index=x.index, name='yhat'),
                        meta=('yhat', 'f8'))
In [36]:
%time yhat.to_frame().to_parquet("data/predictions.parq")
CPU times: user 17min 52s, sys: 2min 35s, total: 20min 27s
Wall time: 8min 49s
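
The predictions read back like any other dask dataframe (assuming a parquet engine such as fastparquet or pyarrow is installed):

preds = dd.read_parquet("data/predictions.parq")
preds.head()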