Open Power System Data: time series
Part of the project Open Power System Data.
This Jupyter notebook downloads and processes time-series data from European power systems. The notebook has been used to create the timeseries data package that is available on the Open Power System Data platform.
A Jupyter notebook is a file that combines executable programming code with visualizations and comments in markdown format, allowing for an intuitive documentation of the code.
The notebook is hosted in a GitHub repository that can be downloaded for execution on your local computer (you need a running Python installation to do this, for example Anaconda).
The download and read functions are implemented as distinct modules that are imported into this notebook. Click below to inspect the code (the links to the local copies will only work if you are running this notebook on your computer):
The sources file (GitHub / local copy)
The download script (GitHub / local copy) downloads the data from our sources to your hard drive.
The read script (GitHub / local copy) reads each downloaded file into a pandas-DataFrame and merges data from different sources but with the same time resolution.
Load libraries, set up a log etc.
This notebook makes use of the pycountry library, which is not part of Anaconda. Install it with pip install pycountry from your command line.
from datetime import datetime, date, timedelta
import pandas as pd
import numpy as np
import logging
import pycountry
import json
import sqlite3
import yaml
from itertools import chain
from timeseries_scripts.read import read
from timeseries_scripts.download import download
# reload modules whenever code is executed, to avoid having to restart
# the kernel after editing timeseries_scripts
%load_ext autoreload
%autoreload 2
logger = logging.getLogger('log')
logger.setLevel('INFO')
# Optionally, specify the beginning and end of the interval for which to attempt
# the download. If None, all available data will be downloaded.
start_date = None #date(2016, 1, 1)
end_date = None #date(2016, 6, 30)
sources_yaml_path = 'config/sources.yml'
out_path = 'original_data'
# Optionally, specify a subset to download/read, e.g. subset=['TenneT', '50Hertz']
include_sources = ['50Hertz', 'Amprion', 'TenneT', 'TransnetBW', 'Elia', 'ENTSO-E', 'OPSD', 'Svenska Kraftnaet']
#include_sources = ['Svenska Kraftnaet']
Download sources are defined in config/sources.yml, which specifies, for each source, the variables (such as wind and solar generation) alongside all the parameters necessary to execute the downloads.
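As a minimal sketch (assuming, as described above, that the top-level keys of the file are the source names and the nested keys are the variables; check the file itself for the actual layout), the configured sources and variables can be listed like this:
# Sketch only: list the configured sources and their variables.
# Assumes sources.yml maps source names to dictionaries of variables.
with open(sources_yaml_path, 'r') as f:
    sources = yaml.load(f)
for source_name, variables in sources.items():
    print(source_name, list(variables.keys()))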
First, a data directory is created on your local computer. Then, download parameters for each data source are defined, including the URL. These parameters are then turned into a YAML-string. Finally, the download is executed one by one. If all data need to be downloaded, this usually takes several hours.
Each file is saved under its original filename. Note that the original file names are often not self-explanatory (e.g. "data" or "January"); the content of a file is revealed by its place in the directory structure.
download(sources_yaml_path, out_path, end_date=end_date, start_date=start_date, subset=include_sources)
Energinet.dk data needs to be downloaded manually from http://www.energinet.dk/en/el/engrosmarked/udtraek-af-markedsdata/Sider/default.aspx. Check the following boxes and then press the "Get extract" button at the end of the page:
Period: Get data from 01-01-2000 to today, all months
Data columns: Elspot Price, Currency Code/MWh; Production and consumption, MWh/h
Data format: Currency code EUR; Decimal format: English number format (period as decimal separator); Date format: Other date format (YYYY-MM-DD); Receive to Excel
This section reads each downloaded file into a pandas-DataFrame and merges data from different sources but with the same time resolution.
These are the names of the column header rows used to store metadata internally. In the output data, this information will be moved to the datapackage.json file.
headers = ['variable', 'region', 'attribute', 'source', 'web']
data_sets = read(sources_yaml_path, out_path, headers, subset=include_sources)
data_sets['60min']#['2015-12-31 12':]#.xs('CH', axis=1, level='region', drop_level=False)
Save/load the already parsed data for faster access in case you need to restart the script.
data_sets['15min'].to_pickle('data_sets_15.pickle')
data_sets['60min'].to_pickle('data_sets_60.pickle')
#data_sets = {}
#data_sets['15min'] = pd.read_pickle('data_sets_15_Elia.pickle')
##data_sets['60min'] = pd.read_pickle('data_sets_60.pickle')
This section performs some aggregations and transforms the data to the tabular data package format, where actual data is saved in a CSV file, while metadata (information on format, units, sources, and descriptions) is stored in a JSON file.
Patch missing data. At this stage, this is only implemented for the 15-minute resolution solar/wind in-feed data from the German TSOs. Small gaps (up to 2 hours) are filled by linear interpolation. For the generation timeseries, larger gaps are guessed by up-/down-scaling the data from other balancing areas to fit the expected magnitude of the missing data.
The locations of missing data are stored in the nan_table DataFrame.
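As a minimal illustration of the interpolation step (synthetic values, not the actual in-feed data), a gap of a few quarter-hours is closed linearly:
# synthetic 15-minute series with a 45-minute gap (made-up values)
demo = pd.Series(
    [100.0, 110.0, np.nan, np.nan, np.nan, 150.0, 160.0],
    index=pd.date_range('2016-01-01 00:00', periods=7, freq='15min'))
# gaps of up to 2 hours are filled by linear interpolation
demo.interpolate()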
def interpolator(i, j, row, col, col_name, nan_regs, one_period):
'''interpolate missing value spans up to 2 hours'''
if i + 1 == len(nan_regs):
logger.info(
'%s : \n '
'interpolated %s up-to-2-hour-spans of NaNs',
col_name[0:3], i + 1 - j
)
to_fill = slice(row['start_idx'] - one_period, row['till_idx'] + one_period)
col.iloc[:,0].loc[to_fill] = col.iloc[:,0].loc[to_fill].interpolate()
return col
def guesser(row, col, col_name, nan_regs, frame, one_period):
'''guess missing value spans longer than two hours based on other TSOs'''
#logger.info('guessed %s entries after %s', row['count'], row['start_idx'])
day_before = pd.DatetimeIndex(
freq='15min',
start=row['start_idx'] - timedelta(hours=24),
end=row['start_idx'] - one_period
)
to_fill = pd.DatetimeIndex(
freq='15min',
start=row['start_idx'],
end=row['till_idx']
)
# other_tsos = [c[1] for c in compact.drop(col_name, axis=1).loc[:,(col_name[0],slice(None),col_name[2])].columns.tolist()]
other_tsos = [
tso
for tso
in ['DE50hertz', 'DEamprion', 'DEtennet', 'DEtransnetbw']
if tso != col_name[1]
]
# select columns with data for same technology (wind/solar) but from other TSOs
similar = frame.loc[:,(col_name[0],other_tsos,col_name[2])]
# calculate the sum using columns without NaNs the day
# before or during the period to be guessed
similar = similar.dropna(
axis=1,
how='any',
subset=day_before.append(to_fill)
).sum(axis=1)
# calculate scaling factor for other TSO data
factor = similar.loc[day_before].sum(axis=0) / col.loc[day_before,:].sum(axis=0)
guess = similar.loc[to_fill] / float(factor)
col.iloc[:,0].loc[to_fill] = guess
a = float(col.iloc[:,0].loc[row['start_idx'] - one_period])
b = float(col.iloc[:,0].loc[row['start_idx']])
if a == 0:
deviation = '{} absolute'.format(a - b)
else:
deviation = '{:.2f} %'.format((a - b) / a * 100)
logger.info(
'%s : \n '
'guessed %s entries after %s \n '
'last non-missing: %s \n '
'first guessed: %s \n '
'deviation of first guess from last known value: %s',
col_name[0:3], row['count'], row['start_idx'], a, b, deviation
)
return col
def chooser(col, col_name, nan_regs, frame, one_period):
for i, row in nan_regs.iterrows():
j = 0
# interpolate missing value spans up to 2 hours
if row['span'] <= timedelta(hours=2):
col = interpolator(i, j, row, col, col_name, nan_regs, one_period)
# guess missing value spans longer than two hours based on other TSOs
elif col_name[1][:2] == 'DE' and col_name[2] == 'generation':
j += 1
col = guesser(row, col, col_name, nan_regs, frame, one_period)
return col
def nan_finder(frame, patch=False):
'''Search for missing values in a DataFrame and apply custom patching.'''
nan_table = pd.DataFrame()
patched = pd.DataFrame()
one_period = frame.index[1] - frame.index[0]
for col_name, col in frame.iteritems():
col = col.to_frame() # TODO: can col_name be re-attached to the df? df should be called col
# tag all occurrences of NaN in the data (but not before the first actual entry or after the last one)
col['tag'] = (
(col.index >= col.first_valid_index()) &
(col.index <= col.last_valid_index()) &
col.isnull().transpose().as_matrix()
).transpose()
# make another DF to hold info about each region
nan_regs = pd.DataFrame()
# first row of consecutive region is a True preceded by a False in tags
nan_regs['start_idx'] = col.index[
col['tag'] & ~
col['tag'].shift(1).fillna(False)]
# last row of consecutive region is a False preceded by a True
nan_regs['till_idx'] = col.index[
col['tag'] & ~
col['tag'].shift(-1).fillna(False)]
if not col['tag'].any():
logger.info('%s : nothing to patch in this column', col_name[0:3])
col.drop('tag', axis=1, inplace=True)
nan_idx = pd.MultiIndex.from_arrays([
[0, 0, 0, 0],
['count', 'span', 'start_idx', 'till_idx']
]
)
nan_list = pd.DataFrame(index=nan_idx, columns=col.columns)
else:
# how long is each region
nan_regs['span'] = nan_regs['till_idx'] - nan_regs['start_idx'] + one_period
nan_regs['count'] = (nan_regs['span'] / one_period)
# sort the info DF to put longest missing region on top
nan_regs = nan_regs.sort_values(
'count',
ascending=False
).reset_index(drop=True)
col.drop('tag', axis=1, inplace=True)
nan_list = nan_regs.stack().to_frame()
nan_list.columns = col.columns
if patch:
col = chooser(col, col_name, nan_regs, frame, one_period)
if len(patched) == 0:
patched = col
else:
patched = patched.combine_first(col)
if len(nan_table) == 0:
nan_table = nan_list
else:
nan_table = nan_table.combine_first(nan_list)
nan_table.columns.names = headers
patched.columns.names = headers
return patched, nan_table
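The start and end of each consecutive run of NaNs is found with a boolean shift trick; a minimal demonstration on a synthetic series (not part of the actual processing):
# synthetic series with two NaN regions (made-up values)
demo = pd.Series(
    [1.0, np.nan, np.nan, 3.0, np.nan, 5.0],
    index=pd.date_range('2016-01-01', periods=6, freq='15min'))
tag = demo.isnull()
# a region starts where tag is True and the previous value is False,
# and ends where tag is True and the next value is False
starts = demo.index[tag & ~tag.shift(1).fillna(False).astype(bool)]
ends = demo.index[tag & ~tag.shift(-1).fillna(False).astype(bool)]
starts, ends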
Patch the 15-minute dataset and display the location of missing data in the original data.
patched, nan_table = nan_finder(data_sets['15min'], patch=True)
nan_table
nan_table.to_excel('nan_table2.xlsx')
Execute this to see whether there is still missing data. This is the case for some of the forecast columns.
patched2, nan_table2 = nan_finder(patched)
nan_table2#.loc[(slice(None),['count','start_idx']),:]
Execute this to see an example of where the data has been patched.
data_sets['15min'].loc['2015-10-24 23:00:00':'2015-10-25 03:00:00', 'wind']
patched.loc['2015-10-24 23:00:00':'2015-10-25 03:00:00', 'wind']
Replace the untreated data set with the patched one.
data_sets['15min'] = patched
The wind and solar in-feed data for the four German balancing areas is summed up and stored in new columns, which are then used to calculate profiles, that is, the share of installed wind/solar capacity producing at a given time. The column headers are created in the fashion introduced in the read script.
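A quick worked example of the profile calculation (made-up numbers): if the installed wind capacity in Germany is 40,000 MW and the actual in-feed in a given quarter-hour is 10,000 MW, the profile value for that timestamp is 10000 / 40000 = 0.25.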
web = 'http://data.open-power-system-data.org/datapackage_timeseries'
for tech in ['wind', 'solar']:
for attribute in ['generation', 'forecast']:
sum_col = pd.Series()
for tso in ['DE50hertz', 'DEamprion', 'DEtennet', 'DEtransnetbw']:
try:
add_col = data_sets['15min'][tech, tso, attribute]
if len(sum_col) == 0:
sum_col = add_col
else:
sum_col = sum_col + add_col.values
except KeyError:
pass
# Create a new MultiIndex
tuples = [(tech, 'DE', attribute, 'own calculation', web)]
columns = pd.MultiIndex.from_tuples(tuples, names=headers)
sum_col.columns = columns
data_sets['15min'] = data_sets['15min'].combine_first(sum_col)
# Calculate the profile column
try:
if attribute == 'generation':
profile_col = sum_col.values / data_sets['15min'][tech, 'DE', 'capacity']
tuples = [(tech, 'DE', 'profile', 'own calculation', web)]
columns = pd.MultiIndex.from_tuples(tuples, names=headers)
profile_col.columns = columns
data_sets['15min'] = data_sets['15min'].combine_first(profile_col)
except KeyError:
pass # FIXME
New columns for the aggregated data have been added to the 15-minute dataset.
data_sets['15min']
The German renewables in-feed data comes in 15-minute intervals. We resample it to hourly intervals in order to match the load data from ENTSO-E.
resampled = data_sets['15min'].resample('H').mean()
try:
data_sets['60min'] = data_sets['60min'].combine_first(resampled)
except KeyError:
data_sets['60min'] = resampled
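As a minimal illustration of what the resampling does (synthetic values): four quarter-hour values are averaged into a single hourly value.
demo = pd.Series(
    [10.0, 20.0, 30.0, 40.0],
    index=pd.date_range('2016-01-01 00:00', periods=4, freq='15min'))
demo.resample('H').mean() # one hourly value of 25.0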
New columns for the resampled data have been added to the 60-minute dataset.
data_sets['60min']
# Insert a column with Central European (Summer-)time.
# Still causes some problems, not recommended
#for res_key, df in data_sets.items():
# if not df.empty:
# df.insert(0, 'cet-timestamp', df.index.tz_localize('UTC').tz_convert('Europe/Brussels'))
In this part, we create the metadata that will document the data output in CSV format. The metadata will be stored in JSON format, which is very much like a Python dictionary.
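A minimal sketch of the round trip used below (toy keys, not the actual metadata): a YAML string is parsed into a dictionary with yaml.load and then serialized to JSON with json.dumps.
snippet = '''
name: example-package
keywords:
  - timeseries
  - example
'''
parsed = yaml.load(snippet) # {'name': 'example-package', 'keywords': ['timeseries', 'example']}
print(json.dumps(parsed, indent=2))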
First, we define the general metadata for the timeseries datapackage.
metadata_head = '''
name: opsd-timeseries
title: 'Time-series data: load, wind and solar, prices'
description: This data package contains different kinds of timeseries
data relevant for power system modelling, namely electricity consumption
(load) for 36 European countries as well as wind and solar power generation
and capacities and prices for a growing subset of countries.
The timeseries become available at different points in time depending on the
sources. The full dataset is only available from 2012 onwards. The
data has been downloaded from the sources, resampled and merged in
a large CSV file with hourly resolution. Additionally, the data
available at a higher resolution (Some renewables in-feed, 15
minutes) is provided in a separate file. All data processing is
conducted in python and pandas and has been documented in the
Jupyter notebooks linked below.
opsd-jupyter-notebook-url: https://github.com/Open-Power-System-Data/
datapackage_timeseries/blob/2016-07-14/main.ipynb
version: '2016-07-14'
opsd-changes-to-last-version: Included data from Energinet.DK, Elia and
Svenska Kraftnaet
keywords:
- timeseries
- electricity
- in-feed
- capacity
- renewables
- wind
- solar
- load
- tso
- europe
- germany
geographical-scope: Europe
licenses:
- url: http://example.com/license/url/here
version: 1.0
name: License Name Here
id: license-id-from-open
views:
- {}
maintainers:
- web: http://example.com/
name: Jonathan Muehlenpfordt
email: muehlenpfordt@neon-energie.de
resources:
'''
source_template = '''
- name: {source}
web: {web}
'''
resource_template = '''
- path: timeseries{res_key}.csv
format: csv
mediatype: text/csv
alternative_formats:
- path: timeseries{res_key}.csv
stacking: Singleindex
format: csv
- path: timeseries{res_key}.xlsx
stacking: Singleindex
format: xlsx
- path: timeseries{res_key}_multiindex.xlsx
stacking: Multiindex
format: xlsx
- path: timeseries{res_key}_multiindex.csv
stacking: Multiindex
format: csv
- path: timeseries{res_key}_stacked.csv
stacking: Stacked
format: csv
schema:
fields:
'''
indexfield = '''
- name: timestamp
description: Start of timeperiod in UTC
type: datetime
format: YYYY-MM-DDThh:mm:ssZ
'''
field_template = '''
- name: {variable}_{region}_{attribute}
description: {description}
type: number
source:
name: {source}
web: {web}
opsd-properties:
Region: {region}
Variable: {variable}
Attribute: {attribute}
'''
descriptions_template = '''
load: Consumption in {geo} in MW
generation: Actual {tech} generation in {geo} in MW
actual: Actual {tech} generation in {geo} in MW
forecast: {tech} day-ahead generation forecast in {geo} in MW
capacity: {tech} capacity in {geo} in MW
profile: Share of {tech} capacity producing in {geo}
offshoreshare: {tech} actual offshore generation in {geo} in MW
'''
For each dataset/output file, the metadata has an entry in the "resources" list that describes the file/dataset. The main part of each entry is the "schema" dictionary, consisting of a list of "fields", meaning the columns in the dataset. The first field is the timestamp index of the dataset. For the other fields, we iterate over the column MultiIndex of the datasets to construct the corresponding metadata.
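As an illustration of how a single field entry is rendered from one column (the column tuple and description below are made up for this example):
# hypothetical column tuple, for illustration only
example_col = ('wind', 'DE', 'generation', 'own calculation', web)
h = dict(zip(headers, example_col))
h['description'] = 'Actual wind generation in Germany in MW'
print(field_template.format(**h))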
resource_list = '' # list of files included in the datapackage
source_list = '' # list of sources where the data comes from
for res_key, df in data_sets.items():
field_list = indexfield # list of columns in a file, starting with the index field
for col in df.columns: # construct one metadata field per column
h = {k: v for k, v in zip(headers, col)}
if len(h['region']) > 2:
geo = h['region'] + ' control area'
elif h['region'] == 'NI':
geo = 'Northern Ireland'
elif h['region'] == 'CS':
geo = 'Serbia and Montenegro'
else:
geo = pycountry.countries.get(alpha2=h['region']).name
descriptions = yaml.load(
descriptions_template.format(tech=h['variable'], geo=geo)
)
h['description'] = descriptions[h['attribute']]
field_list = field_list + field_template.format(**h)
source_list = source_list + source_template.format(**h)
resource_list = resource_list + resource_template.format(res_key=res_key) + field_list
source_list = [dict(tupleized) # remove duplicates from source_list
for tupleized
in set(tuple(entry.items())
for entry
in yaml.load(source_list)
)
]
metadata = yaml.load(metadata_head)
metadata['sources'] = source_list
metadata['resources'] = yaml.load(resource_list)
Execute this to write the metadata to disk
datapackage_json = json.dumps(metadata, indent=2, separators=(',', ': '))
with open('datapackage.json', 'w') as f:
f.write(datapackage_json)
Finally, we want to write the data to the output files and save it in the directory of this notebook. First, we prepare different shapes of the dataset.
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
if not df.empty:
df_singleindex = df.copy()
# use first 3 levels of multiindex to create singleindex
df_singleindex.columns = ['_'.join(col[0:3])
for col
in df.columns.values
]
data_sets_singleindex[res_key] = df_singleindex
data_sets_multiindex[res_key + '_multiindex'] = df
stacked = df.copy()
stacked.columns = stacked.columns.droplevel(['source', 'web'])
stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
data_sets_stacked[res_key + '_stacked'] = stacked
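A toy example (made-up values and source/web labels) of how the single-index and stacked shapes relate to the MultiIndex shape:
toy = pd.DataFrame(
    np.arange(4.0).reshape(2, 2),
    index=pd.date_range('2016-01-01', periods=2, freq='H'),
    columns=pd.MultiIndex.from_tuples(
        [('wind', 'DE', 'generation', 'toy source', 'toy web'),
         ('solar', 'DE', 'generation', 'toy source', 'toy web')],
        names=headers))
# singleindex: flat column names built from the first three levels
print(['_'.join(col[0:3]) for col in toy.columns.values])
# stacked: one row per column and timestamp
toy.columns = toy.columns.droplevel(['source', 'web'])
print(toy.transpose().stack(dropna=True).to_frame(name='data'))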
Write the data to an SQLite database. This format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.
%%time
for res_key, df in data_sets_singleindex.items():
table = 'timeseries' + res_key
df = df.copy()
df.index = df.index.strftime('%Y-%m-%dT%H:%M:%SZ')
df.to_sql(table, sqlite3.connect('data.sqlite'),
if_exists='replace', index_label='timestamp')
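To check the result (assuming the write above succeeded), a few rows of the hourly table can be read back from the database:
# optional sanity check: read back a few rows from the hourly table
pd.read_sql('SELECT * FROM timeseries60min LIMIT 3',
            sqlite3.connect('data.sqlite'))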
Write the data to Excel files. This takes about 15 minutes to complete.
%%time
for res_key, df in chain(
data_sets_singleindex.items(),
data_sets_multiindex.items()
):
f = 'timeseries' + res_key
df.to_excel(f + '.xlsx', float_format='%.2f', merge_cells=False)
Write the data to CSV files. This takes about 10 minutes to complete.
%%time
for res_key, df in chain(
data_sets_singleindex.items(),
data_sets_multiindex.items(),
data_sets_stacked.items()
):
f = 'timeseries' + res_key
df.to_csv(f + '.csv', float_format='%.2f',
date_format='%Y-%m-%dT%H:%M:%SZ')
work in progress
# pv = compact.xs(('solar'), level=('variable'), axis=1, drop_level=False)
# pv.index = pd.MultiIndex.from_arrays([pv.index.date, pv.index.time], names=['date','time'])
# pv
# pv.groupby(level='time').max()
# pv.unstack().idxmax().to_frame().unstack().transpose()