import dask.dataframe as dd
import dask.distributed
import numpy as np
# Start a local distributed scheduler + workers; all later dask graphs run here.
client = dask.distributed.Client()
# Print column names
# Schema peek only: open the fastparquet copy of the data just to list its columns.
df0 = dd.read_parquet('/data/all_trips.parquet', engine='fastparquet', index='pickup_datetime')
df0.columns
# Load only the columns we need, large speedup.
# NOTE(review): newer dask spells this engine 'pyarrow'; 'arrow' worked as an
# alias in older releases — confirm against the installed dask version.
df = dd.read_parquet('/data/all_trips_spark.parquet', engine='arrow',
columns=[
# 'pickup_datetime',
'pickup_longitude', 'pickup_latitude', #'pickup_taxizone_id',
# 'dropoff_datetime',
'dropoff_longitude', 'dropoff_latitude', #'dropoff_taxizone_id',
# 'trip_type',
# 'passenger_count'
'total_amount'
])
df.head()
df.tail()
#Select only those points within some reasonable bounds (half a degree)
# Drop rows with missing pickup coords and keep pickups inside the box
# lat in [40.25, 41.25], lon in [-74.4, -73.4] (centered on NYC).
df = df[df.pickup_latitude.notnull() & df.pickup_longitude.notnull()
& ((df.pickup_latitude - 40.75).abs() <= 0.5)
& ((df.pickup_longitude + 73.9).abs() <= 0.5)
]
# Same box filter for the dropoff end of each trip.
df = df[df.dropoff_latitude.notnull() & df.dropoff_longitude.notnull()
& ((df.dropoff_latitude - 40.75).abs() <= 0.5)
& ((df.dropoff_longitude + 73.9).abs() <= 0.5)
]
# We get about 1.27 billion points
# .compute() forces evaluation of the whole filter pipeline here.
df.count().compute()
df.head()
def convert_lon(d, latvar):
    """Convert the latitude column *latvar* of *d* to Web Mercator y (meters).

    Parameters
    ----------
    d : DataFrame (or a single dask partition)
        Frame holding a latitude column in degrees.
    latvar : str
        Name of the latitude column to project.

    Returns
    -------
    Series of projected y-coordinates (spherical Web Mercator / EPSG:3857).
    """
    # Fix: the original copied the entire column (`d[latvar].copy()`) and then
    # immediately overwrote the copy — pure wasted work on a large dataset.
    # Standard spherical mercator: y = R * ln(tan(45° + lat/2)), written with
    # the 20037508.34 / 180 meters-per-degree scale factor.
    # NOTE: despite the name, this projects *latitude*; longitude is the simple
    # linear scaling that callers apply inline.
    return (20037508.34 / 180) * (
        np.log(np.tan((90. + d[latvar]) * np.pi / 360)) / (np.pi / 180.)
    )
# Convert lats and lons to web mercator projection
# Longitude is a pure linear scaling in Web Mercator …
df['pickup_longitude'] = df.pickup_longitude * (20037508.34 / 180)
# … while latitude needs the non-linear formula, applied per-partition.
df['pickup_latitude'] = df.map_partitions(convert_lon, 'pickup_latitude')
df['dropoff_longitude'] = df.dropoff_longitude * (20037508.34 / 180)
df['dropoff_latitude'] = df.map_partitions(convert_lon, 'dropoff_latitude')
# Consolidate partitions for faster plotting
# to_parquet executes eagerly; the repartitioned frame is written to disk
# here and then re-read below rather than kept in memory.
df.repartition(npartitions=200).to_parquet('/tmp/filtered.parquet', compression='SNAPPY')
# Read the consolidated data back in
df = dd.read_parquet('/tmp/filtered.parquet')
# Subsample the data
# It's currently commented out, but it's useful
# when iterating on plot details (axes, ranges, etc.),
# as it greatly speeds up plot redrawing.
# df = client.persist(df.sample(frac=0.02))
df.head()
# Plotting stack: datashader for aggregation, bokeh for the interactive figure.
# Fix: `import datashader as ds` was imported twice; duplicate removed.
import datashader as ds
import datashader.transfer_functions as tf
from datashader.bokeh_ext import InteractiveImage
from functools import partial
from datashader.utils import export_image
from datashader.colors import colormap_select, Greys9, Hot, viridis, inferno
from IPython.core.display import HTML, display
from bokeh.models import BoxZoomTool
from bokeh.plotting import figure, output_notebook, show
# Route bokeh output into the notebook instead of standalone HTML files.
output_notebook()
# Viewport in Web Mercator coordinates: a center point plus half-extents.
x_center = -8234000
y_center = 4973000
x_half_range = 30000
y_half_range = 25000
# Bounds as ((xmin, xmax), (ymin, ymax)); NYC aliases the (x_range, y_range) pair.
NYC = x_range, y_range = (
    (x_center - x_half_range, x_center + x_half_range),
    (y_center - y_half_range, y_center + y_half_range),
)
# plot_width scales (quadratically?) with memory consumption.
# With 32GB RAM, 2000 works but 2500 crashes with MemoryError; 2000 was the
# setting used for high quality, large plots.
# plot_width = 2000
# A width of 400 needs under 4GB, keeps the notebook manageable, and the
# coarser binning also smooths out GPS "noise" in the rendered image.
plot_width = 400
# Height follows from the width so the pixel aspect matches the data ranges.
plot_height = int(plot_width / (x_half_range / y_half_range))
def base_plot(tools='pan,wheel_zoom,reset,save',plot_width=plot_width,
              plot_height=plot_height, **plot_args):
    """Build an undecorated bokeh figure spanning the NYC view bounds.

    Axes, grid lines, and borders are all stripped so the datashaded image
    fills the frame edge-to-edge; an aspect-preserving box zoom tool is
    added on top of the requested tools.
    """
    fig = figure(tools=tools, plot_width=plot_width, plot_height=plot_height,
                 x_range=x_range, y_range=y_range, outline_line_color=None,
                 min_border=0, min_border_left=0, min_border_right=0,
                 min_border_top=0, min_border_bottom=0, **plot_args)
    # Remove every bit of chrome around the image.
    fig.axis.visible = False
    fig.xgrid.grid_line_color = None
    fig.ygrid.grid_line_color = None
    # match_aspect keeps x/y scales locked together when box-zooming.
    fig.add_tools(BoxZoomTool(match_aspect=True))
    return fig
# NOTE(review): `options` is never used in the visible portion of this file.
options = dict(line_color=None, fill_color='blue', size=5)
background = "black"
# Pre-bind the export directory and background so export(img, name) is enough.
export = partial(export_image, export_path="export", background=background)
# Reverse colormaps on dark backgrounds so low counts stay visible.
cm = partial(colormap_select, reverse=(background=="black"))
def create_image_1(x_range, y_range, w=plot_width, h=plot_height):
    """Datashade pickup locations for the given view into a viridis image."""
    canvas = ds.Canvas(plot_width=w, plot_height=h,
                       x_range=x_range, y_range=y_range)
    # Bin every pickup point; count('total_amount') counts non-null fares.
    aggregate = canvas.points(df, 'pickup_longitude', 'pickup_latitude',
                              ds.count('total_amount'))
    # eq_hist equalizes the histogram so dense and sparse areas both show detail.
    shaded = tf.shade(aggregate, cmap=viridis, how='eq_hist')
    # Grow isolated pixels up to 4px so sparse regions stay visible.
    return tf.dynspread(shaded, threshold=0.5, max_px=4)
# Render pickups: export a static PNG and attach the interactive zoomable image.
p = base_plot(background_fill_color=background)
export(create_image_1(x_range, y_range, plot_width, plot_height),"pickups_large_wide")
InteractiveImage(p, create_image_1)
# NOTE(review): these three assignments are identical to the ones above the
# pickups plot — redundant here, but harmless (notebook-style repetition).
background = "black"
export = partial(export_image, export_path="export", background=background)
cm = partial(colormap_select, reverse=(background=="black"))
def create_image_2(x_range, y_range, w=plot_width, h=plot_height):
    """Datashade dropoff locations for the given view into an inferno image."""
    canvas = ds.Canvas(plot_width=w, plot_height=h,
                       x_range=x_range, y_range=y_range)
    # Bin every dropoff point; count('total_amount') counts non-null fares.
    aggregate = canvas.points(df, 'dropoff_longitude', 'dropoff_latitude',
                              ds.count('total_amount'))
    # eq_hist equalizes the histogram so dense and sparse areas both show detail.
    shaded = tf.shade(aggregate, cmap=inferno, how='eq_hist')
    # Grow isolated pixels up to 4px so sparse regions stay visible.
    return tf.dynspread(shaded, threshold=0.5, max_px=4)
# Render dropoffs: export a static PNG and attach the interactive zoomable image.
p = base_plot(background_fill_color=background)
export(create_image_2(x_range, y_range, plot_width, plot_height),"dropoffs_large_wide")
InteractiveImage(p, create_image_2)