Skip to content

wrapper

A thin wrapper around xarray for reading and writing Ensemble Forecast Time Series (EFTS) data sets.

EftsDataSet

EftsDataSet(data: Union[str, Dataset])

Convenience class for access to an Ensemble Forecast Time Series in a netCDF file.

Source code in src/efts_io/wrapper.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def __init__(self, data: Union[str, xr.Dataset]) -> None:
    """Wrap an EFTS data set given either as a file path or as a data set.

    Args:
        data: When a string, it is passed to ``xr.open_dataset`` and the time
            and station coordinates of the result are replaced with decoded
            values. Any other value is stored unchanged as ``self.data``.
    """
    from xarray.coding import times

    self.time_dim = None
    self.time_zone = "UTC"
    self.time_zone_timestamps = True  # Not sure about https://github.com/csiro-hydroinformatics/efts-io/issues/3
    self.STATION_DIMNAME = STATION_DIMNAME
    self.stations_varname = STATION_ID_VARNAME
    self.LEAD_TIME_DIMNAME = LEAD_TIME_DIMNAME
    self.ENS_MEMBER_DIMNAME = ENS_MEMBER_DIMNAME
    self.identifiers_dimensions = []

    if not isinstance(data, str):
        # Already an in-memory data set: keep it as provided.
        self.data: xr.Dataset = data
        return

    # Work around https://jira.csiro.au/browse/WIRADA-635: lead_time can be a
    # problem with xarray, so the file is opened without decoding "times".
    dataset = xr.open_dataset(data, decode_times=False)

    # The time axis is decoded by hand instead, and its values converted to
    # pandas time stamps.
    # TODO This is probably not a long term solution for round-tripping a read/write or vice and versa
    time_decoder = times.CFDatetimeCoder(use_cftime=True)
    raw_time = xr.as_variable(dataset.coords[TIME_DIMNAME])
    # The time zone recorded on the time variable overrides the "UTC" default.
    self.time_zone = raw_time.attrs[TIME_STANDARD_ATTR_KEY]
    decoded_time = time_decoder.decode(raw_time, name=TIME_DIMNAME)
    decoded_time.values = cftimes_to_pdtstamps(
        decoded_time.values,
        tz_str=self.time_zone if self.time_zone_timestamps else None,
    )

    # The station dimension is labelled with the station names converted to str.
    station_labels = byte_stations_to_str(dataset[STATION_NAME_VARNAME].values)
    self.data: xr.Dataset = dataset.assign_coords(
        {TIME_DIMNAME: decoded_time, self.STATION_DIMNAME: station_labels},
    )

create_data_variables

create_data_variables(
    data_var_def: Dict[str, Dict[str, Any]]
) -> None

Create data variables in the data set.

Example: `var_defs_dict["variable_1"].keys()` returns `dict_keys(['name', 'longname', 'units', 'dim_type', 'missval', 'precision', 'attributes'])`.

Source code in src/efts_io/wrapper.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def create_data_variables(self, data_var_def: Dict[str, Dict[str, Any]]) -> None:
    """Add one new data variable to the data set per supplied definition.

    Definitions are grouped by their ``"dim_type"`` entry: ``"4"`` variables
    span (lead time, station, ensemble member, time), ``"3"`` variables span
    (station, ensemble member, time) and ``"2"`` variables span (station, time).
    Definitions with any other ``"dim_type"`` are ignored. Variables are added
    to ``self.data`` group by group, in the order "4", "3", "2".

    Args:
        data_var_def: Variable definitions; only the dictionary values are used.
            Each one is read for the keys ``"name"``, ``"longname"``,
            ``"dim_type"``, ``"missval"``, ``"precision"``, ``"attributes"``
            and the key held by ``UNITS_ATTR_KEY``.
    """
    dim_names_by_type = {
        "4": (LEAD_TIME_DIMNAME, STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME),
        "3": (STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME),
        "2": (STATION_DIMNAME, TIME_DIMNAME),
    }

    definitions = list(data_var_def.values())
    grouped_defs = {
        dim_type: [d for d in definitions if d["dim_type"] == dim_type]
        for dim_type in dim_names_by_type
    }

    # Shapes are resolved for all three layouts up front, so every dimension
    # must already exist in the data set even if no variable ends up using it.
    shapes_by_type = {
        dim_type: tuple(self.data.sizes[dim_name] for dim_name in dim_names)
        for dim_type, dim_names in dim_names_by_type.items()
    }

    for dim_type, dim_names in dim_names_by_type.items():
        shape = shapes_by_type[dim_type]
        for definition in grouped_defs[dim_type]:
            new_name = definition["name"]
            self.data[new_name] = xr.DataArray(
                name=new_name,
                data=nan_full(shape),
                coords=self.data.coords,
                dims=dim_names,
                attrs={
                    "longname": definition["longname"],
                    UNITS_ATTR_KEY: definition[UNITS_ATTR_KEY],
                    "missval": definition["missval"],
                    "precision": definition["precision"],
                    # Extra attributes come last so they can override the above.
                    **definition["attributes"],
                },
            )

get_all_series

get_all_series(
    variable_name: str = "rain_obs",
    dimension_id: Optional[str] = None,
)

Return a multivariate time series, where each column is the series for one of the identifiers.

Source code in src/efts_io/wrapper.py
171
172
173
174
175
176
177
178
def get_all_series(
    self,
    variable_name: str = "rain_obs",
    dimension_id: Optional[str] = None,
):
    """Look up a whole variable of the data set, covering every identifier.

    Args:
        variable_name: Name of the variable to fetch from ``self.data``.
        dimension_id: Accepted for symmetry with the other getters; not used.

    Returns:
        Whatever ``self.data[variable_name]`` holds, without any sub-setting.
    """
    all_series = self.data[variable_name]
    return all_series

get_dim_names

get_dim_names() -> List[str]

Gets the name of all dimensions in the data set.

Source code in src/efts_io/wrapper.py
199
200
201
def get_dim_names(self) -> List[str]:
    """Return the names of all dimensions in the data set.

    Returns:
        The dimension names, in the order in which ``self.data.dims`` yields them.
    """
    # Iterate ``dims`` directly rather than calling ``.keys()``: this works both
    # when ``dims`` is a name -> length mapping and when it is a plain
    # collection of names, the form xarray is moving ``Dataset.dims`` towards.
    return list(self.data.dims)

get_ensemble_for_stations

get_ensemble_for_stations(
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: str = ENS_MEMBER_DIMNAME,
    start_time: Timestamp = None,
    lead_time_count: Optional[int] = None,
) -> DataArray

Not yet implemented.

Source code in src/efts_io/wrapper.py
203
204
205
206
207
208
209
210
211
212
213
def get_ensemble_for_stations(
    self,
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: str = ENS_MEMBER_DIMNAME,
    start_time: Optional[pd.Timestamp] = None,
    lead_time_count: Optional[int] = None,
) -> xr.DataArray:
    """Not yet implemented.

    Placeholder for returning a time series that represents a single ensemble
    member forecast for all stations over the lead time. None of the arguments
    are used at present.

    Raises:
        NotImplementedError: Always.
    """
    # Return a time series, representing a single ensemble member forecast for all stations over the lead time
    raise NotImplementedError

get_ensemble_forecasts

get_ensemble_forecasts(
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
    start_time: Optional[Timestamp] = None,
    lead_time_count: Optional[int] = None,
) -> DataArray

Gets an ensemble forecast for a variable.

Source code in src/efts_io/wrapper.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def get_ensemble_forecasts(
    self,
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
    start_time: Optional[pd.Timestamp] = None,
    lead_time_count: Optional[int] = None,
) -> xr.DataArray:
    """Extract the ensemble of forecasts for one identifier and one start time.

    Args:
        variable_name: Name of the variable to read from the data set.
        identifier: Identifier to look up along ``dimension_id``.
        dimension_id: Name of the identifying variable; defaults to the result
            of ``get_stations_varname()``.
        start_time: Time stamp to look up on the time axis; defaults to the
            first value returned by ``get_time_dim()``.
        lead_time_count: Number of leading lead-time steps to keep; defaults to
            ``get_lead_time_count()``.

    Returns:
        The positional subset of the variable for the selected time index,
        ensemble members, identifier index and lead-time steps.
    """
    dimension_id = self.get_stations_varname() if dimension_id is None else dimension_id
    time_stamps = self.get_time_dim()
    start_time = time_stamps[0] if start_time is None else start_time
    ensemble_size = self.get_ensemble_size()

    identifier_index = self.index_for_identifier(identifier, dimension_id)
    check_index_found(identifier_index, identifier, dimension_id)

    if lead_time_count is None:
        lead_time_count = self.get_lead_time_count()
    time_index = self.index_for_time(start_time)

    # NOTE(review): the selection is positional and is written for dimensions
    # ordered (time, ens_member, station, lead_time), i.e. the reverse of the
    # "float rain_sim[lead_time,station,ens_member,time]" declaration quoted by
    # the original comment - confirm against the layout of the files in use.
    variable = self.data.get(variable_name)
    # TODO: the reference R implementation went on to build a time axis from
    # start_time plus the lead time values (in days or hours) and returned an
    # xts series; that re-indexing has not been ported here.
    return variable[
        time_index,
        :ensemble_size,
        identifier_index,
        :lead_time_count,
    ]

get_ensemble_forecasts_for_station

get_ensemble_forecasts_for_station(
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
)

Return an array, representing all ensemble member forecasts for a single station over all lead times.

Source code in src/efts_io/wrapper.py
254
255
256
257
258
259
260
261
262
263
264
def get_ensemble_forecasts_for_station(
    self,
    variable_name: str = "rain_sim",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
):
    """Not yet implemented: all ensemble member forecasts for a single station.

    Placeholder for returning an array of every ensemble member forecast for
    one station over all lead times.

    Raises:
        NotImplementedError: Always, once the default ``dimension_id`` has been
            resolved through ``get_stations_varname()``.
    """
    # The default identifying variable is still resolved, as the final
    # implementation is expected to need it.
    dimension_id = self.get_stations_varname() if dimension_id is None else dimension_id
    raise NotImplementedError

get_ensemble_series

get_ensemble_series(
    variable_name: str = "rain_ens",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
)

Return an ensemble of point time series for a station identifier.

Source code in src/efts_io/wrapper.py
266
267
268
269
270
271
272
273
274
275
276
def get_ensemble_series(
    self,
    variable_name: str = "rain_ens",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
):
    """Not yet implemented: ensemble of point time series for one station.

    Placeholder for returning an ensemble of point time series for a station
    identifier.

    Raises:
        NotImplementedError: Always, once the default ``dimension_id`` has been
            resolved through ``get_stations_varname()``.
    """
    # The default identifying variable is still resolved, as the final
    # implementation is expected to need it.
    dimension_id = self.get_stations_varname() if dimension_id is None else dimension_id
    raise NotImplementedError

get_ensemble_size

get_ensemble_size()

Return the length of the ensemble size dimension.

Source code in src/efts_io/wrapper.py
278
279
280
def get_ensemble_size(self) -> int:
    """Return the length of the ensemble member dimension.

    Returns:
        The size recorded in the data set for ``self.ENS_MEMBER_DIMNAME``.
    """
    # ``sizes`` is the name -> length mapping; it is what
    # ``create_data_variables`` already uses, and it avoids relying on
    # ``Dataset.dims`` behaving as a mapping.
    return self.data.sizes[self.ENS_MEMBER_DIMNAME]

get_lead_time_count

get_lead_time_count()

Length of the lead time dimension.

Source code in src/efts_io/wrapper.py
282
283
284
def get_lead_time_count(self) -> int:
    """Return the length of the lead time dimension.

    Returns:
        The size recorded in the data set for ``self.LEAD_TIME_DIMNAME``.
    """
    # ``sizes`` is the name -> length mapping; it is what
    # ``create_data_variables`` already uses, and it avoids relying on
    # ``Dataset.dims`` behaving as a mapping.
    return self.data.sizes[self.LEAD_TIME_DIMNAME]

get_lead_time_values

get_lead_time_values()

Return the values of the lead time dimension.

Source code in src/efts_io/wrapper.py
286
287
288
def get_lead_time_values(self):
    """Give the values stored for the lead time dimension.

    Returns:
        The ``values`` of the data set entry named ``self.LEAD_TIME_DIMNAME``.
    """
    lead_time_coord = self.data[self.LEAD_TIME_DIMNAME]
    return lead_time_coord.values

get_single_series

get_single_series(
    variable_name: str = "rain_obs",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
)

Return a single point time series for a station identifier.

Source code in src/efts_io/wrapper.py
294
295
296
297
298
299
300
301
302
303
304
def get_single_series(
    self,
    variable_name: str = "rain_obs",
    identifier: Optional[str] = None,
    dimension_id: Optional[str] = None,
):
    """Return a single point time series for a station identifier.

    Args:
        variable_name: Name of the variable to read from the data set.
        identifier: Identifier to select along ``dimension_id``. When omitted,
            the call falls back on ``get_all_series``.
        dimension_id: Name of the coordinate to select on; defaults to the
            result of ``get_stations_varname()``.

    Returns:
        The selection of the variable for ``identifier``, or the whole
        variable when no identifier is given.
    """
    if identifier is None:
        # No identifier to select on: hand back every series rather than
        # attempting a label-based selection with ``None``.
        return self.get_all_series(variable_name=variable_name, dimension_id=dimension_id)
    if dimension_id is None:
        dimension_id = self.get_stations_varname()
    return self.data[variable_name].sel({dimension_id: identifier})

get_station_count

get_station_count() -> int

Return the number of stations in the data set.

Source code in src/efts_io/wrapper.py
306
307
308
def get_station_count(self) -> int:
    """Return the number of stations in the data set.

    Returns:
        The size recorded in the data set for ``self.STATION_DIMNAME``.
    """
    # ``sizes`` is the name -> length mapping; it is what
    # ``create_data_variables`` already uses, and it avoids relying on
    # ``Dataset.dims`` behaving as a mapping.
    return self.data.sizes[self.STATION_DIMNAME]

get_stations_varname

get_stations_varname() -> str

Return the name of the variable that has the station identifiers.

Source code in src/efts_io/wrapper.py
310
311
312
313
314
def get_stations_varname(self) -> str:
    """Give the name of the variable holding the station identifiers.

    Returns:
        The module-level ``STATION_ID_VARNAME`` constant.
    """
    # TODO: station is integer normally in STF (Euargh)
    stations_varname: str = STATION_ID_VARNAME
    return stations_varname

get_time_dim

get_time_dim()

Return the time dimension variable as a vector of date-time stamps.

Source code in src/efts_io/wrapper.py
316
317
318
319
def get_time_dim(self):
    """Give the values of the data set's ``time`` entry.

    Returns:
        ``self.data.time.values``; only the values are returned, so the
        attributes attached to the time coordinate are not carried along.
    """
    time_coord = self.data.time
    return time_coord.values

get_time_unit

get_time_unit()

Return the time units of a read time series.

Source code in src/efts_io/wrapper.py
321
322
323
324
def get_time_unit(self):
    """Placeholder for the time unit of a read time series.

    The intent is to report the unit part of the time axis description, e.g.
    "hours" for "hours since 2015-10-04 00:00:00 +1030". That lookup is not
    implemented yet.

    Returns:
        Always the string "dummy".
    """
    placeholder_unit = "dummy"
    return placeholder_unit

put_lead_time_values

put_lead_time_values(values)

Set the values of the lead time dimension.

Source code in src/efts_io/wrapper.py
290
291
292
def put_lead_time_values(self, values):
    """Overwrite the values stored for the lead time dimension.

    Args:
        values: New content assigned to the ``values`` of the data set entry
            named ``self.LEAD_TIME_DIMNAME``.
    """
    lead_time_coord = self.data[self.LEAD_TIME_DIMNAME]
    lead_time_coord.values = values

to_netcdf

to_netcdf(path: str, version: str = '2.0') -> None

Write the data set to a netCDF file.

Source code in src/efts_io/wrapper.py
127
128
129
130
131
def to_netcdf(self, path: str, version: str = "2.0") -> None:
    """Save the wrapped data set as a netCDF file.

    Args:
        path: Destination handed to ``self.data.to_netcdf``.
        version: Format version requested by the caller; only "2.0" is accepted.

    Raises:
        ValueError: If ``version`` is anything other than "2.0". Nothing is
            written in that case.
    """
    is_supported = version == "2.0"
    if not is_supported:
        raise ValueError("Only version 2.0 is supported for now")
    self.data.to_netcdf(path)

byte_to_string

byte_to_string(x: Union[int, bytes]) -> str

Convert a byte to a string.

Source code in src/efts_io/wrapper.py
42
43
44
45
46
47
48
49
50
def byte_to_string(x: Union[int, bytes]) -> str:
    """Decode a single byte value, or a bytes object, as UTF-8 text.

    Args:
        x: Either an integer in the range 0 to 255, which is first turned into
            a one-byte ``bytes`` object, or a ``bytes`` object.

    Returns:
        The UTF-8 decoding of the bytes.

    Raises:
        ValueError: If ``x`` is an integer outside the range 0 to 255.
        TypeError: If ``x`` is neither an integer nor a ``bytes`` object.
    """
    if isinstance(x, int):
        if not 0 <= x <= 255:
            raise ValueError("Integer value to bytes: must be in range [0-255]")
        raw = bytes((x,))
    else:
        raw = x
    if not isinstance(raw, bytes):
        raise TypeError(f"Cannot cast type {type(raw)} to bytes")
    return raw.decode("UTF-8")