Authors: Luca Mariani, Clemens Kloss
Abstract: Demonstrates ground observatory data by direct access to the BGS FTP server (AUX_OBS dataset). Note that in the future there will be a VirES-based access method (work in progress).
%load_ext watermark
%watermark -i -v -p viresclient,pandas,xarray,matplotlib
# Python standard library
import os
import re
from contextlib import closing
from datetime import datetime
from ftplib import FTP
from pathlib import Path
from tempfile import TemporaryFile
from zipfile import ZipFile
# Extra libraries
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import cdflib
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from viresclient import SwarmRequest
# FTP server hosting the BGS AUX_OBS dataset
HOST = 'ftp.nerc-murchison.ac.uk'

# Local directories (update paths according to your environment)
OBS_HOUR_LOCAL_DIR = Path('~/data/AUX_OBS/hour').expanduser()
OBS_MINUTE_LOCAL_DIR = Path('~/data/AUX_OBS/minute').expanduser()
OBS_SECOND_LOCAL_DIR = Path('~/data/AUX_OBS/second').expanduser()

# Create the directories if missing (pathlib equivalent of
# os.makedirs(..., exist_ok=True); parents are created as needed).
for _directory in (OBS_HOUR_LOCAL_DIR, OBS_MINUTE_LOCAL_DIR, OBS_SECOND_LOCAL_DIR):
    _directory.mkdir(parents=True, exist_ok=True)
def search(obstype, start_date=None, end_date=None):
    """Search OBS data files on the FTP server.

    Parameters
    ----------
    obstype : str
        OBS file type: `hour`, `minute` or `second`.
    start_date : str or numpy.datetime64, optional
        lower bound of the time interval (default: no lower bound).
    end_date : str or numpy.datetime64, optional
        upper bound of the time interval (default: no upper bound).

    Returns
    -------
    list(str)
        remote paths of the matching OBS data files, sorted by name.

    Raises
    ------
    ValueError
        if `obstype` is not valid.
    ftplib.all_errors
        in case of FTP errors.
    """
    # Remote directory for each supported cadence.
    paths = {
        'hour': '/geomag/Swarm/AUX_OBS/hour',
        'minute': '/geomag/Swarm/AUX_OBS/minute',
        'second': '/geomag/Swarm/AUX_OBS/second',
    }
    if obstype not in paths:
        raise ValueError(
            f'obstype must be hour, minute or second, not {obstype}'
        )

    # Product name, e.g. SW_OPER_AUX_OBSM2__<start>_<stop>_<version>.ZIP
    pattern = re.compile(
        r'SW_OPER_AUX_OBS[_MS]2__(?P<start>\d{8}T\d{6})_'
        r'(?P<stop>\d{8}T\d{6})_\d{4}\.ZIP$'
    )
    # Open-ended defaults so that any file interval overlaps the request.
    start_date = np.datetime64('0000', 's') if start_date is None else np.datetime64(start_date)
    end_date = np.datetime64('9999', 's') if end_date is None else np.datetime64(end_date)

    result = []

    def _callback(line):
        # `ftp.dir()` yields `ls -l` style lines: regular files start with
        # '-'; the filename is assumed to begin at column 56 of the listing
        # (server-specific layout — TODO confirm if the server changes).
        if line[0] == '-':
            match = pattern.match(line[56:])
            if match:
                start = np.datetime64(datetime.strptime(match.group('start'), '%Y%m%dT%H%M%S'))
                stop = np.datetime64(datetime.strptime(match.group('stop'), '%Y%m%dT%H%M%S'))
                # Keep files whose validity interval overlaps the request.
                if end_date >= start and start_date <= stop:
                    result.append(line[56:])

    with FTP(HOST) as ftp:
        ftp.login()
        ftp.dir(paths[obstype], _callback)
    return [f'{paths[obstype]}/{name}' for name in sorted(result)]
def local_search(obstype, start_date=None, end_date=None):
    """Search OBS data files on the local filesystem.

    Parameters
    ----------
    obstype : str
        OBS file type: `hour`, `minute` or `second`.
    start_date : str or numpy.datetime64, optional
        lower bound of the time interval (default: no lower bound).
    end_date : str or numpy.datetime64, optional
        upper bound of the time interval (default: no upper bound).

    Returns
    -------
    list(pathlib.Path)
        matching local OBS data files, sorted by path.

    Raises
    ------
    ValueError
        if `obstype` is not valid.
    """
    # Validate the cadence before touching module-level paths or the disk.
    if obstype not in ('hour', 'minute', 'second'):
        raise ValueError(
            f'obstype must be hour, minute or second, not {obstype}'
        )
    paths = {
        'hour': OBS_HOUR_LOCAL_DIR,
        'minute': OBS_MINUTE_LOCAL_DIR,
        'second': OBS_SECOND_LOCAL_DIR,
    }
    # Same product name as on the server, but with any three-character
    # extension (the data files are extracted from the ZIP archives).
    pattern = re.compile(
        r'SW_OPER_AUX_OBS[_MS]2__(?P<start>\d{8}T\d{6})_'
        r'(?P<stop>\d{8}T\d{6})_\d{4}\.\w{3}$'
    )
    # Open-ended defaults so that any file interval overlaps the request.
    start_date = np.datetime64('0000', 's') if start_date is None else np.datetime64(start_date)
    end_date = np.datetime64('9999', 's') if end_date is None else np.datetime64(end_date)

    result = []
    for file in (elm for elm in paths[obstype].iterdir() if elm.is_file()):
        match = pattern.match(file.name)
        if match:
            start = np.datetime64(datetime.strptime(match.group('start'), '%Y%m%dT%H%M%S'))
            stop = np.datetime64(datetime.strptime(match.group('stop'), '%Y%m%dT%H%M%S'))
            # Keep files whose validity interval overlaps the request.
            if end_date >= start and start_date <= stop:
                result.append(file)
    return sorted(result)


# Backward-compatible alias preserving the original (misspelled) name.
loacal_search = local_search
def download(files, outdir='', show_progress=True):
    """Download files from the FTP server and extract the data files.

    Each remote ZIP archive is fetched into a temporary file; the data
    file it contains (everything except the .HDR header) is extracted
    into `outdir` and the temporary archive is discarded.

    Parameters
    ----------
    files : collections.abc.Iterable(str)
        path(s) of the remote file(s) to be downloaded.
    outdir : str or os.PathLike
        output directory (default: current directory).
    show_progress : bool
        display a tqdm progress bar per file (default: True).

    Returns
    -------
    list(pathlib.Path)
        list of extracted data files.

    Raises
    ------
    ftplib.all_errors
        in case of FTP errors.
    """
    def _callback(data, fh, pbar):
        # Advance the progress bar by the received chunk size, then append
        # the chunk to the temporary file.
        pbar.update(len(data))
        fh.write(data)
    outdir = Path(outdir)
    downloaded = []
    with FTP(HOST) as ftp:
        ftp.login()
        for file in files:
            file = str(file)
            basename = file.split('/')[-1]
            # Download into an auto-deleted temporary file in `outdir`;
            # only the extracted data file is kept on disk.
            with TemporaryFile(dir=outdir) as tmp:
                # ftp.size() gives the total for the progress bar.
                with tqdm(total=ftp.size(file), unit='B',
                          unit_scale=True, desc=basename,
                          disable=not show_progress) as pbar:
                    ftp.retrbinary(f'RETR {file}', callback=lambda x: _callback(x, tmp, pbar))
                # ZipFile seeks within the (seekable) temporary file itself,
                # so no explicit rewind is needed here.
                with ZipFile(tmp) as zf:
                    # The archive holds a .HDR header plus one data file:
                    # extract only the data file.
                    hdr = Path(basename).with_suffix('.HDR').name
                    datafile = [elm for elm in zf.namelist()if elm != hdr][0]
                    outfile = zf.extract(datafile, outdir)
                    downloaded.append(Path(outfile))
    return downloaded
def ascii_to_pandas(file):
    """Convert an OBS ASCII file to a pandas DataFrame.

    Parameters
    ----------
    file : str or os.PathLike or file-like
        OBS ASCII file.

    Returns
    -------
    pandas.DataFrame
        data contained in the OBS ASCII file, indexed by timestamp.
    """
    df = pd.read_csv(
        file,
        comment='#',
        # `sep=r'\s+'` replaces the deprecated `delim_whitespace=True`.
        sep=r'\s+',
        names=['IAGA_code', 'Latitude', 'Longitude', 'Radius',
               'yyyy', 'mm', 'dd', 'UT', 'B_N', 'B_E', 'B_C'],
    )
    # Build the timestamp from the date columns plus the decimal-hour UT
    # column explicitly (the dict form of `parse_dates` and
    # `infer_datetime_format` were removed from pandas 2.x).
    date = df[['yyyy', 'mm', 'dd']].rename(
        columns={'yyyy': 'year', 'mm': 'month', 'dd': 'day'}
    )
    df['Timestamp'] = pd.to_datetime(date) + pd.to_timedelta(df['UT'], 'h')
    df = df.drop(columns=['yyyy', 'mm', 'dd', 'UT']).set_index('Timestamp')
    return df
def cdf_to_pandas(file):
    """Convert an OBS CDF file to a pandas DataFrame.

    Parameters
    ----------
    file : str or os.PathLike
        OBS CDF file.

    Returns
    -------
    pandas.DataFrame
        data contained in the OBS CDF file, indexed by timestamp.
    """
    with closing(cdflib.cdfread.CDF(file)) as data:
        # CDF_EPOCH timestamps -> ISO 8601 strings -> DatetimeIndex.
        index = pd.DatetimeIndex(
            cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True),
            name='Timestamp'
        )
        b_nec = data.varget('B_NEC')
        columns = {
            'IAGA_code': data.varget('IAGA_code')[:, 0, 0],
            'Latitude': data.varget('Latitude'),
            'Longitude': data.varget('Longitude'),
            'Radius': data.varget('Radius'),
            'B_N': b_nec[:, 0],
            'B_E': b_nec[:, 1],
            'B_C': b_nec[:, 2],
        }
    # All variables are materialized above, so the frame can be built
    # after the file has been closed.
    return pd.DataFrame(columns, index=index)
def download_obslist(outdir=''):
    """Download the observatory list file from the FTP server.

    Parameters
    ----------
    outdir : str or os.PathLike
        output directory (default: current directory).

    Returns
    -------
    pathlib.Path
        local path of the downloaded observatory list file.

    Raises
    ------
    FileNotFoundError
        if no observatory list file is found on the server.
    ftplib.all_errors
        in case of FTP errors.
    """
    OBS_HOUR_DIR = '/geomag/Swarm/AUX_OBS/hour'
    # Observatory list files are named like `obslist*_gd.all`
    # (raw string avoids the invalid `\.` escape warning).
    pattern = re.compile(r'obslist.+_gd\.all$')

    def _callback(line, result):
        # `ftp.dir()` yields `ls -l` style lines: regular files start with
        # '-'; the filename is assumed to begin at column 56 of the listing
        # (server-specific layout — TODO confirm if the server changes).
        if line[0] == '-':
            if pattern.match(line[56:]):
                result.append(line[56:])

    outdir = Path(outdir)
    files = []
    with FTP(HOST) as ftp:
        ftp.login()
        ftp.dir(OBS_HOUR_DIR, lambda line: _callback(line, files))
        # Fail with a clear error instead of an IndexError below.
        if not files:
            raise FileNotFoundError(
                f'no observatory list file found in {OBS_HOUR_DIR}'
            )
        remote_obslist_file = f'{OBS_HOUR_DIR}/{files[0]}'
        local_obslist_file = outdir / files[0]
        # retrlines strips line endings, so re-add them via print().
        with local_obslist_file.open('w') as fh:
            ftp.retrlines(f'RETR {remote_obslist_file}', lambda line: print(line, file=fh))
    return local_obslist_file
def read_obslist(file):
    """Convert an observatory list ASCII file to a pandas DataFrame.

    Parameters
    ----------
    file : str or os.PathLike or file-like
        observatory list ASCII file.

    Returns
    -------
    pandas.DataFrame
        data contained in the observatory list ASCII file.
    """
    # `sep=r'\s+'` replaces the deprecated `delim_whitespace=True` option.
    return pd.read_csv(
        file,
        sep=r'\s+',
        names=['IAGA_code', 'Latitude', 'Longitude', 'Altitude'],
    )
Hourly means hosted at:
Processing methodology:
Use the search()
function (see Settings and functions) to search OBS hourly data from 2018-01-01T00:00:00 to 2019-12-31T23:59:59 on the FTP server:
result = search('hour', '2018-01-01', '2019-12-31T23:59:59')
result
Use the download()
function (see Settings and functions) to download data:
downloaded = download(result, outdir=OBS_HOUR_LOCAL_DIR)
downloaded
Select one of the AUX_OBS_2_ files (e.g. the first one):
file1 = downloaded[0]
file1
Read ASCII file and convert data to a pandas.DataFrame
:
df1 = pd.read_csv(
    file1,
    comment='#',
    # `sep=r'\s+'` replaces the deprecated `delim_whitespace=True`.
    sep=r'\s+',
    names=['IAGA_code', 'Latitude', 'Longitude',
           'Radius', 'yyyy', 'mm', 'dd', 'UT', 'B_N', 'B_E', 'B_C'],
)
# Build the timestamp from the date columns plus the decimal-hour UT column
# explicitly (the dict form of `parse_dates` and `infer_datetime_format`
# were removed from pandas 2.x).
df1['Timestamp'] = pd.to_datetime(
    df1[['yyyy', 'mm', 'dd']].rename(columns={'yyyy': 'year', 'mm': 'month', 'dd': 'day'})
) + pd.to_timedelta(df1['UT'], 'h')
df1 = df1.drop(columns=['yyyy', 'mm', 'dd', 'UT']).set_index('Timestamp')
df1
For more information on pandas.DataFrame
see: https://pandas.pydata.org/docs/reference/frame.html.
The same result can be obtained with the ascii_to_pandas()
function (see Settings and functions).
new = ascii_to_pandas(file1)
new
Compare the two data frames:
pd.testing.assert_frame_equal(df1, new)
Example: get minimum and maximum dates:
df1.index.min(), df1.index.max()
Example: get list of observatories (IAGA codes) stored in the files:
df1['IAGA_code'].unique()
Pandas dataframes can be concatenated to represent data obtained from more than one file. E.g. read data from the next AUX_OBS_2_ file:
file2 = downloaded[1]
df2 = ascii_to_pandas(file2)
df2
The two dataframes can be concatenated using the pandas.concat()
function (for more information see: https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat):
concatenated = pd.concat([df1, df2])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)
concatenated.index.min(), concatenated.index.max()
concatenated
Plot hourly mean values on a map:
df = ascii_to_pandas(file1)
# Add total intensity F = |B|
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)
# Select date
date = '2018-01-01T01:30:00'
# Row selection by timestamp string must use .loc: DataFrame __getitem__
# with a datetime string was deprecated and removed in pandas 2.0.
obs_at_date = df.loc[date]
fig = plt.figure(figsize=(16, 10))
# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()
# Plot observatory measurements at date
cm = ax.scatter(
    obs_at_date['Longitude'], obs_at_date['Latitude'], c=obs_at_date['F'],
    marker='D', transform=ccrs.PlateCarree(),
    label=f'OBS F hourly mean value at {date}'
)
# Add IAGA codes
for row in obs_at_date.itertuples():
    ax.annotate(
        row.IAGA_code, (row.Longitude, row.Latitude),
        xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
    )
# Set title and legend
plt.title('Magnetic field intensities')
plt.legend()
# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')
plt.show()
Read list of all observatories (use the download_obslist()
and read_obslist()
functions defined in Settings and functions):
obslist = download_obslist(outdir=OBS_HOUR_LOCAL_DIR)
obslist
obs = read_obslist(obslist)
obs
Add the missing observatories, i.e. those not included in the observatory hourly mean values, to the plot:
df = ascii_to_pandas(file1)
# Add total intensity F = |B|
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)
# Select date
date = '2018-01-01T01:30:00'
# Row selection by timestamp string must use .loc: DataFrame __getitem__
# with a datetime string was deprecated and removed in pandas 2.0.
obs_at_date = df.loc[date]
fig = plt.figure(figsize=(16, 10))
# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()
# Plot observatory measurements at date
cm = ax.scatter(
    obs_at_date['Longitude'], obs_at_date['Latitude'], c=obs_at_date['F'],
    marker='D', transform=ccrs.PlateCarree(),
    label=f'OBS F hourly mean value at {date}'
)
# Add IAGA codes
for row in obs_at_date.itertuples():
    ax.annotate(
        row.IAGA_code, (row.Longitude, row.Latitude),
        xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
    )
# Add missing observatories from obslist (position only)
missing = obs[~obs['IAGA_code'].isin(obs_at_date['IAGA_code'].unique())]
ax.scatter(missing['Longitude'], missing['Latitude'], c='black', marker='D', alpha=0.1)
# Set title and legend
plt.title('Magnetic field intensities')
plt.legend()
# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')
plt.show()
Add Swarm F measurements between 01:00:00 and 02:00:00 of the same day:
# Fetch Swarm A scalar (F) measurements using viresclient
request = SwarmRequest()
request.set_collection('SW_OPER_MAGA_LR_1B')
request.set_products(measurements='F')
start_date = '2018-01-01T01:00:00'
end_date = '2018-01-01T02:00:00'
data = request.get_between(start_date, end_date)
df = ascii_to_pandas(file1)
# Add total intensity F = |B|
df['F'] = np.linalg.norm(df[['B_N', 'B_E', 'B_C']], axis=1)
# Select date
date = '2018-01-01T01:30:00'
# Row selection by timestamp string must use .loc: DataFrame __getitem__
# with a datetime string was deprecated and removed in pandas 2.0.
obs_at_date = df.loc[date]
fig = plt.figure(figsize=(16, 10))
# Draw map
ax = plt.subplot2grid((1, 1), (0, 0), projection=ccrs.PlateCarree())
ax.coastlines()
ax.add_feature(cfeature.OCEAN, facecolor='lightgrey')
ax.gridlines()
# Plot observatory measurements at date
cm = ax.scatter(
    obs_at_date['Longitude'], obs_at_date['Latitude'], c=obs_at_date['F'],
    marker='D', transform=ccrs.PlateCarree(),
    label=f'OBS F hourly mean value at {date}'
)
# Add IAGA codes
for row in obs_at_date.itertuples():
    ax.annotate(
        row.IAGA_code, (row.Longitude, row.Latitude),
        xycoords=ccrs.PlateCarree()._as_mpl_transform(ax)
    )
# Add missing observatories from obslist (position only)
missing = obs[~obs['IAGA_code'].isin(obs_at_date['IAGA_code'].unique())]
ax.scatter(missing['Longitude'], missing['Latitude'], c='black', marker='D', alpha=0.1)
# Add Swarm A data
swarm = data.as_dataframe()
ax.scatter(
    swarm['Longitude'], swarm['Latitude'], c=swarm['F'],
    transform=ccrs.PlateCarree(),
    label=f'Swarm A - F measurements between {start_date} and {end_date}'
)
# Set title and legend
plt.title('Magnetic field intensities')
plt.legend()
# Add colorbar
cax = fig.add_axes([0.92, 0.2, 0.02, 0.6])
plt.colorbar(cm, cax=cax, label='F [nT]')
plt.show()
Files containing observatory minute and second mean values have CDF format. They can be downloaded from:
Use the search()
function (see Settings and functions) to search OBS minute/second data from 2019-12-01T00:00:00 to 2019-12-31T23:59:59 on the FTP server:
minute = search('minute', '2019-12-01', '2019-12-31T23:59:59')
minute
second = search('second', '2019-12-01', '2019-12-31T23:59:59')
second
Use the download()
function (see Settings and functions) to download data:
dl_minute = download(minute, outdir=OBS_MINUTE_LOCAL_DIR)
dl_second = download(second, outdir=OBS_SECOND_LOCAL_DIR)
Select one of the AUX_OBSM2_ files (e.g. the first one):
file1 = dl_minute[0]
file1
Read CDF file using cdflib
(for more information on cdflib
, see: https://github.com/MAVENSDC/cdflib)
data = cdflib.CDF(file1)
Get info about the file as a Python dictionary:
data.cdf_info()
You can see that measurements are stored as zVariables:
data.cdf_info()['zVariables']
Data can be retrieved via the .varget()
method, e.g:
data.varget('B_NEC')
Data is returned as a numpy.ndarray
object (for more information on numpy.ndarray
, see: https://docs.scipy.org/doc/numpy/reference/arrays.ndarray.html).
Variable attributes can be retrieved using the .varattsget()
method, e.g.:
data.varattsget('B_NEC')
Attributes are returned as a Python dictionary.
Let's retrieve the timestamps:
data.varget('Timestamp')
Timestamp
type is:
data.varget('Timestamp').dtype
Timestamps are represented as NumPy float64
values. Why? Get info about Timestamp
variable using the .varinq()
method:
data.varinq('Timestamp')
The returned dictionary shows that the data type is CDF_EPOCH, consisting of a floating point value representing the number of milliseconds since 01-Jan-0000 00:00:00.000. It can be converted to a more readable format (list of strings) using the cdflib.cdfepoch.encode()
function:
ts = cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True)
ts[:5]
Or to a numpy array of numpy.datetime64
values:
ts = np.array(cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True), dtype='datetime64')
ts[:5]
You may be interested also in the CDF global attributes:
data.globalattsget()
Close the file when you have finished:
data.close()
AUX_OBSS2_ data contains the same variables:
with closing(cdflib.cdfread.CDF(dl_second[0])) as data:
zvariables = data.cdf_info()['zVariables']
zvariables
Data can be represented as a pandas.DataFrame
object:
with closing(cdflib.cdfread.CDF(file1)) as data:
    # CDF_EPOCH timestamps -> ISO 8601 strings -> DatetimeIndex.
    ts = pd.DatetimeIndex(
        cdflib.cdfepoch.encode(data.varget('Timestamp'), iso_8601=True),
        name='Timestamp'
    )
    # Read B_NEC once instead of three separate varget() calls.
    b_nec = data.varget('B_NEC')
    df1 = pd.DataFrame(
        {
            'IAGA_code': data.varget('IAGA_code')[:, 0, 0],
            'Latitude': data.varget('Latitude'),
            'Longitude': data.varget('Longitude'),
            'Radius': data.varget('Radius'),
            'B_N': b_nec[:, 0],
            'B_E': b_nec[:, 1],
            'B_C': b_nec[:, 2]
        },
        index=ts
    )
df1
For more information on pandas.DataFrame
see: https://pandas.pydata.org/docs/reference/frame.html.
The same result can be obtained with the cdf_to_pandas()
function (see Settings and functions).
new = cdf_to_pandas(file1)
new
Compare the two data frames:
pd.testing.assert_frame_equal(df1, new)
Example: get minimum and maximum dates:
df1.index.min(), df1.index.max()
Example: get list of observatories (IAGA codes) stored in the files:
df1['IAGA_code'].unique()
Example: get list of observatories (IAGA codes) included in the following ranges of coordinates:
df1[(df1['Latitude'] >= 30) & (df1['Latitude'] <= 70) & (df1['Longitude'] >= -10) & (df1['Longitude'] <= 40)]['IAGA_code'].unique()
You can do the same using the .query()
method (see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html#pandas.DataFrame.query):
df1.query('(30 <= Latitude <= 70) and (-10 <= Longitude <= 40)')['IAGA_code'].unique()
Pandas dataframes can be concatenated to represent data obtained from more than one file. E.g. read data from the next AUX_OBSM2_ file:
file2 = dl_minute[1]
df2 = cdf_to_pandas(file2)
df2
The two dataframes can be concatenated using the pandas.concat()
function (for more information see: https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat):
# Concatenate the two frames, then sort by observatory and time.
concatenated = pd.concat([df1, df2])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)
# Time coverage of the combined data
concatenated.index.min(), concatenated.index.max()
concatenated
With AUX_OBSS2_ data:
# First two downloaded second-cadence (AUX_OBSS2_) files
files = dl_second[:2]
files
# Read and concatenate both files, then sort by observatory and time.
concatenated = pd.concat([cdf_to_pandas(file) for file in files])
concatenated.sort_values(by=['IAGA_code', 'Timestamp'], inplace=True)
# Time coverage of the combined data
concatenated.index.min(), concatenated.index.max()
concatenated