#!/usr/bin/env python
# coding: utf-8

# In[8]:

import pandas as pd
import numpy as np
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns


# In[2]:

pd.options.display.max_rows = 10


# In[3]:

get_ipython().run_line_magic('matplotlib', 'inline')


# In[4]:

get_ipython().system('ls -lh data/*.csv')


# In[5]:

get_ipython().run_cell_magic('time', '', 'dtype = {\n    \'vendor_name\': \'category\',\n    \'Payment_Type\': \'category\',\n}\n\ndf = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,\n                 parse_dates=[\'Trip_Pickup_DateTime\', \'Trip_Dropoff_DateTime\'])\n')


# In[6]:

df.head()


# Let's predict whether or not the person tips. We'll keep it simple and just use a `LogisticRegression`.

# In[11]:

X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0


# In[12]:

from sklearn.model_selection import train_test_split


# In[13]:

X_train, X_test, y_train, y_test = train_test_split(X, y)


# I notice that there are some minor differences in the spelling of "Payment Type":

# In[14]:

df.Payment_Type.cat.categories


# We'll consolidate those by just lower-casing them:

# In[15]:

df.Payment_Type.str.lower()


# Finally, we'll want to ensure that all the DataFrames have a consistent set of categories (incidentally, this is what my [`CategoricalDtype` refactor](https://github.com/pandas-dev/pandas/pull/16015) is going to solve a little more cleanly than what we'll have to do here). For now, we'll just use `set_categories`.
#
# And since we're good sci-kittens, we'll package all this up in a pipeline.

# In[18]:

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer


# In[19]:

class ColumnSelector(BaseEstimator, TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]


class HourExtractor(BaseEstimator, TransformerMixin):
    "Transform each datetime64 column in `columns` to integer hours"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Bind `col` as a default argument so each lambda sees its own
        # column, not the loop's final value.
        return X.assign(**{col: (lambda x, col=col: x[col].dt.hour)
                           for col in self.columns})


def payment_lowerer(X):
    """Lowercase all the Payment_Type values"""
    return X.assign(Payment_Type=X.Payment_Type.str.lower())


class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Convert to Categorical with specific `categories`"""
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X.copy()  # don't mutate the caller's DataFrame
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X


class StandardScaler(BaseEstimator, TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X


# In[20]:

# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime', 'Passenger_Count',
           'Trip_Distance', 'Payment_Type', 'Fare_Amt', 'surcharge']

# The mapping of {column: set of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}

scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']

pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    LogisticRegression(),
)
pipe


# In[21]:

pipe.steps


# This may seem a bit overly-formal. You *could* just do this stuff imperatively.
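# For comparison, a rough sketch of what the imperative version might look like
# (hypothetical; note the catch: it recomputes the scaling statistics on
# whatever data it's handed, so applying it to the test set leaks information,
# which the pipeline's fit/transform split avoids).

# In[ ]:

def prepare(X):
    "Imperative equivalent of the pipeline's transform steps (sketch)"
    X = X[columns].copy()
    X['Trip_Pickup_DateTime'] = X.Trip_Pickup_DateTime.dt.hour
    X = X.assign(Payment_Type=X.Payment_Type.str.lower())
    for col, cats in categories.items():
        X[col] = X[col].astype('category').cat.set_categories(cats)
    X = pd.get_dummies(X)
    # Scaling stats come from whatever `X` is, not from the training set
    X[scale] = (X[scale] - X[scale].mean()) / X[scale].std()
    return X

clf = LogisticRegression().fit(prepare(X_train), y_train)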
# In[22]:

get_ipython().run_line_magic('time', 'pipe.fit(X_train, y_train)')


# In[23]:

pipe.score(X_train, y_train)


# In[25]:

pipe.score(X_test, y_test)


# In[26]:

def mkpipe():
    pipe = make_pipeline(
        ColumnSelector(columns),
        HourExtractor(['Trip_Pickup_DateTime']),
        FunctionTransformer(payment_lowerer, validate=False),
        CategoricalEncoder(categories),
        FunctionTransformer(pd.get_dummies, validate=False),
        StandardScaler(scale),
        LogisticRegression(),
    )
    return pipe


# ## Learning Curve

# In[29]:

from sklearn.model_selection import cross_val_score, train_test_split


# In[32]:

X_train, X_test, y_train, y_test = train_test_split(X, y)


# In[39]:

Ns = [100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000]

scores = []

for n in Ns:
    pipe = mkpipe()
    # Subsample the training split only, so the held-out test set stays untouched
    X_train2, _, y_train2, _ = train_test_split(X_train, y_train, train_size=n)
    print(f"Fitting for {n}")
    pipe.fit(X_train2, y_train2)
    print(f"Scoring for {n}")
    scores.append(cross_val_score(pipe, X_test, y_test))


# In[25]:

len(df) / 10_000_000


# ## Scaling it Out

# In[30]:

import dask.dataframe as dd


# In[31]:

get_ipython().run_cell_magic('time', '', 'df = dd.read_csv("data/*.csv", dtype=dtype,\n                 parse_dates=[\'Trip_Pickup_DateTime\', \'Trip_Dropoff_DateTime\'])\n\nX = df.drop("Tip_Amt", axis=1)\ny = df[\'Tip_Amt\'] > 0\n')


# Since the scikit-learn world isn't really "dask-aware" at the moment, we'll use the `map_partitions` method. This is a good escape hatch for dealing with non-daskified code.

# In[35]:

yhat = X.map_partitions(lambda x: pd.Series(pipe.predict_proba(x)[:, 1], name='yhat'),
                        meta=('yhat', 'f8'))


# In[36]:

get_ipython().run_line_magic('time', 'yhat.to_frame().to_parquet("data/predictions.parq")')


# In[39]:

keys = [
    f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'
    for m in range(1, 13)
]
keys
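# Those keys live in the public `nyc-tlc` bucket, so the same dask code could
# read straight from S3 instead of local files. A minimal sketch, assuming
# `s3fs` is installed and anonymous access to the bucket works:

# In[ ]:

df = dd.read_csv([f"s3://{k}" for k in keys], dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],
                 storage_options={'anon': True})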