#!/usr/bin/env python
# coding: utf-8

# In[1]:

import codecs
import json
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


# In[2]:

year = '2018'
data_dir = '../data/' + year + '/'
file_name = 'chicago-crimes-' + year


# In[3]:

get_ipython().run_cell_magic('time', '', "# set input data file path\nparquet_data_dir = data_dir + 'crimes-' + year + '.snappy.parq'\nprint('Loading crime data from: {}'.format(parquet_data_dir))\n\n# load crimes parquet data into dask df\ncrimes = dd.read_parquet(parquet_data_dir, index='Date')\n\n# load all data into memory\ncrimes = crimes.persist()\nprint('Crime data loaded into memory.')\n\n# log records count and data frame stats\nprint('Crime data stats:')\nprint('---------------------------------------')\nprint('{:,} total records in {} partitions'.format(len(crimes), crimes.npartitions))\nprint('DataFrame size: {:,}'.format(crimes.size.compute()))\n")


# In[4]:

crimes


# In[5]:

# get crime geo data for mapping, drop rows with missing values
crime_geo = crimes[['PrimaryType', 'Block', 'Description', 'LocationDescription',
                    'CommunityArea', 'Arrest', 'Domestic',
                    'Latitude', 'Longitude', 'Ward']].dropna()
print('All Crimes:', len(crime_geo))


# In[6]:

# writes crimes data to a compact json file
def to_json_file(file_path, data):
    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file,
                  separators=(',', ':'), sort_keys=False, indent=0)


# In[7]:

get_ipython().run_cell_magic('time', '', "# output crimes data in raw json to see how large it gets\ngeo_data_columns = ['Latitude', 'Longitude', 'Block', 'LocationDescription',\n 'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward']\nto_json_file(data_dir + file_name + '.json',\n crime_geo[geo_data_columns].compute().values.tolist())\n")


# In[8]:

get_ipython().run_cell_magic('time', '', "# write it out in snappy parquet for comparison\ncrime_geo.to_parquet(data_dir + file_name + '.parquet', compression='SNAPPY')\n")


# In[9]:

# create pandas dataframe for conversion to arrow
crime_geo_df = crime_geo[geo_data_columns].compute()
crime_geo_df.info()


# In[10]:

# convert pandas data frame to arrow table
crime_geo_table = pa.Table.from_pandas(crime_geo_df)
crime_geo_table


# In[11]:

get_ipython().run_cell_magic('time', '', "# write arrow table to a single parquet file, just to test it\npq.write_table(crime_geo_table, data_dir + file_name + '.parq')\n")


# In[12]:

get_ipython().run_cell_magic('time', '', "# read the parquet file created with arrow back with dask for a compatibility check\nddf = dd.read_parquet(data_dir + file_name + '.parq', index='Date')\n")


# In[13]:

print('{:,} total records in {} partitions'.format(len(ddf), ddf.npartitions))
print('DataFrame size: {:,}'.format(ddf.size.compute()))
ddf


# In[14]:

get_ipython().run_cell_magic('time', '', "# read parquet file with arrow\ntable = pq.read_table(data_dir + file_name + '.parq')\n")


# In[15]:

table


# In[16]:

get_ipython().run_cell_magic('time', '', '# convert the arrow table to a pandas data frame\ndf = table.to_pandas()\n')


# In[17]:

df.info()


# In[18]:

get_ipython().run_cell_magic('time', '', "# write arrow table to disk in the arrow IPC file format\nwriter = pa.RecordBatchFileWriter(data_dir + file_name + '.arrow', table.schema)\nwriter.write_table(table)\nwriter.close()\n")


# In[19]:

get_ipython().run_cell_magic('time', '', "# read back binary arrow file from disk\nreader = pa.RecordBatchFileReader(data_dir + file_name + '.arrow')\nread_table = reader.read_all()\n")


# In[20]:

read_table
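
# In[21]:

# rough on-disk size comparison of the single-file outputs written above;
# a quick sketch that assumes the .json, .parq and .arrow cells have already run
# so the files exist at these paths (the dask .parquet output is a directory of
# part files and is skipped here)
import os

for ext in ['.json', '.parq', '.arrow']:
    out_path = data_dir + file_name + ext
    size_mb = os.path.getsize(out_path) / (1024 * 1024)
    print('{}: {:,.1f} MB'.format(out_path, size_mb))


# In[22]:

# sketch of a zero-copy load: memory-map the arrow IPC file written above and
# read it back without copying the buffers (pa.memory_map and
# RecordBatchFileReader are standard pyarrow APIs; the path matches the .arrow
# file from the writer cell)
mmap_source = pa.memory_map(data_dir + file_name + '.arrow', 'r')
mmap_table = pa.RecordBatchFileReader(mmap_source).read_all()
print('{:,} rows read via memory map'.format(mmap_table.num_rows))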