Household data: Processing Notebook
This Notebook is part of the Household Data Package of Open Power System Data.

1. Introductory Notes

This Notebook handles missing data, performs calculations and aggregations, and creates the output files.

2. Settings

2.1 Set version number and recent changes

Executing this script from start to finish will create a new version of the data package. The version number specifies the local directory for the data.
We also include a note describing what has changed.

In [ ]:
version = '2017-11-10'
changes = 'Initial release of all CoSSMic households'

2.2 Import Python libraries

In [ ]:
# Python modules
from datetime import datetime, date, timedelta, time
import pandas as pd
import numpy as np
import logging
import json
import sqlite3
import yaml
import itertools
import os
import pytz
import hashlib
from shutil import copyfile

# Scripts from household repository package
from household.download import download
from household.read import read
from household.validation import validate
from household.imputation import make_equidistant
from household.imputation import find_nan
from household.imputation import resample_markers
from household.make_json import make_json

# Automatically reload modules before executing code, to avoid having to
# restart the kernel after editing the scripts in the household package
%load_ext autoreload
%autoreload 2

2.3 Set directories

In [ ]:
# make sure the working directory is this file's directory
try:
    os.chdir(home_path)
except NameError:
    home_path = os.path.realpath('.')

households_yaml_path = os.path.join(home_path, 'input', 'households.yml')
data_path = os.path.join(home_path, 'household_data', version, 'original_data')
out_path = os.path.join(home_path, 'household_data', version)
temp_path = os.path.join(home_path, 'household_data', 'temp')
os.makedirs(out_path, exist_ok=True)
os.makedirs(temp_path, exist_ok=True)
# change to temp directory
os.chdir(temp_path)
os.getcwd()

2.4 Set up a log

In [ ]:
FORMAT = '%(asctime)s %(levelname)s %(message)s'
DATEFORMAT = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(fmt=FORMAT, datefmt=DATEFORMAT)
logging.basicConfig(level=logging.INFO,
                    format=FORMAT,
                    datefmt=DATEFORMAT)

logfile = logging.FileHandler('log1.log')
logfile.setFormatter(formatter)
logfile.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.addHandler(logfile)

# For more detailed logging messages, replace 'INFO' with 'DEBUG'
# (May slow down computation).
logger.setLevel(logging.INFO)

# Print out additional CSV files to verify feed integrity
verbose = False

2.5 Select timerange

Select the time range to read and process data.
Default: all data.

Type None to process all available data.

In [ ]:
start_from_user = None  # e.g. date(2016, 1, 1)
end_from_user = None  # e.g. date(2016, 1, 31)

2.6 Select download source

The raw data can be downloaded as a zip file from the OPSD server. To do this, specify the archive version, cached on the OPSD server, to use as input.

In [ ]:
archive_version = '2017-11-10'

2.7 Select household subset

Optionally, specify a subset of households to process.
The next cell prints the available sources and datasets.

In [ ]:
with open(households_yaml_path, 'r') as f:
    households = yaml.safe_load(f)
for k, v in households.items():
    print(yaml.dump({k: list(v['feeds'].keys())}, default_flow_style=False))

Copy from its output and paste into the following cell to get the right format.

Type subset = None to include all data.

In [ ]:
subset = yaml.safe_load('''
    insert_household_here:
    - insert_feed1_here
    - insert_feed2_here
    more_households_here:
    - more_feeds_here
    ''')

subset = None

Now eliminate households and feeds not in subset.

In [ ]:
if subset:  # eliminate households and feeds not in subset
    households = {household_name:
                      dict(households[household_name],
                           feeds={k: v for k, v
                                  in households[household_name]['feeds'].items()
                                  if k in feed_list})
                  for household_name, feed_list in subset.items()}

3. Download

This section: download raw data to process.

If the original data does not exist locally, it will be downloaded from the OPSD server and extracted to a local directory.

In [ ]:
download(out_path, version=archive_version)

4. Read

This section: Read each downloaded file into a pandas DataFrame and merge data from different sources if it shares the same time resolution.

4.1 Preparations

Set the titles of the header rows at the top of the data, which are used to store metadata internally. The order of this list determines the order of the levels in the resulting output.

In [ ]:
headers = ['region', 'household', 'type', 'unit', 'feed']
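As an illustration, a column key built from these five levels forms a pandas MultiIndex. The values below are hypothetical placeholders, not taken from the actual data:

```python
import pandas as pd

headers = ['region', 'household', 'type', 'unit', 'feed']

# One hypothetical column key, with one value per header level
columns = pd.MultiIndex.from_tuples(
    [('DE_KN', 'residential4', 'consumption', 'kWh', 'grid_import')],
    names=headers)
```

Each level can then be addressed by name, e.g. `columns.get_level_values('feed')`.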

Create an empty dictionary to be populated by the read function

In [ ]:
data_households = {}

4.2 Reading loop

Loop through households and feeds to do the reading

In [ ]:
# For each source in the household dictionary
for household_name, household_dict in households.items():
    df = read(household_name, household_dict['dir'], household_dict['region'], household_dict['type'], 
              household_dict['feeds'], headers, 
              start_from_user=start_from_user,
              end_from_user=end_from_user)
    
    data_households[household_name] = df

4.3 Save raw data

Save the DataFrames created by the read function to disk. This way you have the raw data to fall back to if something goes wrong in the remainder of this notebook, without having to repeat the previous steps.

In [ ]:
for household_name, household_dict in households.items():
    data_households[household_name].to_pickle('raw_'+household_dict['dir']+'.pickle')

Load the DataFrames saved above

In [ ]:
data_households = {}
for household_name, household_dict in households.items():
    data_households[household_name] = pd.read_pickle('raw_'+household_dict['dir']+'.pickle')

5. Processing

This section: validate the energy data and fix wrongly measured data points, handle missing data, and aggregate to regular 15- and 60-minute intervals.

5.1 Validate Data

Since the raw data consists of unvalidated measurements from a research prototype under development, certain steps should be taken to remove clearly incorrect data points or to react to other events, such as the counter reset of an energy meter.
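The counter-reset case can be sketched with plain pandas. This is only an illustration of the idea, not the repository's `validate` function: wherever a cumulative meter reading drops, the remainder of the series is shifted up so the values stay monotonic.

```python
import pandas as pd

def fix_counter_resets(series):
    """Undo meter counter resets in a cumulative energy series.

    A negative increment indicates the meter was reset; from that point on,
    the lost offset is added back so the series stays monotonic.
    """
    increments = series.diff()
    resets = increments < 0
    # At each reset, the offset to restore is the (negative) jump itself
    offsets = (-increments.where(resets, 0)).cumsum()
    return series + offsets

# Hypothetical meter readings with a reset after 15.0
readings = pd.Series([10.0, 12.5, 15.0, 0.5, 3.0])
fixed = fix_counter_resets(readings)
```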

In [ ]:
for household_name, household_dict in households.items():
    data_households[household_name] = validate(data_households[household_name], household_name, 
                                               household_dict['feeds'], headers, output=verbose)

5.2 Aggregate Index equidistantly

Most of the acquired household data comes in irregular time intervals and is highly unsynchronized for different data feeds.

To improve readability and comparability, the values are interpolated onto regular intervals; gaps greater than 15 minutes are left untouched.
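The core idea can be sketched with plain pandas (illustrative only, not the repository's `make_equidistant` implementation):

```python
import pandas as pd

# A hypothetical, irregularly sampled feed
index = pd.to_datetime(['2017-01-01 00:00:07', '2017-01-01 00:06:05',
                        '2017-01-01 00:30:04'])
feed = pd.Series([1.0, 2.0, 3.0], index=index)

# Snap the readings onto a regular 3-minute grid
equidistant = feed.resample('3min').mean()

# Fill at most 5 consecutive missing values (i.e. gaps up to 15 minutes).
# Note that pandas also fills the first 5 values of longer gaps; the
# repository code instead leaves long gaps entirely untouched.
filled = equidistant.interpolate(limit=5)
```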

In [ ]:
data_sets = {}
for household_name, household_dict in households.items():
    for res_dict in household_dict['resolutions']:
        res_key = res_dict['key']
        
        df = make_equidistant(data_households[household_name], household_name, 
                              res_key, res_dict.get('seconds'), 
                              res_dict.get('start'), res_dict.get('end'), 
                              household_dict['feeds'])
        
        if res_key not in data_sets:
            data_sets[res_key] = df
        elif not df.empty:
            data_sets[res_key] = (
                data_sets[res_key]
            ).combine_first(df)

Save/Load the equidistant data sets

In [ ]:
for res_key, df in data_sets.items():
    df.to_pickle('fixed_'+res_key+'.pickle')
In [ ]:
data_sets = {}
data_sets['1min'] = pd.read_pickle('fixed_1min.pickle')
data_sets['3min'] = pd.read_pickle('fixed_3min.pickle')

5.3 Missing Data Handling

Patch missing data. At this stage, only small gaps (up to 2 hours) are filled by linear interpolation. This catches most of the data missing due to daylight saving time transitions or failed radio transmissions, while leaving bigger gaps untouched.

The exact locations of missing data are stored in the nan_table DataFrames.

Where data has been interpolated, it is marked in a new column, interpolated_values. For example, the marker residential_004_pv; means that in the original data there is a gap in the solar generation time series of household residential_004 in the period where the marker appears.
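The patch-and-mark pattern can be sketched as follows. This is a simplified illustration with a hypothetical feed, not the repository's `find_nan` function:

```python
import numpy as np
import pandas as pd

# Hypothetical feed with two gaps
df = pd.DataFrame(
    {'residential_004_pv': [1.0, np.nan, 3.0, np.nan, np.nan, 6.0]},
    index=pd.date_range('2017-01-01', periods=6, freq='60min'))

# Remember where values were missing, then patch by linear interpolation
was_missing = df['residential_004_pv'].isna()
df['residential_004_pv'] = df['residential_004_pv'].interpolate()

# Mark the patched rows with the feed name, as in the marker column
df['interpolated_values'] = np.where(was_missing, 'residential_004_pv;', '')
```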

Patch the datasets and display the location of missing Data in the original data.

In [ ]:
nan_sets = {}
for res_key, df in data_sets.items():
    data_sets[res_key], nan_table = find_nan(df, headers, res_key, patch=True)
    
    nan_sets[res_key] = nan_table

Execute this to see an example of where the data has been patched.

In [ ]:
data_sets['3min'][data_sets['3min']['interpolated_values'].notnull()].tail()
In [ ]:
data_sets['1min'][data_sets['1min']['interpolated_values'].notnull()].tail()

Display the table of regions of missing values

In [ ]:
nan_sets

You can export the NaN tables to Excel in order to inspect where there are NaNs.

In [ ]:
with pd.ExcelWriter('NaN_table.xlsx') as writer:
    for res_key, df in nan_sets.items():
        df.to_excel(writer, sheet_name=res_key)

Save/Load the patched data sets

In [ ]:
for res_key, df in data_sets.items():
    df.to_pickle('patched_'+res_key+'.pickle')
In [ ]:
data_sets = {}
data_sets['1min'] = pd.read_pickle('patched_1min.pickle')
data_sets['3min'] = pd.read_pickle('patched_3min.pickle')

5.4 Aggregate hourly and 15min interval data

As some data comes in 1-minute and other (older) series in 3-minute intervals, the data can be harmonized. Since most billing is based on 15- and 60-minute intervals, resampling to those intervals improves the overview of the data.

The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved.
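The marker aggregation can be sketched like this. The function below is an illustrative stand-in for `household.imputation.resample_markers`, with hypothetical feed names; it merges all markers of one resampling bin while keeping each flagged feed exactly once:

```python
import numpy as np
import pandas as pd

def combine_markers(markers):
    """Merge the interpolation markers of one resampling bin,
    keeping each flagged feed name exactly once."""
    feeds = set()
    for m in markers.dropna():
        feeds.update(f for f in m.split(';') if f)
    return ';'.join(sorted(feeds)) + ';' if feeds else np.nan

# Four 15-minute markers falling into one hour
markers = pd.Series(['pv;', np.nan, 'pv;grid_import;', np.nan],
                    index=pd.date_range('2017-01-01', periods=4, freq='15min'))
hourly = markers.groupby(pd.Grouper(freq='60min')).agg(combine_markers)
```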

To do this, first combine the patched resolutions again

In [ ]:
data_full = None
for res_key, df in data_sets.items():
    if data_full is None:
        data_full = df
    elif not df.empty:
        data_full = data_full.combine_first(df)

Afterwards, condense the marker column to both 15- and 60-minute resolution, resample the series, and update the condensed markers.

In [ ]:
# Round the first and last timestamps up to the next full quarter hour
start_15 = data_full.index[0].ceil('15min')
end_15 = data_full.index[-1].ceil('15min')

index_15 = pd.date_range(start_15, end_15, freq='15min')
marker_15 = data_full['interpolated_values'].groupby(
    pd.Grouper(freq='15min', closed='left', label='left')
    ).agg(resample_markers).reindex(index_15)

data_sets['15min'] = data_full.resample('15min').last()
data_sets['15min']['interpolated_values'] = marker_15
In [ ]:
# Floor the first and last timestamps to the full hour
start_60 = data_full.index[0].floor('60min')
end_60 = data_full.index[-1].floor('60min')
index_60 = pd.date_range(start_60, end_60, freq='60min')
marker_60 = data_full['interpolated_values'].groupby(
    pd.Grouper(freq='60min', closed='left', label='left')
    ).agg(resample_markers).reindex(index_60)

data_sets['60min'] = data_full.resample('60min').last()
data_sets['60min']['interpolated_values'] = marker_60

5.5 Insert a column with Central European Time

The index column of the data sets defines, in UTC, the start of the time period represented by each row. We include an additional column for Central European (Summer) Time (CE(S)T), as this might help align the output data with other data sources.

In [ ]:
info_cols = {'utc': 'utc_timestamp',
             'cet': 'cet_cest_timestamp',
             'marker': 'interpolated_values'}
In [ ]:
for res_key, df in data_sets.items():
    if df.empty:
        continue
    df.index.rename(info_cols['utc'], inplace=True)
    df.insert(0, info_cols['cet'],
              df.index.tz_localize('UTC').tz_convert('Europe/Brussels'))

Create a final savepoint

In [ ]:
for res_key, df in data_sets.items():
    df.to_pickle('final_'+res_key+'.pickle')
In [ ]:
data_sets = {}
data_sets['1min'] = pd.read_pickle('final_1min.pickle')
data_sets['3min'] = pd.read_pickle('final_3min.pickle')
data_sets['15min'] = pd.read_pickle('final_15min.pickle')
data_sets['60min'] = pd.read_pickle('final_60min.pickle')

6. Create metadata

This section: create the metadata, both general and column-specific. All metadata will be stored in a JSON file.

In [ ]:
# change to out_path directory
os.chdir(out_path)
os.getcwd()
In [ ]:
make_json(data_sets, info_cols, version, changes, headers)

7. Write data to disk

This section: Save as Data Package (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run.

7.1 Limit time range

Cut off the data outside of [start_from_user:end_from_user]

In [ ]:
for res_key, df in data_sets.items():
    # First, convert the user input to UTC time to conform with data_set.index.
    # Use separate variables so the conversion is not applied repeatedly
    # across loop iterations.
    if start_from_user:
        start = (
            pytz.timezone('Europe/Brussels')
            .localize(datetime.combine(start_from_user, time()))
            .astimezone(pytz.timezone('UTC')))
    else:
        start = None
    if end_from_user:
        end = (
            pytz.timezone('Europe/Brussels')
            .localize(datetime.combine(end_from_user, time()))
            .astimezone(pytz.timezone('UTC'))
            # appropriate offset to include the end of the period
            + timedelta(days=1, minutes=-int(res_key[:-3])))
    else:
        end = None
    # Then cut off the data_set
    data_sets[res_key] = df.loc[start:end, :]

7.2 Convert the Central European Time column to ISO-8601

Convert the Central European Time column to ISO 8601 format. (The UTC index is formatted later, when the files are written.)

In [ ]:
for res_key, df in data_sets.items():
    if df.empty:
        continue
    
    df.iloc[:,0] = df.iloc[:,0].dt.strftime('%Y-%m-%dT%H:%M:%S%z')

7.3 Different shapes

Data are provided in three different "shapes":

  • SingleIndex (easy to read for humans, compatible with datapackage standard, small file size)
    • Fileformat: CSV, SQLite
  • MultiIndex (easy to read into GAMS, not compatible with datapackage standard, small file size)
    • Fileformat: CSV, Excel
  • Stacked (compatible with data package standard, large file size, many rows, too many for Excel)
    • Fileformat: CSV

The different shapes need to be created internally before they can be saved to files. Takes about 1 minute to run.
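The SingleIndex naming scheme can be illustrated for a single column tuple. The values are hypothetical; the pattern joins the region, household and feed levels, as in the cell below:

```python
# Header order as used throughout this notebook
headers = ['region', 'household', 'type', 'unit', 'feed']

# Hypothetical MultiIndex column tuple, one value per level
col = ('DE_KN', 'residential4', 'consumption', 'kWh', 'pv')

# SingleIndex names join the region, household and feed levels
single_name = '_'.join(col[headers.index(level)]
                       for level in ('region', 'household', 'feed'))
```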

In [ ]:
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
    if df.empty:
        continue

    # MultiIndex
    data_sets_multiindex[res_key + '_multiindex'] = df
    
    # SingleIndex
    df_singleindex = df.copy()
    # join the 'region', 'household' and 'feed' levels into a single column name
    df_singleindex.columns = [
        col[0] if col[0] in info_cols.values()
        else next(iter([l for l in col if l in df.columns.get_level_values('region')] or []), None) + \
                '_' + next(iter([l for l in col if l in df.columns.get_level_values('household')] or []), None) + \
                '_' + next(iter([l for l in col if l in df.columns.get_level_values('feed')] or []), None)
        for col in df.columns.values]

    data_sets_singleindex[res_key + '_singleindex'] = df_singleindex

    # Stacked
    stacked = df.copy()
    stacked.drop(info_cols['cet'], axis=1, inplace=True)
    stacked.columns = stacked.columns.droplevel(['region', 'type', 'unit'])
    stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
    data_sets_stacked[res_key + '_stacked'] = stacked

7.4 Write to SQL-database

This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.

In [ ]:
for res_key, df in data_sets_singleindex.items():
    if df.empty:
        continue
    
    df_sql = df.copy()
    df_sql.index = df_sql.index.strftime('%Y-%m-%dT%H:%M:%SZ')
    
    table = 'household_data_' + res_key
    df_sql.to_sql(table, sqlite3.connect('household_data.sqlite'),
                  if_exists='replace', index_label=info_cols['utc'])

7.5 Write to Excel

Writing the full tables to Excel takes a very long time. As a workaround, only the first 5 rows are exported; the rest of the data can then be inserted manually from the _multiindex.csv files.

In [ ]:
with pd.ExcelWriter('household_data.xlsx') as writer:
    for res_key, df in data_sets_multiindex.items():
        if df.empty:
            continue

        # Export only the first 5 rows, as described above
        df_excel = df.head(5).copy()
        df_excel.index = df_excel.index.strftime('%Y-%m-%dT%H:%M:%SZ')

        df_excel.to_excel(writer, sheet_name=res_key.split('_')[0],
                          float_format='%.3f', merge_cells=True)

7.6 Write to CSV

In [ ]:
# itertools.chain() allows iterating over multiple dicts at once
for res_key, df in itertools.chain(
        data_sets_singleindex.items(),
        data_sets_multiindex.items(),
        data_sets_stacked.items()
    ):
    if df.empty:
        continue
    
    filename = 'household_data_' + res_key + '.csv'
    df.to_csv(filename, float_format='%.3f',
              date_format='%Y-%m-%dT%H:%M:%SZ')
In [ ]:
if verbose:
    from household.tools import derive_power
    
    for res_key, df in data_sets_singleindex.items():
        df_power = df.copy()
        for col_name, col in df_power.items():
            if not (col_name in info_cols.values()):
                feed_power = derive_power(pd.DataFrame(col))
                df_power[col_name] = feed_power.iloc[:,0]
        
        filename = os.path.join(temp_path, 'household_data_' + res_key.split('_')[0] + '_power.csv')
        df_power.to_csv(filename, float_format='%.3f', 
                        date_format='%Y-%m-%dT%H:%M:%SZ')

7.7 Write checksums.txt

We publish SHA-256 checksums for the output files on GitHub to allow verifying the integrity of the files published on the OPSD server.

In [ ]:
def get_sha_hash(path, blocksize=65536):
    sha_hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        buffer = f.read(blocksize)
        while len(buffer) > 0:
            sha_hasher.update(buffer)
            buffer = f.read(blocksize)
        return sha_hasher.hexdigest()

files = os.listdir(out_path)

# Create checksums.txt in the output directory
with open('checksums.txt', 'w') as f:
    for file_name in files:
        if file_name.split('.')[-1] in ['csv', 'sqlite', 'xlsx']:
            file_hash = get_sha_hash(file_name)
            f.write('{},{}\n'.format(file_name, file_hash))

# Copy the file to root directory from where it will be pushed to GitHub,
# leaving a copy in the version directory for reference
copyfile('checksums.txt', os.path.join(home_path, 'checksums.txt'))
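A consumer of the data package can verify a downloaded file against the published hash. This is a minimal sketch of the verification side, mirroring the get_sha_hash function above:

```python
import hashlib

def verify_checksum(file_path, expected_hash, blocksize=65536):
    """Recompute a file's SHA-256 hash and compare it with the
    published value from checksums.txt."""
    sha_hasher = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # Read in blocks so large CSV/SQLite files do not need to fit in memory
        for block in iter(lambda: f.read(blocksize), b''):
            sha_hasher.update(block)
    return sha_hasher.hexdigest() == expected_hash
```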