#!/usr/bin/env python
# coding: utf-8
#
# In[ ]:
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')
import glob
import os
import sqlite3
import hashlib
import shutil
import pandas as pd
import geopandas as gp
import gsee
import tqdm
from joblib import Parallel, delayed
import generate_metadata
# In[ ]:
version = '2019-04-09'
changes = 'All European countries'
# In[ ]:
dir_shapefiles = os.path.join('downloads', 'shapefiles')
dir_countries = os.path.join('downloads', 'countries')
dir_nuts = os.path.join('downloads', 'nuts')
# # Read and process data
# In[ ]:
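# A local cache of already-processed data; it lets the script skip the
# expensive BRL decomposition below for geographies that were already
# processed in a previous run.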
if os.path.exists('cached_dataframe.csv'):
df_cached = pd.read_csv('cached_dataframe.csv', index_col=0, header=[0, 1], parse_dates=True)
else:
    df_cached = pd.DataFrame({})  # Empty dataframe, to permit 'x in df_cached' tests
# In[ ]:
dataframes = {}
parse_kwargs = dict(skiprows=2, index_col=0, parse_dates=True)
# Files of form `ninja_weather_country_AT_merra-2_population_weighted.csv`
for f in glob.glob(os.path.join(dir_countries, '*.csv')):
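    # The country code is the fourth underscore-separated token,
    # e.g. 'ninja_weather_country_AT_merra-2_population_weighted.csv' -> 'AT'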
country_code = f.split('_')[3]
if country_code not in df_cached:
df = pd.read_csv(f, **parse_kwargs)
dataframes[country_code] = df
# Files of form `ninja_weather_irradiance_surface_country_DE_merra-2_nuts-2_population_weighted.csv`
for f in glob.glob(os.path.join(dir_nuts, '*.csv')):
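    # Extract the country code and variable name from the file name, e.g.
    # 'ninja_weather_irradiance_surface_country_DE_merra-2_...' -> 'DE' and
    # 'irradiance_surface'. The country total column (e.g. 'DE_TOTAL') is
    # renamed to the plain country code.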
country_code = f.split('country_')[1][0:2]
variable = f.split('weather_')[1].split('_country')[0]
df = pd.read_csv(f, **parse_kwargs)
df = df.rename(columns={country_code + '_TOTAL': country_code})
for c in df.columns:
if c not in df_cached:
if c not in dataframes:
dataframes[c] = pd.DataFrame({variable: df[c]})
else:
dataframes[c].loc[:, variable] = df[c]
# In[ ]:
# Estimate direct and diffuse radiation using the BRL model as implemented in GSEE.
# https://github.com/renewables-ninja/gsee
# Warning: this code takes a while to execute (easily 1-2 minutes of CPU time per location).
nuts_centroids = gp.GeoDataFrame.from_file(os.path.join(dir_shapefiles, 'NUTS_LB_2016_4326.shp'))
nuts_centroids.set_index('NUTS_ID', inplace=True)
# Map GB to UK and GR to EL: the ninja data uses ISO 3166-1 country codes
# (GB, GR), whereas the NUTS shapefile uses UK and EL, so copy those
# centroids across.
nuts_centroids.loc['GB', :] = nuts_centroids.loc['UK', :]
nuts_centroids.loc['GR', :] = nuts_centroids.loc['EL', :]
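# Clearness index: ratio of surface irradiance to top-of-atmosphere
# irradiance. Shapely centroids are (lon, lat) pairs; the [::-1] below flips
# them to the (lat, lon) order that GSEE expects.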
data = {k: {
'clearness': dataframes[k]['irradiance_surface'] / dataframes[k]['irradiance_toa'],
'centroid': list(nuts_centroids.loc[k, 'geometry'].coords)[0][::-1],
} for k in dataframes.keys()}
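# gsee.brl_model.run returns the estimated hourly diffuse fraction of
# irradiance for one location.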
def process_item(item):
return gsee.brl_model.run(hourly_clearness=item['clearness'], coords=item['centroid'])
result = Parallel(n_jobs=-1)(delayed(process_item)(item) for item in tqdm.tqdm(data.values()))
# In[ ]:
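# Decompose global horizontal irradiance into direct and diffuse components
# using the estimated diffuse fraction. Dicts preserve insertion order, so
# result[i] lines up with the i-th key of data.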
for i, k in enumerate(data.keys()):
df = dataframes[k]
diffuse_fraction = result[i]
df['radiation_direct_horizontal'] = ((1 - diffuse_fraction) * df['irradiance_surface']).fillna(0)
df['radiation_diffuse_horizontal'] = (diffuse_fraction * df['irradiance_surface']).fillna(0)
dataframes[k] = df
# In[ ]:
variables = ['windspeed_10m', 'temperature', 'radiation_direct_horizontal', 'radiation_diffuse_horizontal']
for k in dataframes.keys():
dataframes[k] = dataframes[k].loc[:, [v for v in variables if v in dataframes[k].columns]]
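# Merge the freshly processed geographies with the cached ones and refresh
# the cache on disk.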
if len(dataframes) > 0:
complete_data = pd.concat(dataframes, axis=1, join='inner')
df = pd.concat([complete_data, df_cached], axis=1)
df.columns = pd.MultiIndex.from_tuples(
[(i[0], i[1]) for i in df.columns],
names=['geography', 'variable']
)
df.index.name = 'utc_timestamp'
df.to_csv('cached_dataframe.csv')
else:
df = df_cached
# # Write data to disk
# In[ ]:
os.makedirs(version, exist_ok=True)
# ## Reshape data
# Data are provided in two different "shapes":
# - SingleIndex (easy to read for humans, compatible with datapackage standard, small file size)
# - File formats: CSV, SQLite
# - MultiIndex (easy to read into GAMS, not compatible with datapackage standard, small file size)
# - File formats: CSV, Excel
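# For example, the MultiIndex column ('AT', 'temperature') becomes the
# SingleIndex column 'AT_temperature'.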
# In[ ]:
df_multiindex = df
df_singleindex = df.copy()
df_singleindex.columns = [
    '_'.join(col) for col in df.columns.values
]
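# Stacked form: one (geography, variable, timestamp) observation per row.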
df_stacked = df.copy()
df_stacked = df_stacked.transpose().stack(dropna=True).to_frame(name='data')
# In[ ]:
datetime_format = '%Y-%m-%dT%H:%M:%SZ'
# ## Write to SQLite database
# In[ ]:
# SQLite is required for the filtering function on the OPSD website
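# SQLite has no native datetime type, so the index is written as ISO 8601 strings.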
df_sqlite = df_singleindex.copy()
df_sqlite.index = df_sqlite.index.strftime(datetime_format)
filepath = os.path.join(version, 'weather_data.sqlite')
if os.path.exists(filepath):
os.unlink(filepath)
df_sqlite.to_sql(
'weather_data',
sqlite3.connect(filepath),
if_exists='replace',
index_label='utc_timestamp'
)
# ## Write to CSV
# In[ ]:
csv_kwargs = dict(
float_format='%.4f',
date_format=datetime_format
)
df_singleindex.to_csv(
os.path.join(version, 'weather_data.csv'),
**csv_kwargs
)
df_multiindex.to_csv(
os.path.join(version, 'weather_data_multiindex.csv'),
**csv_kwargs
)
# ## Write metadata
# In[ ]:
# See generate_metadata.py for details
generate_metadata.generate_json(df_multiindex, version, changes)
# ## Write checksums.txt
# We publish SHA-256 checksums for the output files on GitHub to allow verifying their integrity on the OPSD server.
# In[ ]:
def get_sha_hash(path, blocksize=65536):
sha_hasher = hashlib.sha256()
with open(path, 'rb') as f:
buffer = f.read(blocksize)
while len(buffer) > 0:
sha_hasher.update(buffer)
buffer = f.read(blocksize)
return sha_hasher.hexdigest()
checksum_file_path = os.path.join(version, 'checksums.txt')
files = glob.glob(os.path.join(version, 'weather_data*'))
# Create checksums.txt in the version directory
with open(checksum_file_path, 'w') as f:
for this_file in files:
file_hash = get_sha_hash(this_file)
f.write('{},{}\n'.format(os.path.basename(this_file), file_hash))
# Copy the file to root directory from where it will be pushed to GitHub
shutil.copyfile(checksum_file_path, 'checksums.txt')
# In[ ]:
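# A minimal sketch (not part of the published pipeline) of how a downloaded
# file could be checked against checksums.txt; the function name and the
# usage example are illustrative assumptions.
def verify_checksum(path, checksum_file='checksums.txt'):
    # checksums.txt lines have the form '<file name>,<sha256 hex digest>'
    with open(checksum_file) as f:
        expected = dict(
            line.strip().split(',') for line in f if line.strip()
        )
    # Recompute the hash and compare it to the published one
    return expected.get(os.path.basename(path)) == get_sha_hash(path)
# e.g.: verify_checksum(os.path.join(version, 'weather_data.csv'))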