In [1]:
import dask.dataframe as dd
import dask.distributed
import numpy as np
In [2]:
# Start a local dask.distributed cluster; this client registers itself as
# the default, so subsequent dask computations run on it.
client = dask.distributed.Client()
In [4]:
# Inspect the full schema (all column names) before deciding which
# columns are actually needed downstream.

df0 = dd.read_parquet('/data/all_trips.parquet', engine='fastparquet', index='pickup_datetime')
df0.columns
Out[4]:
Index(['dropoff_datetime', 'dropoff_latitude', 'dropoff_longitude',
       'dropoff_taxizone_id', 'ehail_fee', 'extra', 'fare_amount',
       'improvement_surcharge', 'mta_tax', 'passenger_count', 'payment_type',
       'pickup_latitude', 'pickup_longitude', 'pickup_taxizone_id',
       'rate_code_id', 'store_and_fwd_flag', 'tip_amount', 'tolls_amount',
       'total_amount', 'trip_distance', 'trip_type', 'vendor_id', 'trip_id'],
      dtype='object')
In [21]:
# Load only the columns we need -- large speedup over reading everything.
# Coordinates are needed for plotting; total_amount is kept as the
# aggregation column. (Datetimes, zone ids, trip_type and passenger_count
# were considered but deliberately left out.)
wanted_columns = [
    'pickup_longitude', 'pickup_latitude',
    'dropoff_longitude', 'dropoff_latitude',
    'total_amount',
]
df = dd.read_parquet('/data/all_trips_spark.parquet', engine='arrow',
                     columns=wanted_columns)
In [22]:
# Peek at the first rows to sanity-check the selected columns.
df.head()
Out[22]:
pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude total_amount
0 -73.965919 40.771244 -73.949608 40.777058 5.800000
1 -73.997482 40.725952 -74.005936 40.735703 5.400000
2 -73.964798 40.767391 -73.977753 40.773746 5.800000
3 -74.011597 40.708832 -74.013466 40.709358 4.600000
4 -74.000648 40.718578 -73.944580 40.712368 27.799999
In [23]:
# Peek at the last rows; note (output below) that some rows have
# missing (NaN) coordinates -- these are filtered out next.
df.tail()
Out[23]:
pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude total_amount
1432504 NaN NaN NaN NaN 8.300000
1432505 NaN NaN NaN NaN 17.299999
1432506 NaN NaN NaN NaN 41.759998
1432507 NaN NaN NaN NaN 6.300000
1432508 NaN NaN NaN NaN 14.300000
In [24]:
# Select only those points within some reasonable bounds: coordinates must
# be present and within half a degree of central NYC (lat 40.75, lon -73.9).

def _in_nyc_box(lat, lon):
    "Boolean mask: lat/lon are non-null and within 0.5 degrees of NYC center."
    return (lat.notnull() & lon.notnull()
            & ((lat - 40.75).abs() <= 0.5)
            & ((lon + 73.9).abs() <= 0.5))

# Apply the same predicate to pickup and dropoff coordinates in turn.
df = df[_in_nyc_box(df.pickup_latitude, df.pickup_longitude)]
df = df[_in_nyc_box(df.dropoff_latitude, df.dropoff_longitude)]
In [25]:
# We get about 1.27 billion points
# Count surviving rows -- about 1.27 billion points remain after filtering.
df.count().compute()
Out[25]:
pickup_longitude     1268170371
pickup_latitude      1268170371
dropoff_longitude    1268170371
dropoff_latitude     1268170371
total_amount         1268170371
dtype: int64
In [26]:
# Re-check the first rows after filtering (still in degrees here).
df.head()
Out[26]:
pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude total_amount
0 -73.965919 40.771244 -73.949608 40.777058 5.800000
1 -73.997482 40.725952 -74.005936 40.735703 5.400000
2 -73.964798 40.767391 -73.977753 40.773746 5.800000
3 -74.011597 40.708832 -74.013466 40.709358 4.600000
4 -74.000648 40.718578 -73.944580 40.712368 27.799999
In [27]:
def convert_lon(d, latvar):
    """Convert the latitude column ``latvar`` of partition ``d`` (degrees)
    to web-mercator y coordinates (meters).

    Note: despite the name, this converts *latitude* to the mercator y
    coordinate; longitude -> x is a pure linear scaling done elsewhere.

    The original body first made a full copy of the column
    (``k = d[latvar].copy()``) that was immediately overwritten -- a dead
    statement wasting one column copy per partition; it has been removed.
    """
    # y = (R / 180deg) * ln(tan(45deg + lat/2)) expressed in degrees,
    # with 20037508.34 m = half the web-mercator world circumference.
    return (20037508.34 / 180) * (
        np.log(np.tan((90. + d[latvar]) * np.pi / 360)) / (np.pi / 180.)
    )
In [28]:
# Convert lats and lons to web mercator projection
df['pickup_longitude'] = df.pickup_longitude * (20037508.34 / 180)
df['pickup_latitude'] = df.map_partitions(convert_lon, 'pickup_latitude')
df['dropoff_longitude'] = df.dropoff_longitude * (20037508.34 / 180)
df['dropoff_latitude'] = df.map_partitions(convert_lon, 'dropoff_latitude')
In [29]:
# Consolidate partitions for faster plotting
# Consolidate into 200 partitions and materialize to local disk for faster
# plotting; this also executes the filtering/projection graph built above.
df.repartition(npartitions=200).to_parquet('/tmp/filtered.parquet', compression='SNAPPY')
In [30]:
# Read the consolidated data back in
# Read the consolidated data back in, replacing the lazy computation graph
# with a simple read of the materialized file.
df = dd.read_parquet('/tmp/filtered.parquet')
In [31]:
# Subsample the data 
# It's currently commented out, but it's useful 
# when iterating on plot details (axes, ranges, etc.), 
# as it greatly speeds up plot redrawing. 

# df = client.persist(df.sample(frac=0.02))
In [32]:
# Coordinates are now in web-mercator meters (compare with the
# degree-valued heads above).
df.head()
Out[32]:
pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude total_amount
0 -8233848.5 4978660.0 -8232033.0 4979513.0 5.800000
1 -8237362.0 4972004.0 -8238303.0 4973436.5 5.400000
2 -8233724.0 4978093.5 -8235166.0 4979025.5 5.800000
3 -8238933.5 4969490.0 -8239141.5 4969566.0 4.600000
4 -8237714.5 4970922.5 -8231473.0 4970009.0 27.799999
In [61]:
# Plotting imports. (The original cell imported `datashader as ds` twice;
# the duplicate line has been removed.)
import datashader as ds
import datashader.transfer_functions as tf

from datashader.bokeh_ext import InteractiveImage
from functools import partial
from datashader.utils import export_image
from datashader.colors import colormap_select, Greys9, Hot, viridis, inferno
from IPython.core.display import HTML, display
In [62]:
from bokeh.models import BoxZoomTool
from bokeh.plotting import figure, output_notebook, show

# Activate inline bokeh output in the notebook.
output_notebook()

# Set centers, bounds, and ranges in web mercator coords,
# matching the projected coordinates produced above.
x_center = -8234000
y_center = 4973000

x_half_range = 30000
y_half_range = 25000

NYC = x_range, y_range = ((x_center - x_half_range, x_center + x_half_range), 
                          (y_center-y_half_range, y_center+y_half_range))

# plot_width scales (quadratically?) with memory consumption.
# With 32GB RAM, I can set this to 2000, but 2500 crashes with MemoryError.
# I used this setting for high quality, large plots.
# plot_width = 2000

# plot_width of 400 seems to require less than 4GB, and makes the notebook more manageable.
# Also changes aesthetic appearance by decreasing GPS "noise" due to coarse binning
plot_width  = 400 

# Derive height from width so pixel aspect matches the x/y extent ratio.
plot_height = int(plot_width/(x_half_range/y_half_range))

def base_plot(tools='pan,wheel_zoom,reset,save', plot_width=plot_width,
              plot_height=plot_height, **plot_args):
    """Build a bare bokeh figure over the NYC web-mercator extent.

    Axes, grid lines, and borders are stripped so a datashaded image can
    fill the whole canvas; an aspect-preserving box-zoom tool is added.
    Extra keyword args are forwarded to ``figure``.
    """
    fig = figure(tools=tools, plot_width=plot_width, plot_height=plot_height,
                 x_range=x_range, y_range=y_range, outline_line_color=None,
                 min_border=0, min_border_left=0, min_border_right=0,
                 min_border_top=0, min_border_bottom=0, **plot_args)

    # Hide chart furniture: no axes, no grid lines.
    fig.axis.visible = False
    for grid in (fig.xgrid, fig.ygrid):
        grid.grid_line_color = None

    # Keep x/y scaling locked while box-zooming.
    fig.add_tools(BoxZoomTool(match_aspect=True))

    return fig
    
# NOTE(review): `options` is not referenced in any cell shown here --
# confirm it is used elsewhere or remove it.
options = dict(line_color=None, fill_color='blue', size=5)
Loading BokehJS ...

Pickups

In [40]:
# Shading configuration for the pickups plot: export PNGs to ./export on a
# black background; reverse the colormap when the background is black.
background = "black"
export = partial(export_image, export_path="export", background=background)
cm = partial(colormap_select, reverse=(background=="black"))
# NOTE(review): `cm` is not referenced in the cells shown -- confirm it's needed.


def create_image_1(x_range, y_range, w=plot_width, h=plot_height):
    """Datashade pickup locations over the given extent.

    Bins points into a w x h canvas counting non-null total_amount per
    pixel, shades with viridis using histogram equalization, and spreads
    isolated pixels so sparse regions stay visible.
    """
    canvas = ds.Canvas(plot_width=w, plot_height=h,
                       x_range=x_range, y_range=y_range)
    agg = canvas.points(df, 'pickup_longitude', 'pickup_latitude',
                        ds.count('total_amount'))
    shaded = tf.shade(agg, cmap=viridis, how='eq_hist')
    return tf.dynspread(shaded, threshold=0.5, max_px=4)
In [41]:
# Render pickups: save a full-size PNG to ./export and embed an interactive
# (re-rendering on zoom/pan) version in the notebook.
p = base_plot(background_fill_color=background)
export(create_image_1(x_range, y_range, plot_width, plot_height),"pickups_large_wide")
InteractiveImage(p, create_image_1)
Out[41]:

Dropoffs

In [42]:
# Shading configuration for the dropoffs plot (repeats the pickups setup).
background = "black"
export = partial(export_image, export_path="export", background=background)
cm = partial(colormap_select, reverse=(background=="black"))
# NOTE(review): `cm` is not referenced in the cells shown -- confirm it's needed.


def create_image_2(x_range, y_range, w=plot_width, h=plot_height):
    """Datashade dropoff locations over the given extent.

    Bins points into a w x h canvas counting non-null total_amount per
    pixel, shades with inferno using histogram equalization, and spreads
    isolated pixels so sparse regions stay visible.
    """
    canvas = ds.Canvas(plot_width=w, plot_height=h,
                       x_range=x_range, y_range=y_range)
    agg = canvas.points(df, 'dropoff_longitude', 'dropoff_latitude',
                        ds.count('total_amount'))
    shaded = tf.shade(agg, cmap=inferno, how='eq_hist')
    return tf.dynspread(shaded, threshold=0.5, max_px=4)
In [43]:
# Render dropoffs: save a full-size PNG to ./export and embed an interactive
# (re-rendering on zoom/pan) version in the notebook.
p = base_plot(background_fill_color=background)
export(create_image_2(x_range, y_range, plot_width, plot_height),"dropoffs_large_wide")
InteractiveImage(p, create_image_2)
Out[43]: