import codecs, json
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
year = '2018'
data_dir = '../data/' + year + '/'
file_name = 'chicago-crimes-' + year
%%time
# set input data file path
parquet_data_dir = data_dir + 'crimes-' + year + '.snappy.parq'
print('Loading crime data from: {}'.format(parquet_data_dir))
# load crimes parquet data into dask df
crimes = dd.read_parquet(parquet_data_dir, index='Date')
# load all data into memory
crimes = crimes.persist()
print('Crime data loaded into memory.')
# log records count and data frame stats
print('Crime data stats:')
print('---------------------------------------')
print('{:,} total records in {} partitions'.format(len(crimes), crimes.npartitions))
print('DataFrame size: {:,}'.format(crimes.size.compute()))
Loading crime data from: ../data/2018/crimes-2018.snappy.parq
Crime data loaded into memory.
Crime data stats:
---------------------------------------
157,504 total records in 1 partitions
DataFrame size: 2,205,056
Wall time: 610 ms
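As a quick sanity check, the in-memory footprint of the persisted frame can be estimated with Dask's memory_usage (a minimal sketch against the crimes frame above; deep=True also counts string contents):

# estimate the in-memory size of the persisted dask dataframe
mem_bytes = crimes.memory_usage(deep=True).sum().compute()
print('Approximate memory usage: {:,.1f} MB'.format(mem_bytes / 1024 ** 2))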
crimes
| | Block | PrimaryType | FBICode | Description | LocationDescription | CommunityArea | Beat | District | Ward | Arrest | Domestic | Latitude | Longitude | Year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | | | | | | | |
| | object | int8 | int8 | int16 | int8 | int8 | int16 | int8 | int8 | bool | bool | float64 | float64 | int8 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
# get crime geo data for mapping, drop na
crime_geo = crimes[['PrimaryType',
'Block',
'Description',
'LocationDescription',
'CommunityArea',
'Arrest',
'Domestic',
'Latitude',
'Longitude',
'Ward']].dropna()
print('All Crimes:', len(crime_geo))
All Crimes: 156385
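To confirm dropna() left no missing values behind, a per-column null count is cheap to compute (a small sketch against the crime_geo frame):

# count remaining nulls per column; every count should be zero after dropna
print(crime_geo.isnull().sum().compute())
# how many records were dropped for missing values
print('Dropped records:', len(crimes) - len(crime_geo))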
# convert crimes data to json and write it to a file
def to_json_file(file_path, data):
    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file,
                  separators=(',', ':'), sort_keys=False, indent=0)
%%time
# output crimes data in raw json to see how large it gets
geo_data_columns = ['Latitude', 'Longitude', 'Block', 'LocationDescription',
'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward']
to_json_file(data_dir + file_name + '.json',
crime_geo[geo_data_columns].compute().values.tolist())
Wall time: 5.81 s
%%time
# dish it out in snappy parquet for comparison
crime_geo.to_parquet(data_dir + file_name + '.parquet', compression='SNAPPY')
Wall time: 486 ms
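For a rough size comparison of the JSON and parquet outputs, the artifacts can be measured on disk (a sketch; note that Dask's to_parquet writes a directory of part files, so its sizes are summed):

import os

def path_size(path):
    # sum file sizes, handling both a single file and a directory of part files
    if os.path.isdir(path):
        return sum(os.path.getsize(os.path.join(root, f))
                   for root, _, files in os.walk(path) for f in files)
    return os.path.getsize(path)

for ext in ['.json', '.parquet']:
    out_path = data_dir + file_name + ext
    print('{}: {:,.1f} MB'.format(out_path, path_size(out_path) / 1024 ** 2))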
# create pandas dataframe for conversion to arrow
crime_geo_df = crime_geo[geo_data_columns].compute()
crime_geo_df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00
Data columns (total 9 columns):
Latitude               156385 non-null float64
Longitude              156385 non-null float64
Block                  156385 non-null object
LocationDescription    156385 non-null object
PrimaryType            156385 non-null object
Description            156385 non-null object
Arrest                 156385 non-null bool
Domestic               156385 non-null bool
Ward                   156385 non-null float64
dtypes: bool(2), float64(3), object(4)
memory usage: 9.8+ MB
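The '9.8+ MB' reported above excludes the string payloads of the object columns; memory_usage(deep=True) gives the full number (minimal sketch):

# include actual string contents in the memory estimate
deep_bytes = crime_geo_df.memory_usage(deep=True).sum()
print('Deep memory usage: {:,.1f} MB'.format(deep_bytes / 1024 ** 2))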
# convert pandas data frame to arrow table
crime_geo_table = pa.Table.from_pandas(crime_geo_df)
crime_geo_table
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[ns]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "Latitude", "field_name": "Latitude", "pandas_type": "float64", "numpy_type": "float64", "metadata": null}, {"name": "Longitude", "field_name": "Longitude", "pandas_type": "float64", "numpy_type": "float64", "metadata": null}, {"name": "Block", "field_name": "Block", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "LocationDescription", "field_name": "LocationDescription", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "PrimaryType", "field_name": "PrimaryType", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "Description", "field_name": "Description", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "Arrest", "field_name": "Arrest", "pandas_type": "bool", "numpy_type": "bool", "metadata": null}, {"name": "Domestic", "field_name": "Domestic", "pandas_type": "bool", "numpy_type": "bool", "metadata": null}, {"name": "Ward", "field_name": "Ward", "pandas_type": "float64", "numpy_type": "float64", "metadata": null}, {"name": "Date", "field_name": "Date", "pandas_type": "datetime", "numpy_type": "datetime64[ns]", "metadata": null}], "pandas_version": "0.23.0"}'}
%%time
# write arrow table to a single parquet file, just to test it
pq.write_table(crime_geo_table, data_dir + file_name + '.parq')
Wall time: 173 ms
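write_table compresses with snappy by default; other codecs trade write speed for file size (a sketch; the '.gzip.parq' name is just for this comparison):

# gzip compresses harder than snappy but is slower to write and read
pq.write_table(crime_geo_table, data_dir + file_name + '.gzip.parq',
               compression='gzip')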
%%time
# read the parquet file written by arrow back with dask, as a compatibility check
ddf = dd.read_parquet(data_dir + file_name + '.parq', index='Date')
Wall time: 11.7 ms
print('{:,} total records in {} partitions'.format(len(ddf), ddf.npartitions))
print('DataFrame size: {:,}'.format(ddf.size.compute()))
ddf
156,385 total records in 1 partitions
DataFrame size: 1,407,465
| | Latitude | Longitude | Block | LocationDescription | PrimaryType | Description | Arrest | Domestic | Ward |
|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | | |
| | float64 | float64 | object | object | object | object | bool | bool | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... |
%%time
# read parquet file with arrow
table = pq.read_table(data_dir + file_name + '.parq')
Wall time: 75.2 ms
table
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[us]
metadata
--------
{b'pandas': ...}
%%time
# convert it to pandas data frame
df = table.to_pandas()
Wall time: 63.5 ms
df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00
Data columns (total 9 columns):
Latitude               156385 non-null float64
Longitude              156385 non-null float64
Block                  156385 non-null object
LocationDescription    156385 non-null object
PrimaryType            156385 non-null object
Description            156385 non-null object
Arrest                 156385 non-null bool
Domestic               156385 non-null bool
Ward                   156385 non-null float64
dtypes: bool(2), float64(3), object(4)
memory usage: 9.8+ MB
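Note the Date index came back as timestamp[us]: parquet stored the nanosecond timestamps at microsecond precision. A rough round-trip check therefore compares shape and values rather than dtypes (a sketch):

# rough round-trip check against the original pandas frame
assert df.shape == crime_geo_df.shape
assert list(df.columns) == list(crime_geo_df.columns)
print('Values match:', df['PrimaryType'].equals(crime_geo_df['PrimaryType']))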
%%time
# write arrow table to disk in the arrow file (random-access) format
writer = pa.RecordBatchFileWriter(data_dir + file_name + '.arrow', table.schema)
writer.write_table(table)
writer.close()
Wall time: 265 ms
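RecordBatchFileWriter produces the random-access Arrow file format; for socket- or pipe-style delivery pyarrow also has a streaming format writer (a sketch using the same table; the '.arrows' file name is hypothetical):

# write the same table in the arrow streaming format, which readers
# can consume batch by batch without seeking
stream_writer = pa.RecordBatchStreamWriter(data_dir + file_name + '.arrows',
                                           table.schema)
stream_writer.write_table(table)
stream_writer.close()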
%%time
# read back binary arrow file from disk
reader = pa.RecordBatchFileReader(data_dir + file_name + '.arrow')
read_table = reader.read_all()
Wall time: 4.88 ms
read_table
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[us]
metadata
--------
{b'pandas': ...}
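Because the Arrow file format is laid out for zero-copy access, the same file can also be memory-mapped rather than read into heap memory (a sketch, assuming the '.arrow' file written above):

# memory-map the arrow file and materialize the table without copying buffers
mmap = pa.memory_map(data_dir + file_name + '.arrow')
mapped_table = pa.RecordBatchFileReader(mmap).read_all()
print('{:,} rows memory-mapped'.format(mapped_table.num_rows))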