This notebook compares the CMIP6 prefix of the cmip6-pds Zarr bucket against the esgf-world NetCDF bucket. It uses intake-esm catalogs and S3 Inventory reports, and could potentially be driven by the S3 Inventory reports alone.
DCPP (about 20 TB) is currently excluded from the comparison.
If you want to dive directly into the comparison plots, scroll all the way to the bottom.
If you'd like to run it against a more recent catalog, just update the following cell with the relevant pointers.
Authors: Naomi Henderson, Aparna Radhakrishnan
z_inv_list='s3://s3-inventory-collection/cmip6-pds/s3-inventory-config/hive/dt=2021-04-09-00-00/symlink.txt' #'s3://s3-inventory-collection/cmip6-pds/s3-inventory-config/hive/dt=2021-04-01-00-00/symlink.txt'
n_inv_list = 's3://s3-inventory-dest/esgf-world/cmip6-inventory/hive/dt=2021-04-09-00-00/symlink.txt' #'s3://s3-inventory-dest/esgf-world/cmip6-inventory/hive/dt=2021-03-30-00-00/symlink.txt'
zarr_cat = 'https://cmip6-pds.s3.amazonaws.com/pangeo-cmip6-noQC.csv.gz'
netcdf_cat = 'https://cmip6-nc.s3.us-east-2.amazonaws.com/esgf-world.csv.gz'
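If newer S3 Inventory reports have been published, one quick way to find the latest dt= prefix is to list the hive directories. This is a minimal sketch, assuming the inventory prefixes are listable anonymously with s3fs; adjust credentials if they are not.
import s3fs
fs = s3fs.S3FileSystem(anon=True)
# each entry ends in dt=YYYY-MM-DD-00-00; append '/symlink.txt' to the newest one
print(fs.ls('s3-inventory-collection/cmip6-pds/s3-inventory-config/hive/'))
print(fs.ls('s3-inventory-dest/esgf-world/cmip6-inventory/hive/'))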
import sys
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import xarray as xr
import matplotlib as mpl
import matplotlib.pyplot as plt
from dask import dataframe as dd

mpl.rcParams['figure.figsize'] = [18, 4]
mpl.rcParams['font.size'] = 14
mpl.rcParams['legend.fontsize'] = 'large'
mpl.rcParams['figure.titlesize'] = 'large'
pd.set_option("display.max_colwidth", None)
warnings.simplefilter("ignore")
from dask_gateway import Gateway  # needed by launchDask below

def launchDask():
    """Start a Dask Gateway cluster with custom worker memory, shutting down any existing cluster first."""
    # TODO: wrap in try/except
    gateway = Gateway()
    options = gateway.cluster_options()
    options.worker_memory = 16
    clusters = gateway.list_clusters()
    if len(clusters) > 0:
        # shut down the existing cluster so the custom options above can be applied to a fresh one
        cluster = gateway.connect(clusters[0].name)
        print("closing existing cluster")
        cluster.shutdown()
        cluster = gateway.new_cluster(options)
    else:
        cluster = gateway.new_cluster(options)
    return cluster
from dask.distributed import Client

cluster = launchDask()
client = Client(cluster)
client
cluster.adapt(minimum=0, maximum=30)
def sibytesto(bytes, to, bsize=10):
    """Convert bytes to SI units (powers of 10): 'k', 'm', 'g', 't'."""
    a = {'k': 3, 'm': 6, 'g': 9, 't': 12}
    return bytes / pow(bsize, a[to])
# Usage: sibytesto(size_in_bytes, 'g')

def bytesto(bytes, to, bsize=2):
    """Convert bytes to binary units (powers of 2): 'g', 't', 'p' -- so 't' is really TiB."""
    a = {'g': 30, 't': 40, 'p': 50}
    return bytes / pow(bsize, a[to])
# Usage: bytesto(size_in_bytes, 'g')
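A quick sanity check of the two converters (illustrative values only): one TiB in bytes comes back as 1.0 from bytesto, and 10**12 bytes as 1.0 from sibytesto.
print(bytesto(2**40, 't'))      # 1.0 (TiB, powers of 2)
print(sibytesto(10**12, 't'))   # 1.0 (TB, powers of 10)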
# the symlink.txt file lists the parquet objects that make up the S3 Inventory report, one URL per line
zsym = pd.read_csv(z_inv_list, delimiter='\n', dtype='unicode', index_col=False, header=None)
# read all inventory parquet files lazily with dask; only the object key and size are needed
zarr_inventory_dd = dd.read_parquet(zsym[0].tolist(), columns=['key', 'size'], engine='pyarrow', storage_options=dict(anon=True))
#TEST ONLY
zarr_inventory_dd.loc[0:1000000]
zarr_inventory_dd['size'].sum().compute()
908246538633815
bytesto(908246538633815,'t')  # ~826 TiB: total size held under the CMIP6 prefix of cmip6-pds
826.0454147910559
zarr_inventory_dd = zarr_inventory_dd.repartition(npartitions=40)  # zarr_inventory_dd.npartitions
zarr_inventory_dd = zarr_inventory_dd.set_index('key', drop=False)
# collapse each object key to its versioned store path ('vstore'): everything up to and including the vYYYYMMDD component
zarr_inventory_dd['vstore'] = zarr_inventory_dd.apply(
    lambda row: '/'.join([row['key'].split('/v20')[0], 'v20' + row['key'].split('v20')[-1].split('/')[0]]), axis=1).compute()
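To make the string surgery above concrete, here is what it does to a single, purely hypothetical inventory key (one chunk object inside a zarr store):
key = 'CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas/0.0.0'  # hypothetical key
vstore = '/'.join([key.split('/v20')[0], 'v20' + key.split('v20')[-1].split('/')[0]])
print(vstore)  # CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701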
#client.persist(zarr_inventory_dd)
cluster.scale(25)
zarr_inventory_dd = zarr_inventory_dd.groupby(['vstore'])['size'].sum(split_out=40)
zarr_inventory_dd.compute()
zarr_inventory_dd = zarr_inventory_dd.to_frame()
zarr_inventory_dd = zarr_inventory_dd.reset_index(drop=False)
#zarr_cat = 'https://cmip6-pds.s3.amazonaws.com/pangeo-cmip6-noQC.csv.gz' #'https://cmip6-pds.s3.amazonaws.com/compare/pangeo-cmip6-noQC.csv.gz'
dzarr = pd.read_csv(zarr_cat, dtype='unicode')
ddzarr = dd.from_pandas(dzarr, npartitions=40)
# build the same versioned-store key ('vstore') from the catalog's zstore and version columns
ddzarr['vstore'] = ddzarr.apply(lambda row: (row['zstore'].split('s3://cmip6-pds/')[-1].split('v20')[0] + 'v' + row.version), axis=1).compute()
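Both derivations are meant to produce identical 'vstore' strings so the merge in the next step can match catalog entries to inventory objects. A small illustration with a hypothetical catalog row (path and version are made up):
zstore = 's3://cmip6-pds/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/'  # hypothetical zstore
version = '20180701'                                                                       # hypothetical version
print(zstore.split('s3://cmip6-pds/')[-1].split('v20')[0] + 'v' + version)
# CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701  -> same form as the inventory-derived vstore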
dzarrtmp = ddzarr.merge(zarr_inventory_dd, on='vstore', how='outer', suffixes=['', '_'], indicator=True)
dzarr = dzarrtmp[dzarrtmp._merge=='both']
ddzarr_groupby = dzarr.groupby(['zstore', 'vstore', 'version', 'source_id', 'table_id', 'institution_id', 'variable_id', 'member_id', 'grid_label', 'experiment_id'])
series_ddzarr = ddzarr_groupby['size'].sum().compute()
dzarr = series_ddzarr.to_frame()
dzarr = dzarr.sort_values(by=['version'])
dzarr=dzarr.reset_index(drop=False)
dzarr['zstore'] = dzarr.apply(lambda row: (row.vstore.split('s3://cmip6-pds/')[-1].split('v20')[0]), axis = 1)
dzarr['activity_id'] = dzarr.apply(lambda row: row.vstore.split('CMIP6/')[-1].split('/')[0], axis = 1)
# esgf-world netcdf catalog
dnc = pd.read_csv(netcdf_cat, dtype='unicode')
# derive the S3 object key from the full path; the inventory parquet files are keyed on 'key', which is used for the merge below
dnc['key'] = dnc.apply(lambda row: row.path.split('s3://esgf-world/')[-1], axis=1)
# read the esgf-world S3 Inventory report: symlink.txt lists the parquet objects, one URL per line
nsym = pd.read_csv(n_inv_list, delimiter='\n', dtype='unicode', header=None)
nc_inventory_pd = pd.concat(map(pd.read_parquet, nsym[0].tolist()))
# keep only objects that still exist (not delete markers) and are the latest object version
if 'is_delete_marker' in nc_inventory_pd:
    nc_inventory_pd = nc_inventory_pd[nc_inventory_pd['is_delete_marker'] == False]
if 'is_latest' in nc_inventory_pd:
    nc_inventory_pd = nc_inventory_pd[nc_inventory_pd['is_latest'] == True]
nc_inventory_pd = nc_inventory_pd[nc_inventory_pd['size'].notna()]
#merge dnetcdf and nc_inventory_pd to incorporate size and then remove version duplicate
dnctmp = dnc.merge(nc_inventory_pd, on='key', how='outer', suffixes=['', '_'], indicator=True)
dnetcdf = dnctmp[dnctmp._merge=='both']
dnetcdf = dnetcdf[dnetcdf['size'].notna()]
dnetcdf = dnetcdf.drop(['frequency', 'modeling_realm'], axis=1)  # these columns are all NA in the esgf-world csv
dnetcdf['zstore'] = dnetcdf.apply(lambda row: row.key.split('s3://esgf-world/')[-1].split('v20')[0], axis = 1)
dnetcdf['vstore'] = dnetcdf.apply(lambda row: row.zstore + str(row.version)+'/', axis = 1)
dnetcdf = dnetcdf.groupby(['zstore', 'model', 'version', 'mip_table', 'institute', 'variable', 'ensemble_member', 'grid_label', 'experiment_id'], as_index=False)['size'].sum()
dnetcdf = dnetcdf.sort_values(by=['version'])
print("Total AWS-netcdf in TB",bytesto(dnetcdf['size'].sum(),'t')) #
dnetcdf = dnetcdf.drop_duplicates(subset=["zstore"],keep='last')
dnetcdf = dnetcdf.rename(columns={"mip_table": "table_id","institute":"institution_id","variable":"variable_id"})
dnetcdf['activity_id'] = dnetcdf.apply(lambda row: row.zstore.split('CMIP6/')[-1].split('/')[0], axis = 1)
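The version de-duplication above relies on sorting by version and keeping the last row per zstore. A toy example with hypothetical versions shows the effect:
toy = pd.DataFrame({'zstore': ['x/', 'x/', 'y/'],
                    'version': ['v20180701', 'v20190101', 'v20200101'],
                    'size': [1.0, 2.0, 3.0]})
toy = toy.sort_values(by=['version']).drop_duplicates(subset=['zstore'], keep='last')
print(toy)  # 'x/' keeps only v20190101, 'y/' keeps v20200101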
Total AWS-netcdf in TB 603.8197385220765
# VERIFY against one known GFDL-CM4 historical dataset
searchval = 'CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/rlus/gr1/v20180701/'
dnetcdf.loc[(dnetcdf['version'] == 'v20180701') & (dnetcdf['institution_id'] == 'NOAA-GFDL') & (dnetcdf['experiment_id'] == 'historical')
            & (dnetcdf['table_id'] == 'Amon')
            & (dnetcdf['variable_id'] == 'rlus')]
# size shows the aggregated dataset size, as desired
# only the latest DRS version of the dataset should appear; superseded versions have been dropped
|  | zstore | model | version | table_id | institution_id | variable_id | ensemble_member | grid_label | experiment_id | size | activity_id |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 73275 | CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/rlus/gr1/ | GFDL-CM4 | v20180701 | Amon | NOAA-GFDL | rlus | r1i1p1f1 | gr1 | historical | 264327907.0 | CMIP |
#cdzarr = dzarr.reset_index()
set_L = set(dzarr.zstore.unique())
set_G = set(dnetcdf.zstore.unique())
print('AWS-zarr (number of datasets): ',len(set_L),' AWS-netcdf (number of datasets):', len(set_G),' both (number of datasets):', len(set_G.union(set_L)))
#LATEST DRS VERSIONS
print("AWS-netcdf in TB",bytesto(dnetcdf['size'].sum(),'t')) #
print("AWS-zarr in TB",bytesto(dzarr['size'].sum(),'t')) #
AWS-zarr (number of datasets):  355047  AWS-netcdf (number of datasets): 241760  both (number of datasets): 520909
AWS-netcdf in TB 559.8580954597983
AWS-zarr in TB 817.5647525450613
dcpp_nc = len(dnetcdf[dnetcdf.activity_id == 'DCPP'])
dcpp_zarr = len(dzarr[dzarr.activity_id == 'DCPP'])
#We are not accounting for DCPP at this time since the definition of a dataset is yet to be determined here.
dZR = dzarr[dzarr.activity_id != 'DCPP']
dNC = dnetcdf[dnetcdf.activity_id != 'DCPP']
dzarr.columns
dzarr[dzarr.activity_id =='DCPP']
| zstore | vstore | version | source_id | table_id | institution_id | variable_id | member_id | grid_label | experiment_id | size | activity_id |
|---|---|---|---|---|---|---|---|---|---|---|---|

(0 rows: the zarr catalog contains no DCPP datasets)
douter = dZR.merge(dNC, on='zstore', how='outer', suffixes=['', '_n'], indicator=True)
dZR_notNC = douter[douter._merge=='left_only']
dNC_notZR = douter[douter._merge=='right_only']
#d_inner = dZR.merge(dNC, on='zstore', how='inner', suffixes=['_', ''], indicator=True)
dboth = douter[douter._merge=='both']
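The indicator column from the outer merge is what splits the datasets into zarr-only, netcdf-only, and in-both groups. A toy illustration with hypothetical mini-frames:
left = pd.DataFrame({'zstore': ['a', 'b'], 'size': [1, 2]})
right = pd.DataFrame({'zstore': ['b', 'c'], 'size': [20, 30]})
m = left.merge(right, on='zstore', how='outer', suffixes=['', '_n'], indicator=True)
print(m[m._merge == 'left_only'].zstore.tolist())   # ['a']  -> zarr but not netcdf
print(m[m._merge == 'right_only'].zstore.tolist())  # ['c']  -> netcdf but not zarr
print(m[m._merge == 'both'].zstore.tolist())        # ['b']  -> in both buckets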
dZRnNC = dZR_notNC[['activity_id', 'institution_id', 'source_id', 'experiment_id',
                    'member_id', 'table_id', 'variable_id', 'grid_label', 'zstore', 'version',
                    'vstore', 'size']]  # zarr-only rows: take the zarr-side columns
dNCnZR = dNC_notZR[['activity_id_n', 'institution_id_n', 'model', 'experiment_id_n',
                    'ensemble_member', 'table_id_n', 'variable_id_n', 'grid_label_n', 'zstore', 'version_n',
                    'vstore', 'size_n']]  # netcdf-only rows: take the netcdf-side ('_n') columns
dNCnZR = dNCnZR.rename(columns={"size_n": "size", "activity_id_n": "activity_id", "institution_id_n": "institution_id",
                                "model": "source_id", "experiment_id_n": "experiment_id", "ensemble_member": "member_id",
                                "table_id_n": "table_id", "variable_id_n": "variable_id", "grid_label_n": "grid_label", "version_n": "version"})
dNCandZR = dboth[['activity_id', 'institution_id', 'source_id', 'experiment_id',
                  'member_id', 'table_id', 'variable_id', 'grid_label', 'zstore', 'version',
                  'vstore', 'size']]  # for datasets present in both buckets, use the zarr-side size
# combine: each bucket's full holdings = its unique datasets plus the datasets present in both
dZR = pd.concat([dZRnNC, dNCandZR])
dNC = pd.concat([dNCnZR, dNCandZR])
aws_zarr= len(dZR)
aws_nc = len(dNC)
zarr_MissingNC = len(dZRnNC)
nc_MissingZarr = len(dNCnZR)
in_both = len(dNCandZR) #aws_zarr - zarr_MissingNC
num_all = aws_zarr+aws_nc-in_both
print('DCPP (netcdf) datasets amount to', dcpp_nc)
print('DCPP (zarr) datasets amount to', dcpp_zarr)
print('For all activities other than DCPP, total unique datasets from both:', num_all )
print('S3/zarr has: ',aws_zarr)
print('S3/netcdf has:',aws_nc)
print('S3/zarr needs: ',nc_MissingZarr) #guesstimate 402TB
print('S3/netcdf needs:', zarr_MissingNC )#guesstimate 540TB
print('\n What is common in both buckets:',int(100*(in_both)/num_all),'%') #225TB
DCPP (netcdf) datasets amount to 69619
DCPP (zarr) datasets amount to 0
For all activities other than DCPP, total unique datasets from both: 451290
S3/zarr has:  355047
S3/netcdf has: 172141
S3/zarr needs:  96243
S3/netcdf needs: 279149

 What is common in both buckets: 16 %
dZR_size= dZR['size'].sum()
dNC_size = dNC['size'].sum()
dNCnZR_size = dNCnZR['size'].sum()
dZRnNC_size = dZRnNC['size'].sum()
in_both_size = dNCandZR['size'].sum()
num_all_size = dZR_size + dNC_size - in_both_size  # subtract the overlapping volume (bytes), not the dataset count
print('For all activities other than DCPP, size of total unique datasets from both:', num_all_size )
print('S3/zarr has: ',bytesto(dZR_size,'t'))
print('S3/netcdf has:',bytesto(dNC_size ,'t'))
print('S3/zarr needs: ',bytesto(dNCnZR_size,'t'),'TB') #guestimate 402TB
print('S3/netcdf needs:', bytesto(dZRnNC_size,'t'),'TB')#guestimate 540TB
print('\n What is common in both buckets:',round(bytesto(in_both_size,'t')),'T')
For all activities other than DCPP, size of total unique datasets from both: 1529143508832265.0
S3/zarr has:  817.5647525450613
S3/netcdf has: 573.1831670573774
S3/zarr needs:  359.45603408402985 TB
S3/netcdf needs: 603.8376195717137 TB

 What is common in both buckets: 214 T
def autolabel(ax, rects):
    """Attach a text label above each bar in *rects*, displaying its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=10)
def plot_facet(df_dict, facet='table_id', top_n=8, ymax=300000, ylabel='number', title_add='', units='t'):
    """Grouped bar chart comparing the dataframes in df_dict along one facet (e.g. table_id)."""
    labels = df_dict.keys()
    # union of all facet values present in any of the dataframes
    stables = set()
    for label in labels:
        stables = stables.union(set(df_dict[label][facet].unique()))
    tables = sorted(list(stables))
    # per-label value for each facet entry: data volume if 'volume' is in ylabel, else dataset count
    vdfs = {}
    for label in labels:
        df = df_dict[label]
        nvdf = []
        for table in tables:
            if "volume" in ylabel:
                ndf = df[df[facet] == table]['size'].sum()
                ndf = round(bytesto(ndf, units))
            else:
                ndf = df[df[facet] == table].zstore.nunique()
            nvdf += [ndf]
        vdfs[label] = nvdf
    # totals across labels, used to rank the facet values
    names = []
    totals = []
    for item, table in enumerate(tables):
        names += [table]
        tsum = 0
        for label in labels:
            tsum += vdfs[label][item]
        totals += [tsum]
    num_dict = {'name': names, 'total': totals}
    for label in labels:
        num_dict[label] = vdfs[label]
    df_nums = pd.DataFrame(num_dict)
    df_nums = df_nums.sort_values(by=['total'], ascending=False)
    #df_nums = df_nums.sort_values(by=[list(labels)[1]],ascending=False)
    width = 1.0 / (1 + len(labels))  # the width of the bars
    fig, ax = plt.subplots()
    names = df_nums.name.values[:top_n]
    x = np.arange(len(names))
    for item, label in enumerate(labels):
        vdf = df_nums[label].values[:top_n]
        rects = ax.bar(x - (len(labels)/2 - item - 0.5)*width, vdf, width, label=label)
        autolabel(ax, rects)
    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax.set_ylabel(ylabel)
    ax.set_title(f'{ylabel} top {top_n} by {facet}' + title_add)
    ax.set_xticks(x)
    ax.set_xticklabels(names)
    ax.set_ylim([0, ymax])
    ax.legend()
    fig.tight_layout()
    #plt.savefig(f'compare-{facet}.png')
    plt.show()
df_dict = {'ZARR':dZR,'NetCDF':dNC,'ZARR but not NetCDF':dZRnNC,'NetCDF but not ZARR':dNCnZR}
df_dict_size = {'ZARR':dZR,'NetCDF':dNC,'ZARR but not NetCDF':dZRnNC,'NetCDF but not ZARR':dNCnZR}
units = 't'
if(units == 't'): leg='TB'
if(units == 'g'): leg='GB'
plot_facet(df_dict,facet='activity_id',ymax=175000,ylabel='Number of datasets',title_add = ' : non_DCPP only')
plot_facet(df_dict_size,facet='activity_id',ymax=600, ylabel='Data volume in '+leg,title_add = ' : non_DCPP only',units='t')
plot_facet(df_dict,facet='table_id',ymax=175000,ylabel='Number of datasets',title_add = ' : non_DCPP only')
plot_facet(df_dict_size,facet='table_id',ymax=600,ylabel="Data volume in "+leg, title_add = ' : non_DCPP only',units='t')
plot_facet(df_dict,facet='institution_id',top_n=8,ymax=100000,ylabel='Number of datasets',title_add = ' : non_DCPP only')
plot_facet(df_dict_size,facet='institution_id',top_n=5,ymax=500,ylabel="Data volume in "+leg,title_add = ' : non_DCPP only',units='t')
plot_facet(df_dict,facet='experiment_id',ymax=100000,ylabel='Number of datasets',title_add = ' : non_DCPP only')
plot_facet(df_dict_size,facet='experiment_id',ymax=500,ylabel="Data volume in "+leg,title_add = ' : non_DCPP only',units='t')
dZRm = dZR[dZR.table_id.str.contains('mon')]
dNCm = dNC[dNC.table_id.str.contains('mon')]
df_dict = {'ZARR':dZRm,'NetCDF':dNCm}
plot_facet(df_dict,facet='variable_id',ymax=30000,title_add = ' : monthly, non_DCPP only')
from matplotlib_venn import venn2
import matplotlib.pyplot as plt
b = round(bytesto(in_both_size,'t'))
z = round(bytesto(dZR_size,'t')) - b
n = round(bytesto(dNC_size ,'t')) - b
venn2(subsets = (z,n,b),set_labels=('zarr/cmip6-pds','netcdf/esgf-world'))
plt.title('Data volume (TB) quick look in zarr and netcdf buckets',fontweight='bold',fontsize=20,pad=30);
plt.show()
# ESGF CMIP6 holdings as of mid April 2021, replication included
desgf = pd.read_csv("cmip6-institutes.csv", dtype='unicode')  # Source: http://esgf-ui.cmcc.it/esgf-dashboard-ui/data-archiveCMIP6.html
desgfsize = desgf['total_size_TB'].astype(float)
desgfsize = desgfsize.sum() / pow(2, 10)  # total is ~19483.63 TB; dividing by 2**10 expresses it in PB
print(desgfsize)
19.026982421874997