#!/usr/bin/env python
# coding: utf-8

# In[4]:


import pandas as pd
import numpy as np
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns


# Script to download the data: `download.py`

# In[ ]:


# %load download.py
"""
Download taxi data from S3 to local
"""
from pathlib import Path
import sys
import argparse

import s3fs
from distributed import Client, wait


def parse_args(args=None):
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('-s', '--scheduler', default=None,
                        help='Scheduler address')
    return parser.parse_args(args)


def fetch(key):
    fs = s3fs.S3FileSystem(anon=True)
    dest = Path('data').joinpath(Path(key).name)
    dest.parent.mkdir(exist_ok=True)
    fs.get(key, str(dest))
    return key


def main(args=None):
    args = parse_args(args)
    client = Client(args.scheduler)

    keys = [
        f'nyc-tlc/trip data/yellow_tripdata_2009-{m:0>2}.csv'
        for m in range(1, 13)
    ]

    results = client.map(fetch, keys)
    wait(results)


if __name__ == '__main__':
    sys.exit(main())


# In[5]:


pd.options.display.max_rows = 10


# In[6]:


get_ipython().run_line_magic('matplotlib', 'inline')


# In[4]:


get_ipython().system('ls -lh data/*.csv')


# Load the first month's data into an in-memory `DataFrame`.

# In[5]:


get_ipython().run_cell_magic('time', '', 'dtype = {\n    \'vendor_name\': \'category\',\n    \'Payment_Type\': \'category\',\n}\n\ndf = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,\n                 parse_dates=[\'Trip_Pickup_DateTime\', \'Trip_Dropoff_DateTime\'])\n')


# In[6]:


df.head()


# Let's predict whether or not the person tips.

# In[7]:


X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0


# The data fit in memory, so the usual scikit-learn workflow applies.

# In[8]:


from sklearn.model_selection import train_test_split


# In[9]:


X_train, X_test, y_train, y_test = train_test_split(X, y)


# In[12]:


len(X_train)


# In[13]:


len(X_test)


# Notice that there are some minor differences in the spelling of the `Payment_Type` values:

# In[10]:


df.Payment_Type.cat.categories


# We'll consolidate those by just lower-casing them:

# In[11]:


df.Payment_Type.str.lower()


# And since we're good sci-kittens, we'll package all this up in a pipeline.
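# Before building the pipeline, here's a minimal sketch of the category
# clean-up it will perform. The spellings below are made up for illustration;
# the real values come from `df.Payment_Type.cat.categories` above.

# In[ ]:


s = pd.Series(['Cash', 'CASH', 'Credit', 'CREDIT'], dtype='category')
print(s.cat.categories)                                  # four distinct spellings
print(s.str.lower().astype('category').cat.categories)   # two after lower-casing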
# In[18]:


from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer


# In[19]:


class ColumnSelector(TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]


class HourExtractor(TransformerMixin):
    "Transform each datetime64 column in `columns` to integer hours"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Bind `col` as a default argument: a bare `lambda x: x[col].dt.hour`
        # would close over the loop variable and extract the last column's
        # hour for every key.
        return X.assign(**{col: (lambda x, col=col: x[col].dt.hour)
                           for col in self.columns})


def payment_lowerer(X):
    """Lowercase all the Payment_Type values"""
    return X.assign(Payment_Type=X.Payment_Type.str.lower())


class CategoricalEncoder(TransformerMixin):
    """Convert to Categorical with specific `categories`"""
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X.copy()  # don't mutate the caller's DataFrame
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X


class StandardScaler(TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X


# In[20]:


# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime', 'Passenger_Count',
           'Trip_Distance', 'Payment_Type', 'Fare_Amt', 'surcharge']

# The mapping of {column: ordered list of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}

scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']

pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    LogisticRegression(),
)
pipe


# In[21]:


pipe.steps


# In[22]:


get_ipython().run_line_magic('time', 'pipe.fit(X_train, y_train)')


# In[23]:


pipe.score(X_train, y_train)


# In[25]:


pipe.score(X_test, y_test)


# In[26]:


def mkpipe():
    "Factory for a fresh, unfit copy of the pipeline"
    pipe = make_pipeline(
        ColumnSelector(columns),
        HourExtractor(['Trip_Pickup_DateTime']),
        FunctionTransformer(payment_lowerer, validate=False),
        CategoricalEncoder(categories),
        FunctionTransformer(pd.get_dummies, validate=False),
        StandardScaler(scale),
        LogisticRegression(),
    )
    return pipe


# ## Scaling it Out

# In[30]:


import dask.dataframe as dd


# In[31]:


get_ipython().run_cell_magic('time', '', 'df = dd.read_csv("data/*.csv", dtype=dtype,\n                 parse_dates=[\'Trip_Pickup_DateTime\', \'Trip_Dropoff_DateTime\'])\n\nX = df.drop("Tip_Amt", axis=1)\ny = df[\'Tip_Amt\'] > 0\n')


# Since the scikit-learn world isn't really dask-aware at the moment, we'll use
# dask's `map_partitions` method. It's a good escape hatch for applying
# non-daskified code to each pandas partition of a dask collection.

# In[35]:


yhat = X.map_partitions(
    lambda x: pd.Series(pipe.predict_proba(x)[:, 1], name='yhat'),
    meta=('yhat', 'f8'),
)


# In[36]:


get_ipython().run_line_magic('time', 'yhat.to_frame().to_parquet("data/predictions.parq")')
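# As a quick sanity check (assuming the parquet file written above), we can
# read the predictions back with dask and summarize them:

# In[ ]:


preds = dd.read_parquet("data/predictions.parq")
preds['yhat'].describe().compute()


# The `mkpipe` factory above also suggests one way to scale the training side:
# fit an independent pipeline per monthly partition with `dask.delayed`. This
# is a sketch under that assumption, not part of the original workflow.

# In[ ]:


import dask
from dask import delayed

# One pandas object per input CSV
xs = X.to_delayed()
ys = y.to_delayed()

# Build one delayed fit per partition; `fit` returns the fitted pipeline
fit_pipes = [delayed(mkpipe)().fit(x, y_) for x, y_ in zip(xs, ys)]

# Trigger the work; `pipes` is a tuple of fitted pipelines, one per month
pipes = dask.compute(*fit_pipes)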