import s3fs
import requests
from urllib import request
from http.cookiejar import CookieJar
from getpass import getpass
import netrc
import os
from os.path import join
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import fsspec
import ujson  # fast JSON encoding/decoding
import dask
import hvplot.xarray
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import fsspec_reference_maker
fsspec_reference_maker.__version__
'0.0.3+5.gfda8a53'
######### Setting up Earthdata Login credentials
def setup_earthdata_login_auth(endpoint):
    """
    Set up the request library so that it authenticates against the given Earthdata Login
    endpoint and is able to track cookies between requests. This looks in the .netrc file
    first and, if no credentials are found there, it prompts for them.
    Valid endpoints:
        urs.earthdata.nasa.gov - Earthdata Login production
    """
    try:
        username, _, password = netrc.netrc().authenticators(endpoint)
    except (FileNotFoundError, TypeError):
        # FileNotFoundError = there is no .netrc file
        # TypeError = the endpoint isn't in the .netrc file, so authenticators() returned None
        print(f"No .netrc file found, or {endpoint} is not in it; please enter credentials.")
        username = input("Earthdata Login username: ")
        password = getpass("Earthdata Login password: ")
    manager = request.HTTPPasswordMgrWithDefaultRealm()
    manager.add_password(None, endpoint, username, password)
    auth = request.HTTPBasicAuthHandler(manager)
    jar = CookieJar()
    processor = request.HTTPCookieProcessor(jar)
    opener = request.build_opener(auth, processor)
    request.install_opener(opener)
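For reference, the .netrc entry that the function looks for has the standard single-line form below (the username and password are placeholders for your own Earthdata Login credentials):

machine urs.earthdata.nasa.gov login YOUR_USERNAME password YOUR_PASSWORD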
###############################################################################
edl="urs.earthdata.nasa.gov"
setup_earthdata_login_auth(edl)
def begin_s3_direct_access():
    """Request temporary S3 credentials from PO.DAAC and return an authenticated S3 filesystem."""
    url = "https://archive.podaac.earthdata.nasa.gov/s3credentials"
    response = requests.get(url).json()
    return s3fs.S3FileSystem(key=response['accessKeyId'],
                             secret=response['secretAccessKey'],
                             token=response['sessionToken'],
                             client_kwargs={'region_name': 'us-west-2'})
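The temporary credentials returned by this endpoint expire (typically after about an hour), so the function may need to be re-run during a long session. As a quick sanity check that direct access works (not part of the recipe; fs_check is a throwaway name), one can list a few MUR granules:

fs_check = begin_s3_direct_access()
print(fs_check.ls("podaac-ops-cumulus-protected/MUR-JPL-L4-GLOB-v4.1")[:3])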
import sys
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         environment='pangeo', worker_profile='Medium Worker',
                                         propagate_env=True)
No Cluster running. Starting new cluster.
Setting Fixed Scaling workers=30
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://jupyter.qhub.esipfed.org/gateway/clusters/dev.cc990959c7554138828671f7b4f48c36/status
Propagating environment variables to workers
fs = begin_s3_direct_access()
flist = []
for lyr in range(2002, 2003):        # use range(2002, 2022) for the full record
    for imon in range(12, 13):       # use range(1, 13) for all twelve months
        fstr = str(lyr) + str(imon).zfill(2) + '*.nc'
        files = fs.glob(join("podaac-ops-cumulus-protected/", "MUR-JPL-L4-GLOB-v4.1", fstr))
        for file in files:
            flist.append(file)
print('total number of individual netcdf files:', len(flist))
urls = ["s3://" + f for f in flist]
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
fs2 = fsspec.filesystem('s3', anon=False)
json_dir = 's3://esip-qhub/nasa/mur/jsons/'
#If the directory exists, remove it (and all the files)
try:
fs2.rm(json_dir, recursive=True)
except:
pass
def gen_json(u):
    """Create a kerchunk reference JSON for one NetCDF granule and write it to json_dir."""
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        p = u.split('/')
        date = p[4][0:8]   # date prefix of the granule name, e.g. '20021201'
        fname = p[4]       # full granule file name
        outf = f'{json_dir}{date}.{fname}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
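Before launching the full parallel run, it can be worth calling the function once on a single granule to confirm the reference JSON lands where expected (a quick sanity check; index 0 is arbitrary, and the batch run below simply overwrites the file):

gen_json(urls[0])
print(fs2.ls(json_dir))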
%%time
_ = dask.compute(*[dask.delayed(gen_json)(u) for u in urls[0:30]], retries=10);
flist2 = fs2.ls(json_dir)
furls = sorted(['s3://'+f for f in flist2])
furls[0]
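To peek inside one of the reference files (purely for inspection; the exact key layout depends on the fsspec_reference_maker version):

with fs2.open(furls[0]) as f:
    refs = ujson.loads(f.read())
print(list(refs)[:10])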
# The combine step doesn't need the big gateway cluster; shut it down and use a local client
client.close(); cluster.shutdown()
from dask.distributed import Client
client = Client()
client
mzz = MultiZarrToZarr(furls,
                      storage_options={'anon': False},
                      remote_protocol='s3',
                      remote_options={'anon': True},  # JSON files
                      xarray_open_kwargs={
                          'decode_cf': False,
                          'mask_and_scale': False,
                          'decode_times': False,
                          'use_cftime': False,
                          'drop_variables': ['reference_time', 'crs'],
                          'decode_coords': False
                      },
                      xarray_concat_args={
                          # "data_vars": "minimal",
                          # "coords": "minimal",
                          # "compat": "override",
                          "join": "override",
                          "combine_attrs": "override",
                          "dim": "time"
                      })
%%time
#%%prun -D multizarr_profile
mzz.translate('mur_consolidated.json')
rpath = 's3://esip-qhub-public/nasa/mur/mur4.1_consolidated.json'
fs2.put_file(lpath='mur_consolidated.json', rpath=rpath)
url="https://archive.podaac.earthdata.nasa.gov/s3credentials"
response = requests.get(url).json()
turl ='s3://esip-qhub/nasa/mur/jsons/20021201.20021201090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc.json'
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'key':response['accessKeyId'],
'secret':response['secretAccessKey'],
'token':response['sessionToken'],
'client_kwargs':{'region_name':'us-west-2'}}
fs = fsspec.filesystem("reference", fo=turl, ref_storage_args=s_opts,
remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds
# Now open the full consolidated reference file that was uploaded to S3
r_opts = {'key': response['accessKeyId'],
          'secret': response['secretAccessKey'],
          'token': response['sessionToken'],
          'client_kwargs': {'region_name': 'us-west-2'}}
s_opts = {'requester_pays': True, 'skip_instance_cache': True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds
<xarray.Dataset>
Dimensions:           (time: 30, lat: 17999, lon: 36000)
Coordinates:
  * lat               (lat) float32 -89.99 -89.98 -89.97 ... 89.97 89.98 89.99
  * lon               (lon) float32 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
  * time              (time) datetime64[us] 2002-12-01T09:00:00 ... 2002-12-3...
Data variables:
    analysed_sst      (time, lat, lon) float32 ...
    analysis_error    (time, lat, lon) float32 ...
    mask              (time, lat, lon) float32 ...
    sea_ice_fraction  (time, lat, lon) float32 ...
Attributes: (12/47)
    Conventions:             CF-1.5
    Metadata_Conventions:    Unidata Observation Dataset v1.0
    acknowledgment:          Please acknowledge the use of these data with...
    cdm_data_type:           grid
    comment:                 MUR = "Multi-scale Ultra-high Reolution"
    creator_email:           ghrsst@podaac.jpl.nasa.gov
    ...                      ...
    summary:                 A merged, multi-sensor L4 Foundation SST anal...
    time_coverage_end:       20021201T210000Z
    time_coverage_start:     20021130T210000Z
    title:                   Daily MUR SST, Final product
    uuid:                    27665bc0-d5fc-11e1-9b23-0800200c9a66
    westernmost_longitude:   -180.0
import hvplot.xarray
sst = ds['analysed_sst'].sel(time='2002-12-20 12:00', method='nearest').load()
sst.hvplot.quadmesh(x='lon', y='lat', geo=True, rasterize=True, cmap='turbo')
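Since the consolidated references behave like an ordinary Zarr store, a time series at a single point can also be pulled lazily across all 30 time steps. A minimal sketch, with an arbitrary mid-Pacific location chosen purely for illustration:

ts = ds['analysed_sst'].sel(lon=-140.0, lat=0.0, method='nearest').load()
ts.hvplot(x='time', grid=True, title='MUR analysed_sst at (0N, 140W)')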