Time series: Processing Notebook
This Notebook is part of the Time series Data Package of Open Power System Data. |
This Notebook handles missing data, performs calculations and aggragations and creates the output files.
This section: load libraries and set up a log.
Note that the download module makes use of the pycountry library that is not part of Anaconda. Install it with with pip install pycountry
from the command line.
from datetime import datetime, date, timedelta
import pandas as pd
import numpy as np
import logging
import json
import sqlite3
import yaml
import itertools
import os
import pytz
from timeseries_scripts.read import read
from timeseries_scripts.download import download
from timeseries_scripts.imputation import find_nan
from timeseries_scripts.make_json import make_json
# reload modules with execution of any code, to avoid having to restart
# the kernel after editing timeseries_scripts
%load_ext autoreload
%autoreload 2
sources_yaml_path = 'input/sources.yml'
out_path = 'original_data'
logger = logging.getLogger('log')
# For more detailed logging messages, replace 'INFO' with 'DEBUG'
# (May slow down computation).
logger.setLevel('INFO')
This section: select the time range and the data sources for download and read. Default: all data sources implemented, full time range available.
Source parameters are specified in input/sources.yml, which describes, for each source, the datasets (such as wind and solar generation) alongside all the parameters necessary to execute the downloads.
The option to perform downloading and reading is for testing only. To be able to run the script succesfully until the end, all sources have to be included, or otherwise the script will run into errors (i.e. the step where aggregate German timeseries are caculated requires data from all four German TSOs to be loaded).
Instead of downloading from the sources, the complete raw data can be downloaded as a zip file from the OPSD Server. Advantages are:
# Specify the beginning and end of the interval for which to attempt
# the download. Type None to dowwnload all available data.
start_from_user = None # i.e. date(2016, 1, 1)
end_from_user = date(2016, 9, 30) # i.e. date(2016, 1, 31)
# Specify an archive version to use the raw data from that version that has
# been cached on the OPSD server as input.
# All data from that version will be downloaded - subset will be ignored.
# Type None to download directly from the original sources.
archive_version = None # i.e. '2016-07-14'
Optionally, specify a subset to download/read.
The next cell prints the available sources and datasets.
Copy from its output and paste to following cell to get the right format.
Type subset = None
to include all data.
with open(sources_yaml_path, 'r') as f:
sources = yaml.load(f.read())
for k, v in sources.items():
print(yaml.dump({k: list(v.keys())}, default_flow_style=False))
subset = yaml.load('''
insert_source_here:
- insert_dataset1_from_that_source_here
- insert_dataset2_here
more_sources:
- more_data_sets
''') # Or
subset = None
Read Source parameters, eliminating sources and variables not in subset.
with open(sources_yaml_path, 'r') as f:
sources = yaml.load(f.read())
if subset: # eliminate sources and variables not in subset
sources = {source_name: {k: v
for k, v in sources[source_name].items()
if k in variable_list}
for source_name, variable_list in subset.items()}
This section: download data. Takes about 1 hour to run for the complete data set (subset=None
).
First, a data directory is created on your local computer. Then, download parameters for each data source are defined, including the URL. These parameters are then turned into a YAML-string. Finally, the download is executed file by file.
Each file is saved under it's original filename. Note that the original file names are often not self-explanatory (called "data" or "January"). The files content is revealed by its place in the directory structure.
%%time
logger.setLevel('INFO')
download(sources, out_path,
archive_version=archive_version,
start_from_user=start_from_user,
end_from_user=end_from_user)
Energinet.dk data needs to be downloaded manually from http://www.energinet.dk/en/el/engrosmarked/udtraek-af-markedsdata/Sider/default.aspx
Check The Boxes as specified below, then press the "Get extract"-button at the end of the page.
You will receive a file Market Data.xls
of about 50 MB. Open the file in Excel. There will be a warning from Excel saying that file extension and content are in conflict. Select "open anyways" and and save the file as .xlsx
. This will compress the file by ~80%, resulting in faster processing afterwards.
In order to be found by the read-function, place the downloaded file in the following subdirectory:
\original_data\Energinet.dk\prices_wind_solar\2000-01-01_2016-09-30\
Boxes to check:
Period
Get data from: 01-01-2000 To: Today
all months
Data columns
Elspot Price, Currency Code/MWh
Production and consumption, MWh/h
Data format:
Currency code
Decimal format
Date format
Recieve to
This section: Read each downloaded file into a pandas-DataFrame and merge data from different sources if it has the same time resolution. Takes ~15 minutes to run.
Set the title of the rows at the top of the data used to store metadata internally
headers = ['variable', 'region', 'attribute', 'source', 'web']
Create a dictionary of empty DataFrames to be populated by the rea function
data_sets = {'15min': pd.DataFrame(), '60min': pd.DataFrame()}
Loop through sources and variables to do the reading
%%time
logger.setLevel('INFO')
# For each source in the source dictionary
for source_name, source_dict in sources.items():
# For each variable from source_name
for variable_name, param_dict in source_dict.items():
variable_dir = os.path.join(out_path, source_name, variable_name)
res_key = param_dict['resolution']
url = param_dict['web']
df = read(source_name, variable_name, url, res_key, headers,
out_path='original_data',
start_from_user=start_from_user,
end_from_user=end_from_user)
if data_sets[res_key].empty:
data_sets[res_key] = df
elif not df.empty:
data_sets[res_key] = (
data_sets[res_key]
).combine_first(df)
Display some rows of the dataframes to get a first impression of the data.
data_sets['15min']['2015-07-01 12:00':].head()
data_sets['60min']['2015-07-01 12:00':].head()
Save the DataFrames created by the read function to disk. This way you have the raw data to fall back to if something goes wrong in the ramainder of this notebook without having to repeat the previos steps.
data_sets['15min'].to_pickle('raw_15.pickle')
data_sets['60min'].to_pickle('raw_60.pickle')
Load the DataFrames saved above
data_sets['15min'] = pd.read_pickle('raw_15.pickle')
data_sets['60min'] = pd.read_pickle('raw_60.pickle')
This section: missing data handling, aggregation of sub-national to national data, aggregate 15'-data to 60'-resolution. Takes 30 minutes to run.
Patch missing data. At this stage, only small gaps (up to 2 hours) are filled by linear interpolation. This catched most of the missing data due to daylight savings time transitions, while leaving bigger gaps untouched
The exact locations of missing data are stored in the nan_table
DataFrames.
Where data has been interpolated, it is marked in a new column comment
. For eaxample the comment solar_DE-transnetbw_generation;
means that in the original data, there is a gap in the solar generation timeseries from TransnetBW in the time period where the marker appears.
Patch the datasets and display the location of missing Data in the original data. Takes ~30 minutes to run.
%time data_sets['15min'], nan_table15 = find_nan(data_sets['15min'], headers, patch=True)
%time data_sets['60min'], nan_table60 = find_nan(data_sets['60min'], headers, patch=True)
Save the patched data sets
data_sets['15min'].to_pickle('patched_15.pickle')
data_sets['60min'].to_pickle('patched_60.pickle')
data_sets['15min'] = pd.read_pickle('patched_15.pickle')
data_sets['60min'] = pd.read_pickle('patched_60.pickle')
Display the table of regions of missing values
nan_table15
nan_table60
You can export the NaN-tables to Excel in order to inspect where there are NaNs
writer = pd.ExcelWriter('NaN_table.xlsx')
nan_table15.to_excel(writer, '15min')
nan_table60.to_excel(writer, '60min')
writer.save()
Execute this to see an example of where the data has been patched.
data_sets['15min'][data_sets['15min']['comment'].notnull()].tail()
data_sets['60min'][data_sets['60min']['comment'].notnull()].tail()
For 50 Hertz, it is already in the data. For TenneT, it calculated by substracting offshore from total generation. For Amprion and TransnetBW, onshore wind generation is just total wind generation. Takes <1 second to run.
# Some of the following operations require the Dataframes to be lexsorted in
# the columns
for res_key, df in data_sets.items():
df.sortlevel(axis=1, inplace=True)
for tso in ['DE-amprion', 'DE-tennet', 'DE-transnetbw']:
new_col_header = ('wind-onshore', tso, 'generation',
'own calculation', 'own calculation')
if tso == 'DE-tennet':
offshore = data_sets['15min'].loc[:,('wind-offshore', 'DE-tennet', 'generation')]
else:
offshore = 0
data_sets['15min'][new_col_header] = (
data_sets['15min'][('wind', tso, 'generation')] - offshore)
The wind and solar in-feed data for the 4 German balancing areas is summed up and stored in in new columns, which are then used to calculate profiles, that is, the share of wind/solar capacity producing at a given time. The column headers are created in the fashion introduced in the read script. Takes 5 seconds to run.
for tech in ['solar', 'wind-onshore', 'wind-offshore']:
for attribute in ['generation']: # we could also include 'forecast'
sum_col = pd.Series()
for tso in ['DE-50hertz', 'DE-amprion', 'DE-tennet', 'DE-transnetbw']:
try:
add_col = data_sets['15min'][tech, tso, attribute]
if len(sum_col) == 0:
sum_col = add_col
else:
sum_col = sum_col + add_col.values
except KeyError: # Occurs i.e. for Amprion and TransnetBW, which
# don't have offshore
pass
# Create a new MultiIndex and ppend aggregated data to the dataset
tuples = [(tech, 'DE', attribute, 'own calculation', 'own calculation')]
colnames = pd.MultiIndex.from_tuples(tuples, names=headers)
sum_col.columns = colnames
data_sets['15min'] = data_sets['15min'].combine_first(sum_col)
if not attribute == 'generation':
continue
# Calculate the profile column
profile_col = sum_col.values / data_sets['15min'][tech, 'DE', 'capacity']
# Create a new MultiIndex and ppend aggregated data to the dataset
tuples = [(tech, 'DE', 'profile', 'own calculation', 'own calculation')]
columns = pd.MultiIndex.from_tuples(tuples, names=headers)
profile_col.columns = columns
data_sets['15min'] = data_sets['15min'].combine_first(profile_col)
Some data comes in 15-minute intervals (i.e. German renewable generation), other in 60-minutes (i.e. load data from ENTSO-E and Prices). We resample the 15-minute data to hourly resolution and append it to the 60-minutes dataset.
The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved.
The .resample('H').mean()
methods calculates the means from the values for 4 quarter hours [:00, :15, :30, :45] of an hour values, inserts that for :00 and drops the other 3 entries. Takes 15 seconds to run.
%%time
def resample_markers(group):
'''Resample marker column from 15 to 60 min
Parameters
----------
group: pd.Series
Series of 4 succeeding quarter-hourly values from the marker column
that have to be combined into one.
Returns
----------
aggregated_marker : str or np.nan
If there were any markers in group: the unique values from the marker
column group joined together in one string, np.nan otherwise
'''
if group.notnull().values.any():
unpacked = [mark for line in group if type(line) is str for mark in line.split(';')[:-1]]
aggregated_marker = '; '.join(set(unpacked)) + '; '
else:
aggregated_marker = np.nan
return aggregated_marker
marker_col_15 = data_sets['15min']['comment']
marker_col_15 = marker_col_15.groupby(
pd.Grouper(freq='60Min', closed='left', label='left')).agg(resample_markers)
marker_col_15 = marker_col_15.reindex(data_sets['60min'].index)
data_sets['60min']['comment'] = (
data_sets['60min']['comment']
.str.cat(others=marker_col_15, sep='', na_rep='')
.replace(to_replace='', value=np.nan))
%time
resampled = data_sets['15min'].resample('H').mean()
try:
data_sets['60min'] = data_sets['60min'].combine_first(resampled)
except KeyError:
data_sets['60min'] = resampled
The index column of th data sets defines the start of the timeperiod represented by each row of that data set in UTC time. We include an additional column for the CE(S)T Central European (Summer-) Time, as this might help aligning the output data with other data sources.
info_cols = {'utc': 'utc_timestamp',
'cet':'cet_cest_timestamp',
'marker': 'comment'}
version = '2016-10-28'
for res_key, df in data_sets.items():
if df.empty:
continue
df.index.rename(info_cols['utc'] inplace=True)
df.insert(0, info_cols['cet']
df.index.tz_localize('UTC').tz_convert('Europe/Brussels'))
This section: create the metadata, both general and column-specific. All metadata we be stored as a JSON file. Takes <1s to run.
make_json(data_sets, info_cols, version, headers)
This section: Save as Data Package (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run.
But first, create a final savepoint
data_sets['15min'].to_pickle('final_15.pickle')
data_sets['60min'].to_pickle('final_60.pickle')
data_sets = {}
data_sets['15min'] = pd.read_pickle('final_15.pickle')
data_sets['60min'] = pd.read_pickle('final_60.pickle')
Data are provided in three different "shapes":
The different shapes need to be created internally befor they can be saved to files. Takes about 1 minute to run.
%%time
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
if df.empty:
continue
for col_name, col in df.iteritems():
if not (col_name[0] in info_cols.values() or
col_name[2] == 'profile'):
df[col_name] = col.round(0)
# MultIndex
data_sets_multiindex[res_key + '_multiindex'] = df
# SingleIndex
df_singleindex = df.copy()
# use first 3 levels of multiindex to create singleindex
df_singleindex.columns = [
col[0] if col[0] in info_cols.values()
else '_'.join(col[0:3]) for col in df.columns.values]
data_sets_singleindex[res_key + '_singleindex'] = df_singleindex
# Stacked
stacked = df.copy()
stacked.drop(info_cols['cet'], axis=1, inplace=True)
stacked.columns = stacked.columns.droplevel(['source', 'web'])
stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
data_sets_stacked[res_key + '_stacked'] = stacked
This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.
%%time
for res_key, df in data_sets_singleindex.items():
f = 'time_series_' + res_key
df = df.copy()
df.index = df.index.strftime('%Y-%m-%dT%H:%M:%SZ')
df[info_cols['cet']] = df[info_cols['cet']].dt.strftime('%Y-%m-%dT%H:%M:%S%z')
df.to_sql(f, sqlite3.connect('time_series.sqlite'),
if_exists='replace', index_label=info_cols['utc'])
Writing the full tables to Excel takes extremely long. As a workaround, only the first 5 rows are exported. The rest of the data can than be inserted manually from the _multindex.csv
files.
%%time
writer = pd.ExcelWriter('time_series.xlsx')
for res_key, df in data_sets_multiindex.items():
df.head().to_excel(writer, res_key, float_format='%.2f', merge_cells=True)
writer.save()
This takes about 10 minutes to complete.
%%time
# itertoools.chain() allows iterating over multiple dicts at once
for res_stacking_key, df in itertools.chain(
data_sets_singleindex.items(),
data_sets_multiindex.items(),
data_sets_stacked.items()
):
# convert the format of the cet_cest-timestamp to ISO-8601
if not (res_stacking_key in ['15min_stacked', '60min_stacked']
or type(df.iloc[0,0]) == str):
df.iloc[:,0] = df.iloc[:,0].dt.strftime('%Y-%m-%dT%H:%M:%S%z')
f = 'time_series_' + res_stacking_key
df.to_csv(f + '.csv', float_format='%.2f',
date_format='%Y-%m-%dT%H:%M:%SZ')