#!/usr/bin/env python # coding: utf-8 # # # # #
# Household data: Processing Notebook # #
This Notebook is part of the Household Data Package of Open Power System Data. #
# # Table of Contents # * [1. Introductory Notes](#1.-Introductory-Notes) # * [2. Settings](#2.-Settings) # * [2.1 Set version number and recent changes](#2.1-Set-version-number-and-recent-changes) # * [2.2 Import Python libraries](#2.2-Import-Python-libraries) # * [2.3 Set directories](#2.3-Set-directories) # * [2.4 Set up a log](#2.4-Set-up-a-log) # * [2.5 Select timerange](#2.5-Select-timerange) # * [2.6 Select download source](#2.6-Select-download-source) # * [2.7 Select household subset](#2.7-Select-household-subset) # * [3. Download](#3.-Download) # * [4. Read](#4.-Read) # * [4.1 Preparations](#4.1-Preparations) # * [4.2 Reading loop](#4.2-Reading-loop) # * [4.3 Save raw data](#4.3-Save-raw-data) # * [5. Processing](#5.-Processing) # * [5.1 Validate Data](#5.1-Validate-Data) # * [5.2 Aggregate Index equidistantly](#5.2-Aggregate-Index-equidistantly) # * [5.3 Missing Data Handling](#5.3-Missing-Data-Handling) # * [5.4 Aggregate hourly and 15min interval data](#5.4-Aggregate-hourly-and-15min-interval-data) # * [5.5 Insert a column with Central European Time](#5.5-Insert-a-column-with-Central-European-Time) # * [6. Create metadata](#6.-Create-metadata) # * [7. Write data to disk](#7.-Write-data-to-disk) # * [7.1 Limit time range](#7.1-Limit-time-range) # * [7.2 Convert the Central European Time column to ISO-8601](#7.2-Convert-the-Central-European-Time-column-to-ISO-8601) # * [7.3 Different shapes](#7.3-Different-shapes) # * [7.4 Write to SQL-database](#7.4-Write-to-SQL-database) # * [7.5 Write to Excel](#7.5-Write-to-Excel) # * [7.6 Write to CSV](#7.6-Write-to-CSV) # * [7.7 Write checksums.txt](#7.7-Write-checksums.txt) # # 1. Introductory Notes # # This Notebook handles missing data, performs calculations and aggragations and creates the output files. # # # 2. Settings # # ## 2.1 Set version number and recent changes # Executing this script till the end will create a new version of the data package. # The Version number specifies the local directory for the data
# We include a note on what has been changed. # In[ ]: version = '2017-11-10' changes = 'Initial release of all CoSSMic households' # ## 2.2 Import Python libraries # In[ ]: # Python modules from datetime import datetime, date, timedelta, time import pandas as pd import numpy as np import logging import json import sqlite3 import yaml import itertools import os import pytz import hashlib from shutil import copyfile # Scripts from household repository package from household.download import download from household.read import read from household.validation import validate from household.imputation import make_equidistant from household.imputation import find_nan from household.imputation import resample_markers from household.make_json import make_json # Reload modules with execution of any code, to avoid having to restart # the kernel after editing timeseries_scripts get_ipython().run_line_magic('load_ext', 'autoreload') get_ipython().run_line_magic('autoreload', '2') # ## 2.3 Set directories # In[ ]: # make sure the working directory is this file's directory try: os.chdir(home_path) except NameError: home_path = os.path.realpath('.') households_yaml_path = os.path.join(home_path, 'input', 'households.yml') data_path = os.path.join(home_path, 'household_data', version, 'original_data') out_path = os.path.join(home_path, 'household_data', version) temp_path = os.path.join(home_path, 'household_data', 'temp') os.makedirs(out_path, exist_ok=True) os.makedirs(temp_path, exist_ok=True) # change to temp directory os.chdir(temp_path) os.getcwd() # ## 2.4 Set up a log # In[ ]: FORMAT = '%(asctime)s %(levelname)s %(message)s' DATEFORMAT = '%Y-%m-%d %H:%M:%S' formatter = logging.Formatter(fmt=FORMAT, datefmt=DATEFORMAT) logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt=DATEFORMAT) logfile = logging.FileHandler('log1.log') logfile.setFormatter(formatter) logfile.setLevel(logging.INFO) logger = logging.getLogger(__name__) logger.addHandler(logfile) # For more detailed logging messages, replace 'INFO' with 'DEBUG' # (May slow down computation). logger.setLevel(logging.INFO) # Print out additional CSV files to verify feed integrity verbose = False # ## 2.5 Select timerange # # Select the time range to read and process data.
# *Default: all data.* # # Type `None` to process all available data. # In[ ]: start_from_user = None # i.e. date(2016, 1, 1) end_from_user = None # i.e. date(2016, 1, 31) # ## 2.6 Select download source # # The raw data can be downloaded as a zip file from the OPSD Server. To do this, specify an archive version to use, that has been cached on the OPSD server as input. # In[ ]: archive_version = '2017-11-10' # i.e. '2017-11-10' # ## 2.7 Select household subset # # Optionally, specify a subset of households to process.
# The next cell prints the available sources and datasets. # In[ ]: with open(households_yaml_path, 'r') as f: households = yaml.load(f.read()) for k, v in households.items(): print(yaml.dump({k: list(v['feeds'].keys())}, default_flow_style=False)) # Copy from its output and paste to following cell to get the right format. # # Type `subset = None` to include all data. # In[ ]: subset = yaml.load(''' insert_household_here: - insert_feed1_here - insert_feed2_here more_households_here: - more_feeds_here ''') subset = None # Now eliminate households and feeds not in subset. # In[ ]: if subset: # eliminate households and feeds not in subset households = {household_name: {k: v for k, v in households[household_name].items() for k, v in households[household_name]['feeds'].items() if k in feed_list} for household_name, feed_list in subset.items()} # # 3. Download # # This section: download raw data to process. # # If the original data does not exist, it will be downloaded from the OPSD Server and extracted in a local directory # In[ ]: download(out_path, version=archive_version) # # 4. Read # # This section: Read each downloaded file into a pandas-DataFrame and merge data from different sources if it has the same time resolution. # # ## 4.1 Preparations # Set the title of the rows at the top of the data used to store metadata internally. The order of this list determines the order of the levels in the resulting output. # In[ ]: headers = ['region', 'household', 'type', 'unit', 'feed'] # Create an empty dictionary to be populated by the read function # In[ ]: data_households = {} # ## 4.2 Reading loop # # Loop through households and feeds to do the reading # In[ ]: # For each source in the household dictionary for household_name, household_dict in households.items(): df = read(household_name, household_dict['dir'], household_dict['region'], household_dict['type'], household_dict['feeds'], headers, start_from_user=start_from_user, end_from_user=end_from_user) data_households[household_name] = df # ## 4.3 Save raw data # # Save the DataFrames created by the read function to disk. This way you have the raw data to fall back to if something goes wrong in the remainder of this notebook without having to repeat the previos steps. # In[ ]: for household_name, household_dict in households.items(): data_households[household_name].to_pickle('raw_'+household_dict['dir']+'.pickle') # Load the DataFrames saved above # In[ ]: data_households = {} for household_name, household_dict in households.items(): data_households[household_name] = pd.read_pickle('raw_'+household_dict['dir']+'.pickle') # # 5. Processing # # This section: validate energy data and fix wrongly measured data points. Handle missing data and aggregation to regular and 15 minute intervals. # # ## 5.1 Validate Data # # With the raw data being the unvalidated measurements of a research prototype under development, certain steps should be taken, to remove clearly incorrect measured data points or react to other events, such as the counter reset of an energy meter. # In[ ]: for household_name, household_dict in households.items(): data_households[household_name] = validate(data_households[household_name], household_name, household_dict['feeds'], headers, output=verbose) # ## 5.2 Aggregate Index equidistantly # # Most of the acquired household data comes in irregular time intervals and is highly unsynchronized for different data feeds. # # To improve readability and comparability, regular intervals will be assumed and interpolated according to existing values. Gaps greater than 15 minutes will be left untouched. # In[ ]: data_sets = {} for household_name, household_dict in households.items(): for res_dict in household_dict['resolutions']: res_key = res_dict['key'] df = make_equidistant(data_households[household_name], household_name, res_key, res_dict.get('seconds'), res_dict.get('start'), res_dict.get('end'), household_dict['feeds']) if not res_key in data_sets: data_sets[res_key] = df elif not df.empty: data_sets[res_key] = ( data_sets[res_key] ).combine_first(df) # Save/Load the equidistant data sets # In[ ]: for res_key, df in data_sets.items(): df.to_pickle('fixed_'+res_key+'.pickle') # In[ ]: data_sets = {} data_sets['1min'] = pd.read_pickle('fixed_1min.pickle') data_sets['3min'] = pd.read_pickle('fixed_3min.pickle') # ## 5.3 Missing Data Handling # # Patch missing data. At this stage, only small gaps (up to 2 hours) are filled by linear interpolation. This catched most of the missing data due to daylight savings time transitions or failed radio transmissions, while leaving bigger gaps untouched. # # The exact locations of missing data are stored in the `nan_table` DataFrames. # # Where data has been interpolated, it is marked in a new column `comment`. For eaxample the comment `residential_004_pv;` means that in the original data, there is a gap in the solar generation timeseries from the Resident 4 in the time period where the marker appears. # # Patch the datasets and display the location of missing Data in the original data. # In[ ]: nan_sets = {} for res_key, df in data_sets.items(): data_sets[res_key], nan_table = find_nan(df, headers, res_key, patch=True) nan_sets[res_key] = nan_table # Execute this to see an example of where the data has been patched. # In[ ]: data_sets['3min'][data_sets['3min']['interpolated_values'].notnull()].tail() # In[ ]: data_sets['1min'][data_sets['1min']['interpolated_values'].notnull()].tail() # Display the table of regions of missing values # In[ ]: nan_sets # You can export the NaN-tables to Excel in order to inspect where there are NaNs # In[ ]: writer = pd.ExcelWriter('NaN_table.xlsx') for res_key, df in nan_sets.items(): df.to_excel(writer, res_key) writer.save() # Save/Load the patched data sets # In[ ]: for res_key, df in data_sets.items(): df.to_pickle('patched_'+res_key+'.pickle') # In[ ]: data_sets = {} data_sets['1min'] = pd.read_pickle('patched_1min.pickle') data_sets['3min'] = pd.read_pickle('patched_3min.pickle') # ## 5.4 Aggregate hourly and 15min interval data # # As some data comes in 1-minute, other (older series) in 3-minute intervals, a harmonization can be done. With most billing intervals being at 15-minute and 60-minute ranges, a resampling to those intervals can be done to improve the overview over the data. # # The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved. # # To do this, first combine the patched resolutions again # In[ ]: data_full = None for res_key, df in data_sets.items(): if data_full is None: data_full = df elif not df.empty: data_full = data_full.combine_first(df) # Afterwards, condense the marker column to both 15 and 60 minutes resolution, resample the series and update the condensed markers # In[ ]: minute = data_full.index[0].minute + (15 - data_full.index[0].minute % 15) hour = data_full.index[0].hour if(minute > 59): minute = 0 hour += 1 start_15 = data_full.index[0].replace(hour=hour, minute=minute, second=0) minute = data_full.index[-1].minute + (15 - data_full.index[-1].minute % 15) hour = data_full.index[-1].hour if(minute > 59): minute = 0 hour += 1 end_15 = data_full.index[-1].replace(hour=hour, minute=minute, second=0) index_15 = pd.DatetimeIndex(start=start_15, end=end_15, freq='15min') marker_15 = data_full['interpolated_values'].groupby( pd.Grouper(freq='15min', closed='left', label='left') ).agg(resample_markers).reindex(index_15) data_sets['15min'] = data_full.resample('15min').last() data_sets['15min']['interpolated_values'] = marker_15 # In[ ]: start_60 = data_full.index[0].replace(minute=0, second=0) end_60 = data_full.index[-1].replace(minute=0, second=0) index_60 = pd.DatetimeIndex(start=start_60, end=end_60, freq='60min') marker_60 = data_full['interpolated_values'].groupby( pd.Grouper(freq='60min', closed='left', label='left') ).agg(resample_markers).reindex(index_60) data_sets['60min'] = data_full.resample('60min').last() data_sets['60min']['interpolated_values'] = marker_60 # ## 5.5 Insert a column with Central European Time # # The index column of th data sets defines the start of the timeperiod represented by each row of that data set in **UTC** time. We include an additional column for the **CE(S)T** Central European (Summer-) Time, as this might help aligning the output data with other data sources. # In[ ]: info_cols = {'utc': 'utc_timestamp', 'cet': 'cet_cest_timestamp', 'marker': 'interpolated_values'} # In[ ]: for res_key, df in data_sets.items(): if df.empty: continue df.index.rename(info_cols['utc'], inplace=True) df.insert(0, info_cols['cet'], df.index.tz_localize('UTC').tz_convert('Europe/Brussels')) # Create a final savepoint # In[ ]: for res_key, df in data_sets.items(): df.to_pickle('final_'+res_key+'.pickle') # In[ ]: data_sets = {} data_sets['1min'] = pd.read_pickle('final_1min.pickle') data_sets['3min'] = pd.read_pickle('final_3min.pickle') data_sets['15min'] = pd.read_pickle('final_15min.pickle') data_sets['60min'] = pd.read_pickle('final_60min.pickle') # ## 6. Create metadata # # This section: create the metadata, both general and column-specific. All metadata we be stored as a JSON file. # In[ ]: # change to out_path directory os.chdir(out_path) os.getcwd() # In[ ]: make_json(data_sets, info_cols, version, changes, headers) # # 7. Write data to disk # # This section: Save as [Data Package](http://data.okfn.org/doc/tabular-data-package) (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run. # # ## 7.1 Limit time range # # Cut off the data outside of [start_from_user:end_from_user] # In[ ]: for res_key, df in data_sets.items(): # First, convert userinput to UTC time to conform with data_set.index if start_from_user: start_from_user = ( pytz.timezone('Europe/Brussels') .localize(datetime.combine(start_from_user, time())) .astimezone(pytz.timezone('UTC'))) if end_from_user: end_from_user = ( pytz.timezone('Europe/Brussels') .localize(datetime.combine(end_from_user, time())) .astimezone(pytz.timezone('UTC')) # appropriate offset to inlude the end of period + timedelta(days=1, minutes=-int(res_key[:2]))) # Then cut off the data_set data_sets[res_key] = df.loc[start_from_user:end_from_user, :] # ## 7.2 Convert the Central European Time column to ISO-8601 # # Convert the UTC, as well as the Central European Time indexes to ISO-8601 # In[ ]: for res_key, df in data_sets.items(): if df.empty: continue df.iloc[:,0] = df.iloc[:,0].dt.strftime('%Y-%m-%dT%H:%M:%S%z') # ## 7.3 Different shapes # # Data are provided in three different "shapes": # - SingleIndex (easy to read for humans, compatible with datapackage standard, small file size) # - Fileformat: CSV, SQLite # - MultiIndex (easy to read into GAMS, not compatible with datapackage standard, small file size) # - Fileformat: CSV, Excel # - Stacked (compatible with data package standard, large file size, many rows, too many for Excel) # - Fileformat: CSV # # The different shapes need to be created internally befor they can be saved to files. Takes about 1 minute to run. # In[ ]: data_sets_singleindex = {} data_sets_multiindex = {} data_sets_stacked = {} for res_key, df in data_sets.items(): if df.empty: continue # MultIndex data_sets_multiindex[res_key + '_multiindex'] = df # SingleIndex df_singleindex = df.copy() # use first 3 levels of multiindex to create singleindex df_singleindex.columns = [ col[0] if col[0] in info_cols.values() else next(iter([l for l in col if l in df.columns.get_level_values('region')] or []), None) + \ '_' + next(iter([l for l in col if l in df.columns.get_level_values('household')] or []), None) + \ '_' + next(iter([l for l in col if l in df.columns.get_level_values('feed')] or []), None) for col in df.columns.values] data_sets_singleindex[res_key + '_singleindex'] = df_singleindex # Stacked stacked = df.copy() stacked.drop(info_cols['cet'], axis=1, inplace=True) stacked.columns = stacked.columns.droplevel(['region', 'type', 'unit']) stacked = stacked.transpose().stack(dropna=True).to_frame(name='data') data_sets_stacked[res_key + '_stacked'] = stacked # ## 7.4 Write to SQL-database # # This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete. # In[ ]: for res_key, df in data_sets_singleindex.items(): if df.empty: continue df_sql = df.copy() df_sql.index = df_sql.index.strftime('%Y-%m-%dT%H:%M:%SZ') table = 'household_data_' + res_key df_sql.to_sql(table, sqlite3.connect('household_data.sqlite'), if_exists='replace', index_label=info_cols['utc']) # ## 7.5 Write to Excel # # Writing the full tables to Excel takes extremely long. As a workaround, only the first 5 rows are exported. The rest of the data can than be inserted manually from the `_multindex.csv` files. # In[ ]: writer = pd.ExcelWriter('household_data.xlsx') for res_key, df in data_sets_multiindex.items(): if df.empty: continue df_csv = df.copy() df_csv.index = df_csv.index.strftime('%Y-%m-%dT%H:%M:%SZ') df_csv.to_excel(writer, res_key.split('_')[0], float_format='%.3f', merge_cells=True) writer.save() # ## 7.6 Write to CSV # In[ ]: # itertoools.chain() allows iterating over multiple dicts at once for res_key, df in itertools.chain( data_sets_singleindex.items(), data_sets_multiindex.items(), data_sets_stacked.items() ): if df.empty: continue filename = 'household_data_' + res_key + '.csv' df.to_csv(filename, float_format='%.3f', date_format='%Y-%m-%dT%H:%M:%SZ') # In[ ]: if verbose: from household.tools import derive_power for res_key, df in data_sets_singleindex.items(): df_power = df.copy() for col_name, col in df_power.iteritems(): if not (col_name in info_cols.values()): feed_power = derive_power(pd.DataFrame(col)) df_power[col_name] = feed_power.iloc[:,0] filename = os.path.join(temp_path, 'household_data_' + res_key.split('_')[0] + '_power.csv') df_power.to_csv(filename, float_format='%.3f', date_format='%Y-%m-%dT%H:%M:%SZ') # ## 7.7 Write checksums.txt # # We publish SHA-checksums for the outputfiles on GitHub to allow verifying the integrity of outputfiles on the OPSD server. # In[ ]: def get_sha_hash(path, blocksize=65536): sha_hasher = hashlib.sha256() with open(path, 'rb') as f: buffer = f.read(blocksize) while len(buffer) > 0: sha_hasher.update(buffer) buffer = f.read(blocksize) return sha_hasher.hexdigest() files = os.listdir(out_path) # Create checksums.txt in the output directory with open('checksums.txt', 'w') as f: for file_name in files: if file_name.split('.')[-1] in ['csv', 'sqlite', 'xlsx']: file_hash = get_sha_hash(file_name) f.write('{},{}\n'.format(file_name, file_hash)) # Copy the file to root directory from where it will be pushed to GitHub, # leaving a copy in the version directory for reference copyfile('checksums.txt', os.path.join(home_path, 'checksums.txt'))