#!/usr/bin/env python
# coding: utf-8

# # Weather Data: Renewables.ninja processing notebook
#
# This Notebook is part of the Weather Data Package of Open Power System Data.
# In[ ]:


get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')

import glob
import os
import sqlite3
import hashlib
import shutil

import pandas as pd
import geopandas as gp
import gsee
import tqdm
from joblib import Parallel, delayed

import generate_metadata


# In[ ]:


version = '2019-04-09'
changes = 'All European countries'


# In[ ]:


dir_shapefiles = os.path.join('downloads', 'shapefiles')
dir_countries = os.path.join('downloads', 'countries')
dir_nuts = os.path.join('downloads', 'nuts')


# # Read and process data

# In[ ]:


if os.path.exists('cached_dataframe.csv'):
    df_cached = pd.read_csv('cached_dataframe.csv', index_col=0, header=[0, 1], parse_dates=True)
else:
    df_cached = pd.DataFrame({})  # Empty dataframe, to permit 'x in df_cached' tests


# In[ ]:


dataframes = {}

parse_kwargs = dict(skiprows=2, index_col=0, parse_dates=True)

# Files of form `ninja_weather_country_AT_merra-2_population_weighted.csv`
for f in glob.glob(os.path.join(dir_countries, '*.csv')):
    country_code = f.split('_')[3]
    # Skip countries that are already available in the cached dataframe
    if country_code not in df_cached:
        df = pd.read_csv(f, **parse_kwargs)
        dataframes[country_code] = df

# Files of form `ninja_weather_irradiance_surface_country_DE_merra-2_nuts-2_population_weighted.csv`
for f in glob.glob(os.path.join(dir_nuts, '*.csv')):
    country_code = f.split('country_')[1][0:2]
    variable = f.split('weather_')[1].split('_country')[0]
    df = pd.read_csv(f, **parse_kwargs)
    # The country-wide total column (e.g. `DE_TOTAL`) is kept under the plain country code
    df = df.rename(columns={country_code + '_TOTAL': country_code})
    for c in df.columns:
        if c not in df_cached:
            if c not in dataframes:
                dataframes[c] = pd.DataFrame({variable: df[c]})
            else:
                dataframes[c].loc[:, variable] = df[c]
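# In[ ]:


# Illustration only (not required by the pipeline): how the country code and the
# variable name are recovered from the Renewables.ninja file names. The two example
# paths below are the file-name patterns quoted in the comments above.
example_country_file = os.path.join(
    dir_countries, 'ninja_weather_country_AT_merra-2_population_weighted.csv')
example_nuts_file = os.path.join(
    dir_nuts, 'ninja_weather_irradiance_surface_country_DE_merra-2_nuts-2_population_weighted.csv')

print(example_country_file.split('_')[3])                            # -> 'AT'
print(example_nuts_file.split('country_')[1][0:2])                   # -> 'DE'
print(example_nuts_file.split('weather_')[1].split('_country')[0])   # -> 'irradiance_surface'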
# In[ ]:


# Estimate direct and diffuse radiation using the BRL model as implemented in GSEE.
# https://github.com/renewables-ninja/gsee
# Warning: this code takes a while to execute (easily 1-2 minutes of CPU time per location).

nuts_centroids = gp.GeoDataFrame.from_file(os.path.join(dir_shapefiles, 'NUTS_LB_2016_4326.shp'))
nuts_centroids.set_index('NUTS_ID', inplace=True)

# The Renewables.ninja codes GB and GR correspond to the NUTS codes UK and EL
nuts_centroids.loc['GB', :] = nuts_centroids.loc['UK', :]
nuts_centroids.loc['GR', :] = nuts_centroids.loc['EL', :]

data = {k: {
    'clearness': dataframes[k]['irradiance_surface'] / dataframes[k]['irradiance_toa'],
    'centroid': list(nuts_centroids.loc[k, 'geometry'].coords)[0][::-1],  # (lat, lon)
} for k in dataframes.keys()}


def process_item(item):
    return gsee.brl_model.run(hourly_clearness=item['clearness'], coords=item['centroid'])


result = Parallel(n_jobs=-1)(delayed(process_item)(item) for item in tqdm.tqdm(data.values()))


# In[ ]:


# Split global horizontal irradiance into its direct and diffuse components,
# using the diffuse fraction estimated by the BRL model
for i, k in enumerate(data.keys()):
    df = dataframes[k]
    diffuse_fraction = result[i]
    df['radiation_direct_horizontal'] = ((1 - diffuse_fraction) * df['irradiance_surface']).fillna(0)
    df['radiation_diffuse_horizontal'] = (diffuse_fraction * df['irradiance_surface']).fillna(0)
    dataframes[k] = df


# In[ ]:


variables = ['windspeed_10m', 'temperature', 'radiation_direct_horizontal', 'radiation_diffuse_horizontal']

for k in dataframes.keys():
    dataframes[k] = dataframes[k].loc[:, [v for v in variables if v in dataframes[k].columns]]

if len(dataframes) > 0:
    complete_data = pd.concat(dataframes, axis=1, join='inner')
    df = pd.concat([complete_data, df_cached], axis=1)
    df.columns = pd.MultiIndex.from_tuples(
        [(i[0], i[1]) for i in df.columns],
        names=['geography', 'variable']
    )
    df.index.name = 'utc_timestamp'
    df.to_csv('cached_dataframe.csv')
else:
    df = df_cached


# # Write data to disk

# In[ ]:


os.makedirs(version, exist_ok=True)


# ## Reshape data

# Data are provided in two different "shapes":
#
# - SingleIndex (easy to read for humans, compatible with datapackage standard, small file size)
#   - File formats: CSV, SQLite
# - MultiIndex (easy to read into GAMS, not compatible with datapackage standard, small file size)
#   - File formats: CSV, Excel

# In[ ]:


df_multiindex = df

# Flatten the (geography, variable) column MultiIndex into single 'geography_variable' labels
df_singleindex = df.copy()
df_singleindex.columns = [
    '_'.join([level for level in list(col)])
    for col in df.columns.values
]

df_stacked = df.copy()
df_stacked = df_stacked.transpose().stack(dropna=True).to_frame(name='data')


# In[ ]:


datetime_format = '%Y-%m-%dT%H:%M:%SZ'


# ## Write to SQLite database

# In[ ]:


# SQLite is required for the filtering function on the OPSD website
df_sqlite = df_singleindex.copy()
df_sqlite.index = df_sqlite.index.strftime(datetime_format)

filepath = os.path.join(version, 'weather_data.sqlite')
if os.path.exists(filepath):
    os.unlink(filepath)

df_sqlite.to_sql(
    'weather_data',
    sqlite3.connect(filepath),
    if_exists='replace',
    index_label='utc_timestamp'
)


# ## Write to CSV

# In[ ]:


csv_kwargs = dict(
    float_format='%.4f',
    date_format=datetime_format
)

df_singleindex.to_csv(
    os.path.join(version, 'weather_data.csv'),
    **csv_kwargs
)

df_multiindex.to_csv(
    os.path.join(version, 'weather_data_multiindex.csv'),
    **csv_kwargs
)


# ## Write metadata

# In[ ]:


# See generate_metadata.py for details
generate_metadata.generate_json(df_multiindex, version, changes)
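# In[ ]:


# Optional sanity check, not part of the published pipeline: read the files written
# above back in, using the same conventions as for 'cached_dataframe.csv'. The column
# ('AT', 'temperature') is only an illustrative choice and assumes that series exists.
df_check = pd.read_csv(
    os.path.join(version, 'weather_data_multiindex.csv'),
    index_col=0, header=[0, 1], parse_dates=True
)
print(df_check.loc[:, ('AT', 'temperature')].head())

conn = sqlite3.connect(os.path.join(version, 'weather_data.sqlite'))
print(pd.read_sql('SELECT * FROM weather_data LIMIT 3', conn))
conn.close()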
# ## Write checksums.txt

# We publish SHA checksums for the output files on GitHub to allow verifying their integrity on the OPSD server.

# In[ ]:


def get_sha_hash(path, blocksize=65536):
    sha_hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        buffer = f.read(blocksize)
        while len(buffer) > 0:
            sha_hasher.update(buffer)
            buffer = f.read(blocksize)
        return sha_hasher.hexdigest()


checksum_file_path = os.path.join(version, 'checksums.txt')
files = glob.glob(os.path.join(version, 'weather_data*'))

# Create checksums.txt in the version directory
with open(checksum_file_path, 'w') as f:
    for this_file in files:
        file_hash = get_sha_hash(this_file)
        f.write('{},{}\n'.format(os.path.basename(this_file), file_hash))

# Copy the file to root directory from where it will be pushed to GitHub
shutil.copyfile(checksum_file_path, 'checksums.txt')


# In[ ]:
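# A minimal sketch of how the published checksums can be used for verification,
# assuming the output files and the checksums.txt written above are still in place;
# it reads the '<file name>,<sha256>' lines produced by the cell above.
def verify_checksums(checksum_path='checksums.txt', data_dir=version):
    with open(checksum_path) as f:
        for line in f:
            name, expected = line.strip().split(',')
            actual = get_sha_hash(os.path.join(data_dir, name))
            print(name, 'OK' if actual == expected else 'MISMATCH')


verify_checksums()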