In [1]:
import codecs, json
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
In [2]:
%%time
# set data file path
parquet_data_folder = '../data/crimes-2017.snappy.parq'
print('Loading crime data from: {}'.format(parquet_data_folder))

# load crimes parquet data into dask df
crimes = dd.read_parquet(parquet_data_folder, index='Date')

# load all data into memory
crimes = crimes.persist()
print('Crime data loaded into memory.')

# log records count and data frame stats
print('Crime data stats:')
print('---------------------------------------')
print('{:,} total records in {} partitions'.format(len(crimes), crimes.npartitions))
print('DataFrame size: {:,}'.format(crimes.size.compute()))
Loading crime data from: ../data/crimes-2017.snappy.parq
Crime data loaded into memory.
Crime data stats:
---------------------------------------
172,030 total records in 1 partitions
DataFrame size: 2,408,420
Wall time: 3.39 s
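As a quick sanity check (not part of the original run), the loaded schema can be inspected before filtering; a minimal sketch with no output shown:

# optional: list the columns and dtypes of the loaded dask data frame
print(crimes.columns.tolist())
print(crimes.dtypes)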
In [3]:
# get crime geo data for mapping homicides
crime_geo = crimes[['PrimaryType',
                    'Block',
                    'Description',
                    'LocationDescription',
                    'CommunityArea',
                    'Arrest',
                    'Domestic',
                    'Latitude', 
                    'Longitude']].dropna()

# get homicides
homicides = crime_geo[(crime_geo['PrimaryType']=='HOMICIDE')].compute()
print('Chicago homicides data preview:')
print('--------------------------------------------------------------------------')
print(homicides.head())
print('...')
print('Total Homicides:', len(homicides))
Chicago homicides data preview:
--------------------------------------------------------------------------
                    PrimaryType                 Block          Description  \
Date                                                                         
2017-01-01 05:19:00    HOMICIDE      046XX N BROADWAY  FIRST DEGREE MURDER   
2017-01-01 06:18:00    HOMICIDE     046XX W MONROE ST  FIRST DEGREE MURDER   
2017-01-02 09:14:00    HOMICIDE    025XX N LOWELL AVE  FIRST DEGREE MURDER   
2017-01-03 12:20:00    HOMICIDE   034XX W FULTON BLVD  FIRST DEGREE MURDER   
2017-01-03 23:52:00    HOMICIDE  032XX W LEXINGTON ST  FIRST DEGREE MURDER   

                    LocationDescription CommunityArea  Arrest  Domestic  \
Date                                                                      
2017-01-01 05:19:00              TAVERN           3.0    True     False   
2017-01-01 06:18:00              STREET          25.0   False     False   
2017-01-02 09:14:00              STREET          20.0    True     False   
2017-01-03 12:20:00              STREET          27.0   False     False   
2017-01-03 23:52:00               ALLEY          27.0   False     False   

                      Latitude  Longitude  
Date                                       
2017-01-01 05:19:00  41.966082 -87.657908  
2017-01-01 06:18:00  41.879291 -87.741599  
2017-01-02 09:14:00  41.926841 -87.735416  
2017-01-03 12:20:00  41.886341 -87.712000  
2017-01-03 23:52:00  41.871868 -87.706610  
...
Total Homicides: 441
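The same filter pattern works for any other crime type; as a hedged sketch (output omitted), a quick breakdown by PrimaryType helps pick what else to map:

# optional: count records per primary crime type
print(crime_geo['PrimaryType'].value_counts().compute().head(10))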
In [4]:
# get homicide coordinates for heatmap data
homicides_geo = homicides[['Latitude', 'Longitude']].values.tolist()
print(homicides_geo[0:5])
[[41.966081546999995, -87.657908498], [41.879290642, -87.74159851299999], [41.926840967, -87.735415625], [41.886340706999995, -87.711999596], [41.871868444, -87.706610311]]
In [5]:
def to_json_file(file_path, data):
    # write compact JSON; use a context manager so the file handle gets closed
    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file, separators=(',', ':'), sort_keys=False, indent=0)
In [6]:
# create homicides json data file for the map
to_json_file('../data/chicago-homicides-2017.json', homicides_geo)
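To confirm the export round-trips cleanly, the JSON file can be read back; a minimal sketch, assuming the same file path as above:

# optional: reload the exported homicide coordinates and compare the count
with codecs.open('../data/chicago-homicides-2017.json', 'r', encoding='utf-8') as json_file:
    saved_coordinates = json.load(json_file)
print(len(saved_coordinates))  # should match the homicide count above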
In [7]:
print('All Crimes:', len(crime_geo))
All Crimes: 165567
In [8]:
%%time
# output all crime coordinates to see how large the raw JSON gets
to_json_file('../data/chicago-crimes-2017.json', 
             crime_geo[['Latitude', 'Longitude']].compute().values.tolist())
Wall time: 4.21 s
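Since the point of this export is to see how large the raw JSON gets, a quick on-disk size check is useful; a sketch (os is not imported in the cells above):

import os
# report the raw JSON file size in megabytes
json_bytes = os.path.getsize('../data/chicago-crimes-2017.json')
print('chicago-crimes-2017.json: {:.1f} MB'.format(json_bytes / 2**20))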
In [9]:
# get crime coordinates (with Date index) as a dask df for the animated crime map replay
crime_coordinates = crime_geo[['Latitude', 'Longitude']]
crime_coordinates
Out[9]:
Dask DataFrame Structure:
Latitude Longitude
npartitions=1
2017-01-01 00:00:00 float64 float64
2017-08-25 23:59:00 ... ...
Dask Name: getitem, 4 tasks
In [10]:
# write the coordinates out as CSV for the demo later
crime_coordinates.to_csv('../data/chicago-crimes-2017-*.csv')
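The generated CSV part files can be read back with dask as a quick check; a sketch, assuming the Date index is written out as a regular column (dask's default):

# optional: read the CSV part files back and parse the Date column
csv_check = dd.read_csv('../data/chicago-crimes-2017-*.csv', parse_dates=['Date'])
print(len(csv_check))  # should match the crime_geo record count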
In [27]:
%%time
# write snappy-compressed parquet for size comparison
crime_coordinates.to_parquet('../data/chicago-crimes-2017.parquet', compression='SNAPPY')
Wall time: 174 ms
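For an actual size comparison, the on-disk footprint of the CSV and parquet outputs can be summed; a sketch using a small hypothetical helper, assuming the output paths used above:

import glob, os

def total_size(paths):
    # hypothetical helper: sum file sizes in bytes over a list of paths
    return sum(os.path.getsize(p) for p in paths)

csv_bytes = total_size(glob.glob('../data/chicago-crimes-2017-*.csv'))
parquet_bytes = total_size(glob.glob('../data/chicago-crimes-2017.parquet/*'))
print('csv: {:,} bytes | snappy parquet: {:,} bytes'.format(csv_bytes, parquet_bytes))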
In [12]:
# create pandas dataframe for conversion to arrow
crime_geo_df = crime_geo[['Latitude', 'Longitude']].compute()
crime_geo_df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 165567 entries, 2017-01-01 00:00:00 to 2017-08-25 23:59:00
Data columns (total 2 columns):
Latitude     165567 non-null float64
Longitude    165567 non-null float64
dtypes: float64(2)
memory usage: 3.8 MB
In [13]:
# convert pandas data frame to arrow table
crime_geo_table = pa.Table.from_pandas(crime_geo_df)
crime_geo_table
Out[13]:
pyarrow.Table
Latitude: double
Longitude: double
Date: timestamp[ns]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "columns": [{"numpy_type": "float64"'
            b', "pandas_type": "float64", "metadata": null, "name": "Latitude"'
            b'}, {"numpy_type": "float64", "pandas_type": "float64", "metadata'
            b'": null, "name": "Longitude"}, {"numpy_type": "datetime64[ns]", '
            b'"pandas_type": "datetime", "metadata": null, "name": "Date"}], "'
            b'pandas_version": "0.20.3"}'}
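The pandas metadata shown above is stored as JSON bytes under the b'pandas' key of the arrow schema metadata; a minimal sketch of decoding it:

# decode the embedded pandas metadata from the arrow schema
pandas_meta = json.loads(crime_geo_table.schema.metadata[b'pandas'].decode('utf-8'))
print(pandas_meta['index_columns'], [c['name'] for c in pandas_meta['columns']])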
In [14]:
%%time
# write arrow table to a single parquet file, just to test it
pq.write_table(crime_geo_table, '../data/chicago-crimes-2017.parq')
Wall time: 73.2 ms
In [15]:
%%time
# read the parquet file created with arrow back with dask as a compatibility check
ddf = dd.read_parquet('../data/chicago-crimes-2017.parq', index='Date')
Wall time: 5.86 ms
In [16]:
print('{:,} total records in {} partitions'.format(len(ddf), ddf.npartitions))
print('DataFrame size: {:,}'.format(ddf.size.compute()))
ddf
165,567 total records in 1 partitions
DataFrame size: 331,134
Out[16]:
Dask DataFrame Structure:
Latitude Longitude
npartitions=1
2017-01-01 00:00:00 float64 float64
2017-08-25 23:59:00 ... ...
Dask Name: read-parquet, 1 tasks
In [17]:
%%time
# read parquet file with arrow
table = pq.read_table('../data/chicago-crimes-2017.parq')
Wall time: 0 ns
In [18]:
table
Out[18]:
pyarrow.Table
Latitude: double
Longitude: double
Date: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "columns": [{"numpy_type": "float64"'
            b', "pandas_type": "float64", "metadata": null, "name": "Latitude"'
            b'}, {"numpy_type": "float64", "pandas_type": "float64", "metadata'
            b'": null, "name": "Longitude"}, {"numpy_type": "datetime64[ns]", '
            b'"pandas_type": "datetime", "metadata": null, "name": "Date"}], "'
            b'pandas_version": "0.20.3"}'}
In [19]:
%%time
# convert it to pandas data frame
df = table.to_pandas()
Wall time: 24.4 ms
In [20]:
df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 165567 entries, 2017-01-01 00:00:00 to 2017-08-25 23:59:00
Data columns (total 2 columns):
Latitude     165567 non-null float64
Longitude    165567 non-null float64
dtypes: float64(2)
memory usage: 3.8 MB
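The parquet round trip can also be compared against the original pandas frame; a minimal sketch (whether it prints True depends on the timestamp precision preserved by the parquet round trip):

# compare the round-tripped frame with the original frame built in [12]
print(df.equals(crime_geo_df))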
In [21]:
%%time
# write the arrow table to disk in the arrow random-access file format
writer = pa.RecordBatchFileWriter('../data/chicago-crimes-2017.arrow', table.schema)
writer.write_table(table)
writer.close()
Wall time: 0 ns
In [22]:
%%time
# read back binary arrow file from disk
reader = pa.RecordBatchFileReader('../data/chicago-crimes-2017.arrow')
read_table = reader.read_all()
Wall time: 4.88 ms
In [23]:
read_table
Out[23]:
pyarrow.Table
Latitude: double
Longitude: double
Date: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "columns": [{"numpy_type": "float64"'
            b', "pandas_type": "float64", "metadata": null, "name": "Latitude"'
            b'}, {"numpy_type": "float64", "pandas_type": "float64", "metadata'
            b'": null, "name": "Longitude"}, {"numpy_type": "datetime64[ns]", '
            b'"pandas_type": "datetime", "metadata": null, "name": "Date"}], "'
            b'pandas_version": "0.20.3"}'}
In [24]:
read_table.num_rows
Out[24]:
165567
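Arrow files like this one can also be memory-mapped for zero-copy reads; a sketch, assuming the same file path:

# memory-map the arrow file and read it back without copying the buffers
with pa.memory_map('../data/chicago-crimes-2017.arrow', 'r') as mmap_source:
    mapped_table = pa.RecordBatchFileReader(mmap_source).read_all()
    print(mapped_table.num_rows)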
In [25]:
# JS clients can't read the Timestamp index, so save coordinates only for now
coordinates_table = pa.Table.from_pandas(df, preserve_index=False)
coordinates_table
Out[25]:
pyarrow.Table
Latitude: double
Longitude: double
metadata
--------
{b'pandas': b'{"index_columns": [], "columns": [{"numpy_type": "float64", "pan'
            b'das_type": "float64", "metadata": null, "name": "Latitude"}, {"n'
            b'umpy_type": "float64", "pandas_type": "float64", "metadata": nul'
            b'l, "name": "Longitude"}], "pandas_version": "0.20.3"}'}
In [26]:
# write coordinates arrow binary data to disk
writer = pa.RecordBatchFileWriter('../data/chicago-crimes-2017-coordinates.arrow', coordinates_table.schema)
writer.write_table(coordinates_table)
writer.close()
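As a final sanity check, the coordinates-only arrow file can be read back to confirm the Date index is gone and to see its size on disk; a minimal sketch:

import os
# reload the coordinates-only arrow file and inspect its schema and size
coordinates_reader = pa.RecordBatchFileReader('../data/chicago-crimes-2017-coordinates.arrow')
print(coordinates_reader.read_all().schema)
print('{:,} bytes on disk'.format(os.path.getsize('../data/chicago-crimes-2017-coordinates.arrow')))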