Question: Is it possible to define our own "lazy" Iris data source, similar to that provided by Iris' OPeNDAP interface?
Answer: There are a few ways to solve this. Unless the underlying service can fetch metadata without fetching the actual data payload, the easiest approach is for the client to hard-code its knowledge of what is in the data store.
In the following example, I show the creation of a "mars_data" function. Given a forecast reference time, it returns a list of cubes that may be retrieved from MARS. Knowledge of what is available is hard-coded, but retrieval of the data payload is driven entirely by the data actually requested. In addition, I show that once the cubes are constructed we benefit from Iris' lazy-loading capabilities, allowing a client (such as Iris) to subset the data in a far richer way than a single CLI invocation can offer.
First, let's get hold of the packages we need...
import datetime
import textwrap
import numpy as np
import iris
import biggus
import cf_units
Next, define a "Fetcher" that can be used to lazily fetch data from MARS. This is similar to the pattern presented in https://pelson.github.io/2013/massive_virtual_arrays_with_biggus/.
class MarsFetcher(object):
    def __init__(self, grib_param, yx_resolution, forecast_rt, times, levels):
        self._param = grib_param
        self._times = np.asarray(times)
        self._levels = np.asarray(levels)
        self._forecast_rt = forecast_rt
        # The required attributes of a "concrete" array, according
        # to biggus, are 'ndim', 'dtype' and 'shape'.
        self.ndim = 4
        self.dtype = np.float32
        self.shape = (len(self._times), len(self._levels)) + tuple(yx_resolution)

    def __getitem__(self, keys):
        # Implement the getitem/slicing interface, which is what biggus
        # needs in order to create numpy arrays.
        keys = biggus._init._full_keys(keys, self.ndim)

        # Just make the numbers up for now...
        result = np.empty(self.shape)[keys]
        # ... but we could do whatever we like here, including calling
        # out to the MARS client with this request.

        # Subset the times as requested.
        steps = self._times[keys[0]]
        if not isinstance(keys[0], slice):
            steps = [steps]

        # Subset the levels as requested.
        levels = self._levels[keys[1]]
        if not isinstance(keys[1], slice):
            levels = [levels]

        request = textwrap.dedent("""
            retrieve,
            class=od,
            date={forecast_rt:%Y-%m-%d},
            expver=1,
            levelist={levels},
            levtype=pl,
            param={param},
            step={steps},
            stream=oper,
            time={forecast_rt:%H:%M:%S},
            type=fc
            """).strip().format(levels='/'.join(map(str, levels)),
                                param=self._param,
                                steps='/'.join(map(str, steps)),
                                forecast_rt=self._forecast_rt)

        # Print the request, so that we can see what would happen if we
        # really implemented a client.
        print(request)
        return result
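The only non-obvious piece above is the call to biggus._init._full_keys, which normalises whatever indexing expression arrives (an integer, a slice, an Ellipsis, or a partial tuple) into exactly one key per dimension. As a standalone sketch of that idea (the name full_keys here is my own, not biggus' API):

```python
def full_keys(keys, ndim):
    # Normalise an indexing expression to a tuple with exactly one key
    # per dimension, in the spirit of biggus._init._full_keys.
    if not isinstance(keys, tuple):
        keys = (keys,)
    if Ellipsis in keys:
        # Expand '...' into enough full slices to cover every dimension.
        i = keys.index(Ellipsis)
        fill = (slice(None),) * (ndim - (len(keys) - 1))
        keys = keys[:i] + fill + keys[i + 1:]
    else:
        # Pad any missing trailing dimensions with full slices.
        keys = keys + (slice(None),) * (ndim - len(keys))
    return keys
```

For example, full_keys(0, 4) pads out to (0, slice(None), slice(None), slice(None)), so __getitem__ can always assume keys[0] addresses time and keys[1] addresses level.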
Now, produce cubes based on the expectation of what is available.
def mars_data(forecast_reference=None):
    """
    Generate a list of cubes that can be accessed from MARS.

    Parameters
    ----------
    forecast_reference : datetime.datetime, optional
        The forecast reference time. Defaults to the most recent 00Z or
        12Z run that is at least 12 hours old.

    """
    if forecast_reference is None:
        now_m12 = datetime.datetime.now() - datetime.timedelta(hours=12)
        forecast_reference = datetime.datetime(now_m12.year, now_m12.month,
                                               now_m12.day,
                                               0 if now_m12.hour <= 12 else 12)

    forecast_periods = [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39,
                        42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78,
                        81, 84, 87, 90, 93, 96, 99, 102, 105, 108, 111, 114,
                        117, 120]
    levels = [1000, 950, 925, 900, 850, 800, 700, 600, 500, 400, 300, 250,
              200, 150, 100, 70, 50, 30, 20, 10, 7, 5, 3, 2, 1]

    t_unit = cf_units.Unit('hours since 2000-01-01')
    times = [t_unit.date2num(forecast_reference) + hour
             for hour in forecast_periods]

    cubes = iris.cube.CubeList()
    for std_name, param in [('air_temperature', '130.128'),
                            ('geopotential_height', '129.128')]:
        template = iris.cube.Cube(
            standard_name=std_name,
            attributes={'Source': 'My MARS dataset'},
            data=biggus.NumpyArrayAdapter(
                MarsFetcher(param, (769, 1024), forecast_reference,
                            forecast_periods, levels)))
        template.add_dim_coord(
            iris.coords.DimCoord(points=np.linspace(-180, 180, 1024),
                                 standard_name='longitude'), 3)
        template.add_dim_coord(
            iris.coords.DimCoord(points=np.linspace(-90, 90, 769),
                                 standard_name='latitude'), 2)
        template.add_dim_coord(
            iris.coords.DimCoord(points=levels, long_name='pressure',
                                 units='hPa'), 1)
        template.add_dim_coord(
            iris.coords.DimCoord(points=times, standard_name='time',
                                 units=t_unit), 0)
        template.add_aux_coord(
            iris.coords.DimCoord(points=forecast_periods,
                                 standard_name='forecast_period'), 0)
        template.add_aux_coord(
            iris.coords.DimCoord(points=[t_unit.date2num(forecast_reference)],
                                 standard_name='forecast_reference_time',
                                 units=t_unit))
        cubes.append(template)
    return cubes
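The default forecast reference time above rounds down to the most recent 00Z or 12Z run that is at least 12 hours old. Pulled out as a standalone helper (default_forecast_reference is a hypothetical name, not part of the example above), the same logic reads:

```python
import datetime

def default_forecast_reference(now=None):
    # Pick the most recent 00Z or 12Z model run that is at least
    # 12 hours old, mirroring the default used in mars_data.
    now = now or datetime.datetime.now()
    ref = now - datetime.timedelta(hours=12)
    hour = 0 if ref.hour <= 12 else 12
    return datetime.datetime(ref.year, ref.month, ref.day, hour)
```

So at, say, 02:30 on 2017-01-03 this yields the 12Z run of 2017-01-02 (02:30 minus 12 hours lands at 14:30 the previous day, which rounds down to 12Z).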
As expected, calling this function produces a number of cubes:
cubes = mars_data()
print(cubes)
[temp] = cubes.extract('air_temperature')
print('----')
print(temp)
0: air_temperature / (unknown)          (time: 41; pressure: 25; latitude: 769; longitude: 1024)
1: geopotential_height / (unknown)      (time: 41; pressure: 25; latitude: 769; longitude: 1024)
----
air_temperature / (unknown)             (time: 41; pressure: 25; latitude: 769; longitude: 1024)
     Dimension coordinates:
          time                               x             -              -                -
          pressure                           -             x              -                -
          latitude                           -             -              x                -
          longitude                          -             -              -                x
     Auxiliary coordinates:
          forecast_period                    x             -              -                -
     Scalar coordinates:
          forecast_reference_time: 2017-01-02 12:00:00
     Attributes:
          Source: My MARS dataset
Whilst the MarsFetcher we have implemented only returns dummy (uninitialised) data, we can see the MARS requests it would take to subset the data as requested.
First, the full cube...
data = temp[...].data
retrieve,
class=od,
date=2017-01-02,
expver=1,
levelist=1000/950/925/900/850/800/700/600/500/400/300/250/200/150/100/70/50/30/20/10/7/5/3/2/1,
levtype=pl,
param=130.128,
step=0/3/6/9/12/15/18/21/24/27/30/33/36/39/42/45/48/51/54/57/60/63/66/69/72/75/78/81/84/87/90/93/96/99/102/105/108/111/114/117/120,
stream=oper,
time=12:00:00,
type=fc
Now a subset of levels and timesteps...
data = temp[::2, :4].data
retrieve,
class=od,
date=2017-01-02,
expver=1,
levelist=1000/950/925/900,
levtype=pl,
param=130.128,
step=0/6/12/18/24/30/36/42/48/54/60/66/72/78/84/90/96/102/108/114/120,
stream=oper,
time=12:00:00,
type=fc
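The mapping from the slice to the request is simply numpy indexing over the hard-coded metadata: temp[::2, :4] takes every other forecast period and the first four levels, and the subsets are joined with '/' to form the step and levelist fields. A quick check of that, assuming the same forecast_periods and levels lists as in mars_data:

```python
import numpy as np

# The hard-coded metadata from mars_data.
forecast_periods = np.arange(0, 121, 3)   # 0, 3, ..., 120
levels = np.array([1000, 950, 925, 900, 850, 800, 700, 600, 500, 400,
                   300, 250, 200, 150, 100, 70, 50, 30, 20, 10,
                   7, 5, 3, 2, 1])

# temp[::2, :4] subsets time with ::2 and pressure with :4.
steps = '/'.join(map(str, forecast_periods[::2]))
levelist = '/'.join(map(str, levels[:4]))
print(levelist)  # 1000/950/925/900
```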
And just to show that everything composes as expected, standard Iris constraints work too...
data = temp.extract(iris.Constraint(pressure=1000,
forecast_period=lambda c: 5 < c.point < 25)).data
retrieve,
class=od,
date=2017-01-02,
expver=1,
levelist=1000,
levtype=pl,
param=130.128,
step=6/9/12/15/18/21/24,
stream=oper,
time=12:00:00,
type=fc
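In effect, Iris evaluates the forecast_period constraint against the coordinate values first, and only then indexes the lazy data, so the request covers just the matching steps. A minimal illustration of that selection, using the same forecast periods:

```python
import numpy as np

forecast_periods = np.arange(0, 121, 3)   # 0, 3, ..., 120, as in mars_data

# The constraint 5 < forecast_period < 25 becomes a boolean mask
# over the coordinate points before any data is touched.
mask = (forecast_periods > 5) & (forecast_periods < 25)
step = '/'.join(map(str, forecast_periods[mask]))
print(step)  # 6/9/12/15/18/21/24
```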