Household data: Processing Notebook
This Notebook is part of the Household Data Package of Open Power System Data.
This Notebook handles missing data, performs calculations and aggregations and creates the output files.
Executing this script till the end will create a new version of the data package.
The Version number specifies the local directory for the data
We include a note on what has been changed.
# Version number: determines the local directory for this build of the
# data package
version = '2017-08-01'
# Change log entry describing what changed in this version
changes = '''Initial upload'''
# Python modules
from datetime import datetime, date, timedelta, time
import pandas as pd
import numpy as np
import logging
import json
import sqlite3
import yaml
import itertools
import os
import pytz
import hashlib
from shutil import copyfile
# Scripts from household repository package
from household.download import download
from household.read import read
from household.validation import validate
from household.imputation import make_equidistant
from household.imputation import find_nan
from household.imputation import resample_markers
from household.make_json import make_json
# Reload modules with execution of any code, to avoid having to restart
# the kernel after editing timeseries_scripts
%load_ext autoreload
%autoreload 2
# make sure the working directory is this file's directory
try:
os.chdir(home_path)
except NameError:
home_path = os.path.realpath('.')
households_yaml_path = os.path.join(home_path, 'input', 'households.yml')
data_path = os.path.join(home_path, 'household_data', version, 'original_data')
out_path = os.path.join(home_path, 'household_data', version)
temp_path = os.path.join(home_path, 'household_data', 'temp')
os.makedirs(out_path, exist_ok=True)
os.makedirs(temp_path, exist_ok=True)
# change to temp directory
os.chdir(temp_path)
os.getcwd()
# Log configuration: console via basicConfig plus a file handler that
# mirrors all messages into 'log1.log'.
FORMAT = '%(asctime)s %(levelname)s %(message)s'
DATEFORMAT = '%Y-%m-%d %H:%M:%S'

logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt=DATEFORMAT)

formatter = logging.Formatter(fmt=FORMAT, datefmt=DATEFORMAT)
logfile = logging.FileHandler('log1.log')
logfile.setLevel(logging.INFO)
logfile.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logfile)
# For more detailed logging messages, replace 'INFO' with 'DEBUG'
# (May slow down computation).
Select the time range to read and process data.
Default: all data.
Type None
to process all available data.
# Time range to read and process; None on either side means unbounded
start_from_user = None  # i.e. date(2016, 1, 1)
end_from_user = None  # i.e. date(2016, 1, 31)
The raw data can be downloaded as a zip file from the OPSD Server. To do this, specify an archive version to use, that has been cached on the OPSD server as input.
archive_version = '2017-06-30' # i.e. '2017-06-30'
Optionally, specify a subset of households to process.
The next cell prints the available sources and datasets.
# Load the household configuration and print the available feeds per
# household. safe_load is used instead of yaml.load: it only constructs
# plain Python objects and is the recommended loader for config files
# (yaml.load without an explicit Loader is deprecated and can execute
# arbitrary constructors).
with open(households_yaml_path, 'r') as f:
    households = yaml.safe_load(f)
for k, v in households.items():
    print(yaml.dump({k: list(v['feeds'].keys())}, default_flow_style=False))
Copy from its output and paste to following cell to get the right format.
Type subset = None
to include all data.
# Optional subset of households/feeds to process; the template below
# shows the expected format. safe_load replaces the deprecated and
# unsafe bare yaml.load (no arbitrary object construction).
subset = yaml.safe_load('''
insert_household_here:
- insert_feed1_here
- insert_feed2_here
more_households_here:
- more_feeds_here
''')
# Default: process everything (overrides the template above)
subset = None
Now eliminate households and feeds not in subset.
if subset:  # eliminate households and feeds not in subset
    # Keep every metadata key of the household entry ('dir', 'region',
    # 'type', ...) — later cells read them — and replace only 'feeds'
    # with the filtered feed dict. (The previous nested comprehension
    # re-bound k, v in two for-clauses and thereby dropped the household
    # metadata entirely, breaking household_dict['dir'] lookups below.)
    households = {
        household_name: dict(
            households[household_name],
            feeds={feed_name: feed
                   for feed_name, feed
                   in households[household_name]['feeds'].items()
                   if feed_name in feed_list})
        for household_name, feed_list in subset.items()}
This section: download raw data to process.
If the original data does not exist, it will be downloaded from the OPSD Server and extracted in a local directory
download(out_path, version=archive_version)
This section: Read each downloaded file into a pandas-DataFrame and merge data from different sources if it has the same time resolution.
Set the title of the rows at the top of the data used to store metadata internally. The order of this list determines the order of the levels in the resulting output.
headers = ['region', 'household', 'type', 'unit', 'feed']
Create an empty dictionary to be populated by the read function
data_households = {}
Loop through households and feeds to do the reading
# Read every household's downloaded files into one DataFrame each
for household_name, household_dict in households.items():
    data_households[household_name] = read(
        household_name, household_dict['dir'], household_dict['region'],
        household_dict['type'], household_dict['feeds'], headers,
        start_from_user=start_from_user,
        end_from_user=end_from_user)
Save the DataFrames created by the read function to disk. This way you have the raw data to fall back to if something goes wrong in the remainder of this notebook without having to repeat the previous steps.
# Savepoint: persist each household's raw DataFrame to disk
for household_name, household_dict in households.items():
    raw_pickle = 'raw_' + household_dict['dir'] + '.pickle'
    data_households[household_name].to_pickle(raw_pickle)
Load the DataFrames saved above
# Restore the raw DataFrames saved by the previous cell
data_households = {
    household_name: pd.read_pickle('raw_' + household_dict['dir'] + '.pickle')
    for household_name, household_dict in households.items()}
This section: validate energy data and fix wrongly measured data points. Handle missing data and aggregation to regular and 15 minute intervals.
With the raw data being the unvalidated measurements of a research prototype under development, certain steps should be taken, to remove clearly incorrect measured data points or react to other events, such as the counter reset of an energy meter.
# Run the validation step over every household's measurements
for household_name, household_dict in households.items():
    validated = validate(data_households[household_name], household_name,
                         household_dict['feeds'], headers)
    data_households[household_name] = validated
Most of the acquired household data comes in irregular time intervals and is highly unsynchronized for different data feeds.
To improve readability and comparability, regular intervals will be assumed and interpolated according to existing values. Gaps greater than 15 minutes will be left untouched.
# Convert every household/resolution to an equidistant index and merge
# all households of the same resolution into one DataFrame per res_key.
data_sets = {}
for household_name, household_dict in households.items():
    for resolution in household_dict['resolutions']:
        res_key = resolution['key']
        equidistant = make_equidistant(data_households[household_name],
                                       household_name, res_key,
                                       resolution.get('seconds'),
                                       resolution.get('start'),
                                       resolution.get('end'),
                                       household_dict['feeds'])
        existing = data_sets.get(res_key)
        if existing is None:
            data_sets[res_key] = equidistant
        elif not equidistant.empty:
            data_sets[res_key] = existing.combine_first(equidistant)
Save/Load the equidistant data sets
# Savepoint: persist the equidistant data sets ...
for res_key, df in data_sets.items():
    df.to_pickle('fixed_' + res_key + '.pickle')

# ... and read them back from disk
data_sets = {res: pd.read_pickle('fixed_' + res + '.pickle')
             for res in ('1min', '3min')}
Patch missing data. At this stage, only small gaps (up to 2 hours) are filled by linear interpolation. This catches most of the missing data due to daylight savings time transitions or failed radio transmissions, while leaving bigger gaps untouched.
The exact locations of missing data are stored in the nan_table
DataFrames.
Where data has been interpolated, it is marked in a new column comment
. For example the comment residential_004_pv;
means that in the original data, there is a gap in the solar generation timeseries of residential household 4 in the time period where the marker appears.
Patch the datasets and display the location of missing Data in the original data.
# Interpolate small gaps in place and collect the per-resolution tables
# that record where data was originally missing
nan_sets = {}
for res_key, df in data_sets.items():
    patched, nan_table = find_nan(df, headers, res_key, patch=True)
    data_sets[res_key] = patched
    nan_sets[res_key] = nan_table
Execute this to see an example of where the data has been patched.
# Show the most recent rows whose values were patched (rows where the
# 'interpolated_values' marker column is set)
data_sets['3min'][data_sets['3min']['interpolated_values'].notnull()].tail()
data_sets['1min'][data_sets['1min']['interpolated_values'].notnull()].tail()
Display the table of regions of missing values
nan_sets
You can export the NaN-tables to Excel in order to inspect where there are NaNs
# Export the NaN tables to Excel, one sheet per resolution. Using the
# writer as a context manager guarantees the workbook is saved and the
# file handle released when the block ends.
with pd.ExcelWriter('NaN_table.xlsx') as writer:
    for res_key, df in nan_sets.items():
        df.to_excel(writer, res_key)
Save/Load the patched data sets
# Savepoint: persist the patched data sets ...
for res_key, df in data_sets.items():
    df.to_pickle('patched_' + res_key + '.pickle')

# ... and read them back from disk
data_sets = {res: pd.read_pickle('patched_' + res + '.pickle')
             for res in ('1min', '3min')}
As some data comes in 1-minute, other (older series) in 3-minute intervals, a harmonization can be done. With most billing intervals being at 15-minute and 60-minute ranges, a resampling to those intervals can be done to improve the overview over the data.
The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved.
To do this, first combine the patched resolutions again
# Merge the patched resolutions back into a single DataFrame
data_full = None
for df in data_sets.values():
    if data_full is None:
        data_full = df
    elif not df.empty:
        data_full = data_full.combine_first(df)
Afterwards, condense the marker column to both 15 and 60 minutes resolution, resample the series and update the condensed markers
# Resample to 15-minute intervals. Timestamp.ceil/floor replace the
# manual minute arithmetic, which crashed for first timestamps with
# minute >= 45 (minute + (15 - minute % 15) could reach 60, which
# datetime.replace rejects). ceil also keeps an already-aligned first
# timestamp instead of skipping to the next boundary.
start_15 = data_full.index[0].ceil('15min')
end_15 = data_full.index[-1].floor('15min')
index_15 = pd.date_range(start=start_15, end=end_15, freq='15min')
# Condense the markers separately so information on interpolated data
# is preserved across the resampling
marker_15 = data_full['interpolated_values'].groupby(
    pd.Grouper(freq='15min', closed='left', label='left')
).agg(resample_markers).reindex(index_15)
data_sets['15min'] = data_full.resample('15min').last()
data_sets['15min']['interpolated_values'] = marker_15

# Resample to 60-minute intervals; as before, the start is floored to
# the full hour
start_60 = data_full.index[0].floor('60min')
end_60 = data_full.index[-1].floor('60min')
index_60 = pd.date_range(start=start_60, end=end_60, freq='60min')
marker_60 = data_full['interpolated_values'].groupby(
    pd.Grouper(freq='60min', closed='left', label='left')
).agg(resample_markers).reindex(index_60)
data_sets['60min'] = data_full.resample('60min').last()
data_sets['60min']['interpolated_values'] = marker_60
The index column of the data sets defines the start of the time period represented by each row of that data set in UTC time. We include an additional column for the CE(S)T Central European (Summer-) Time, as this might help aligning the output data with other data sources.
# Internal names of the metadata columns in the output
info_cols = {'utc': 'utc_timestamp',
             'cet': 'cet_cest_timestamp',
             'marker': 'interpolated_values'}

# Name the UTC index and prepend a CE(S)T timestamp column
for res_key, df in data_sets.items():
    if df.empty:
        continue
    df.index.rename(info_cols['utc'], inplace=True)
    cet_index = df.index.tz_localize('UTC').tz_convert('Europe/Brussels')
    df.insert(0, info_cols['cet'], cet_index)
Create a final savepoint
# Final savepoint: persist all four resolutions ...
for res_key, df in data_sets.items():
    df.to_pickle('final_' + res_key + '.pickle')

# ... and read them back from disk
data_sets = {res: pd.read_pickle('final_' + res + '.pickle')
             for res in ('1min', '3min', '15min', '60min')}
This section: create the metadata, both general and column-specific. All metadata will be stored in a JSON file.
# change to out_path directory
os.chdir(out_path)
os.getcwd()
# Write the datapackage JSON metadata describing all data sets
make_json(data_sets, info_cols, version, changes, headers)
This section: Save as Data Package (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run.
Cut off the data outside of [start_from_user:end_from_user]
# Trim every data set to [start_from_user, end_from_user]. The bounds
# are interpreted as local Europe/Brussels dates and converted to UTC
# to match the data_set index.
# Two fixes over the previous version:
#  - the user inputs are no longer mutated inside the loop (re-localizing
#    an already-converted timestamp on later iterations shifted the cutoff);
#  - the per-resolution minute offset is parsed from the full key
#    (int(res_key[:2]) raised ValueError for '1min'/'3min').
for res_key, df in data_sets.items():
    res_minutes = int(res_key[:-3])  # '15min' -> 15
    start = None
    if start_from_user:
        start = (
            pytz.timezone('Europe/Brussels')
            .localize(datetime.combine(start_from_user, time()))
            .astimezone(pytz.timezone('UTC')))
    end = None
    if end_from_user:
        end = (
            pytz.timezone('Europe/Brussels')
            .localize(datetime.combine(end_from_user, time()))
            .astimezone(pytz.timezone('UTC'))
            # appropriate offset to include the end of period
            + timedelta(days=1, minutes=-res_minutes))
    # Then cut off the data_set
    data_sets[res_key] = df.loc[start:end, :]
Convert the UTC, as well as the Central European Time indexes to ISO-8601
# Render the CE(S)T timestamp column (column 0) as ISO 8601 strings
for res_key, df in data_sets.items():
    if df.empty:
        continue
    timestamp_col = df.iloc[:, 0]
    df.iloc[:, 0] = timestamp_col.dt.strftime('%Y-%m-%dT%H:%M:%S%z')
Data are provided in three different "shapes":
The different shapes need to be created internally before they can be saved to files. Takes about 1 minute to run.
# Build the three output "shapes" per resolution:
#   *_multiindex  - columns keep the full MultiIndex (see headers)
#   *_singleindex - flat column names from region, household and feed
#   *_stacked     - long format with a single 'data' value column
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
    if df.empty:
        continue
    # Round measurement columns to whole numbers; metadata columns and
    # columns whose 'type' level (headers[2]) is 'profile' keep their
    # original precision
    for col_name, col in df.iteritems():
        if not (col_name[0] in info_cols.values() or
                col_name[2] == 'profile'):
            df[col_name] = col.round(0)
    # MultiIndex: store the (rounded) frame as-is
    data_sets_multiindex[res_key + '_multiindex'] = df
    # SingleIndex
    df_singleindex = df.copy()
    # use first 3 levels of multiindex to create singleindex
    # NOTE(review): next(iter(...)) picks the first column-tuple entry
    # found in the respective index level; assumes region, household and
    # feed are always resolvable, otherwise '+' on None would raise.
    df_singleindex.columns = [
        col[0] if col[0] in info_cols.values()
        else next(iter([l for l in col if l in df.columns.get_level_values('region')] or []), None) + \
        '_' + next(iter([l for l in col if l in df.columns.get_level_values('household')] or []), None) + \
        '_' + next(iter([l for l in col if l in df.columns.get_level_values('feed')] or []), None)
        for col in df.columns.values]
    data_sets_singleindex[res_key + '_singleindex'] = df_singleindex
    # Stacked: drop the CE(S)T column and the constant column levels,
    # then stack the remaining levels into the row index
    stacked = df.copy()
    stacked.drop(info_cols['cet'], axis=1, inplace=True)
    stacked.columns = stacked.columns.droplevel(['region', 'type', 'unit'])
    stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
    data_sets_stacked[res_key + '_stacked'] = stacked
This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.
# Export each singleindex data set to a table of the SQLite database.
# The connection is opened once (instead of once per resolution, as
# before) and closed deterministically so the file handle is not leaked.
con = sqlite3.connect('household_data.sqlite')
try:
    for res_key, df in data_sets_singleindex.items():
        if df.empty:
            continue
        df_sql = df.copy()
        # store the index as ISO 8601 strings
        df_sql.index = df_sql.index.strftime('%Y-%m-%dT%H:%M:%SZ')
        table = 'household_data_' + res_key
        df_sql.to_sql(table, con, if_exists='replace',
                      index_label=info_cols['utc'])
finally:
    con.close()
Writing the full tables to Excel takes extremely long. As a workaround, only the first 5 rows are exported. The rest of the data can then be inserted manually from the _multiindex.csv
files.
# Export the multiindex data sets to Excel, one sheet per resolution.
# The context manager saves the workbook and releases the file handle.
# NOTE(review): the accompanying text says only the first 5 rows are
# exported as a workaround, but the full table is written here —
# confirm which behavior is intended.
with pd.ExcelWriter('household_data.xlsx') as writer:
    for res_key, df in data_sets_multiindex.items():
        if df.empty:
            continue
        df_excel = df.copy()
        # store the index as ISO 8601 strings
        df_excel.index = df_excel.index.strftime('%Y-%m-%dT%H:%M:%SZ')
        df_excel.to_excel(writer, res_key.split('_')[0],
                          float_format='%.3f', merge_cells=True)
# Write every shape of every resolution to its own CSV file.
# itertools.chain() lets us iterate over all three dicts at once.
all_data_sets = itertools.chain(data_sets_singleindex.items(),
                                data_sets_multiindex.items(),
                                data_sets_stacked.items())
for res_key, df in all_data_sets:
    if df.empty:
        continue
    df.to_csv('household_data_' + res_key + '.csv',
              float_format='%.3f',
              date_format='%Y-%m-%dT%H:%M:%SZ')
We publish SHA-checksums for the outputfiles on GitHub to allow verifying the integrity of outputfiles on the OPSD server.
def get_sha_hash(path, blocksize=65536):
    """Return the SHA-256 hex digest of the file at *path*.

    The file is read in chunks of *blocksize* bytes so that large
    output files do not have to fit into memory at once.
    """
    hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        # iter(callable, sentinel) yields chunks until read() returns b''
        for chunk in iter(lambda: f.read(blocksize), b''):
            hasher.update(chunk)
    return hasher.hexdigest()
# Record a SHA-256 checksum for every exported output file
files = os.listdir(out_path)
with open('checksums.txt', 'w') as checksum_file:
    for file_name in files:
        extension = file_name.split('.')[-1]
        if extension in ['csv', 'sqlite', 'xlsx']:
            checksum_file.write('{},{}\n'.format(file_name,
                                                 get_sha_hash(file_name)))
# Copy the file to root directory from where it will be pushed to GitHub,
# leaving a copy in the version directory for reference
copyfile('checksums.txt', os.path.join(home_path, 'checksums.txt'))