Coverage for src/efts_io/wrapper.py: 30.26%
253 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-24 10:14 +1000
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-24 10:14 +1000
1"""A thin wrapper around xarray for reading and writing Ensemble Forecast Time Series (EFTS) data sets."""
3import os
4from datetime import datetime
5from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
7# import netCDF4
8import numpy as np
9import pandas as pd
10import xarray as xr
12from efts_io._ncdf_stf2 import StfDataType, StfVariable
13from efts_io.conventions import (
14 AREA_VARNAME,
15 AXIS_ATTR_KEY,
16 CATCHMENT_ATTR_KEY,
17 COMMENT_ATTR_KEY,
18 ENS_MEMBER_DIMNAME,
19 HISTORY_ATTR_KEY,
20 INSTITUTION_ATTR_KEY,
21 LAT_VARNAME,
22 LEAD_TIME_DIMNAME,
23 LON_VARNAME,
24 LONG_NAME_ATTR_KEY,
25 REALISATION_DIMNAME,
26 SOURCE_ATTR_KEY,
27 STANDARD_NAME_ATTR_KEY,
28 STATION_DIMNAME,
29 STATION_ID_DIMNAME,
30 STATION_ID_VARNAME,
31 STATION_NAME_VARNAME,
32 STF_2_0_URL,
33 STF_CONVENTION_VERSION_ATTR_KEY,
34 STF_NC_SPEC_ATTR_KEY,
35 TIME_DIMNAME,
36 TIME_STANDARD_ATTR_KEY,
37 TITLE_ATTR_KEY,
38 UNITS_ATTR_KEY,
39 ConvertibleToTimestamp,
40 check_index_found,
41)
42from efts_io.dimensions import cftimes_to_pdtstamps
43from efts_io.variables import create_efts_variables
def byte_to_string(x: Union[int, bytes]) -> str:
    """Decode a single byte (or an integer byte value) into a one-character string.

    Args:
        x: a ``bytes`` object, or an integer in the range [0, 255].

    Returns:
        The UTF-8 decoded string for that byte.

    Raises:
        ValueError: if an integer outside [0, 255] is given.
        TypeError: if `x` is neither an int nor bytes.
    """
    if isinstance(x, int):
        if not 0 <= x <= 255:  # noqa: PLR2004
            raise ValueError("Integer value to bytes: must be in range [0-255]")
        x = x.to_bytes(1, "little")
    if not isinstance(x, bytes):
        raise TypeError(f"Cannot cast type {type(x)} to bytes")
    return str(x, encoding="UTF-8")
def byte_array_to_string(x: np.ndarray) -> str:
    """Decode an array of bytes into a single, whitespace-trimmed string.

    Args:
        x: iterable of bytes/int byte values (typically a numpy byte array).

    Returns:
        The concatenated decoded characters, with leading/trailing whitespace removed.
    """
    decoded_chars = (byte_to_string(b) for b in x)
    return "".join(decoded_chars).strip()
def byte_stations_to_str(byte_names: np.ndarray) -> np.ndarray:
    """Decode a 2-D byte array of station names into a 1-D array of strings.

    Args:
        byte_names: array where each row is the byte-encoded name of one station.

    Returns:
        np.ndarray: one decoded string per input row.
    """
    decoded = [byte_array_to_string(row) for row in byte_names]
    return np.array(decoded)
68def _first_where(condition: np.ndarray) -> int:
69 """Return the first index where the condition is true."""
70 x = np.where(condition)[0]
71 if len(x) < 1:
72 raise ValueError("first_where: Invalid condition, no element is true")
73 return x[0]
def load_from_stf2_file(file_path:str, time_zone_timestamps:bool) -> xr.Dataset : # noqa: FBT001
    """Load data from an STF 2.0 netcdf file to an xarray representation.

    Dimensions are remapped to the EFTS naming conventions:
    ``ens_member`` becomes ``realisation`` and ``station`` becomes ``station_id``.

    Args:
        file_path (str): path of the STF 2.0 netCDF file to read.
        time_zone_timestamps (bool): should we try to recognise the time zone and include it in each xarray time stamp?

    Returns:
        xr.Dataset: a new dataset with EFTS coordinates (realisation, lead_time,
            station_id, time), all data variables copied across, and the file's
            global attributes preserved.
    """
    from xarray.coding import times
    # work around https://jira.csiro.au/browse/WIRADA-635
    # lead_time can be a problem with xarray, so do not decode "times"
    x = xr.open_dataset(file_path, decode_times=False)
    # replace the time and station names coordinates values
    # TODO This is probably not a long term solution for round-tripping a read/write or vice and versa
    # Decode the raw (numeric) time coordinate to cftime values by hand,
    # since decode_times=False above skipped CF decoding entirely.
    decod = times.CFDatetimeCoder(use_cftime=True)
    var = xr.as_variable(x.coords[TIME_DIMNAME])
    # NOTE(review): assumes the time coordinate always carries the time-standard
    # attribute; a file missing it would raise KeyError here — confirm against spec.
    time_zone = var.attrs[TIME_STANDARD_ATTR_KEY]
    time_coords = decod.decode(var, name=TIME_DIMNAME)
    # Optionally carry the file's time zone into each pandas timestamp.
    tz = time_zone if time_zone_timestamps else None
    time_coords.values = cftimes_to_pdtstamps(
        time_coords.values,
        tz_str=tz,
    )
    # stat_coords = x.coords[self.STATION_DIMNAME]
    # see the use of astype later on in variable transfer, following line not needed.
    # station_names = byte_stations_to_str(x[STATION_NAME_VARNAME].values).astype(np.str_)
    station_ids_strings = x[STATION_ID_VARNAME].values.astype(np.str_)
    # x = x.assign_coords(
    #     {TIME_DIMNAME: time_coords, self.STATION_DIMNAME: station_names},
    # )
    # Create a new dataset with the desired structure
    new_dataset = xr.Dataset(
        coords={
            REALISATION_DIMNAME: (REALISATION_DIMNAME, x[ENS_MEMBER_DIMNAME].values),
            LEAD_TIME_DIMNAME: (LEAD_TIME_DIMNAME, x[LEAD_TIME_DIMNAME].values),
            STATION_ID_DIMNAME: (STATION_ID_DIMNAME, station_ids_strings),
            TIME_DIMNAME: (TIME_DIMNAME, time_coords),
        },
        attrs=x.attrs,
    )
    # Copy data variables from the renamed dataset
    for var_name in x.data_vars:
        if var_name not in (STATION_ID_VARNAME, STATION_NAME_VARNAME):
            # Get the variable from the original dataset
            orig_var = x[var_name]
            # Determine the dimensions for the new variable: map the STF 2.0
            # dimension names onto the EFTS ones, keep all others unchanged.
            new_dims = []
            for dim in orig_var.dims:
                if dim == ENS_MEMBER_DIMNAME:
                    new_dims.append(REALISATION_DIMNAME)
                elif dim == STATION_DIMNAME:
                    new_dims.append(STATION_ID_DIMNAME)
                else:
                    new_dims.append(dim)
            # Create a new DataArray with the correct dimensions
            new_dataset[var_name] = xr.DataArray(
                data=orig_var.values,
                dims=new_dims,
                coords={dim: new_dataset[dim] for dim in new_dims if dim in new_dataset.coords},
                attrs=orig_var.attrs,
            )
    # Handle station names separately: re-attached along station_id as strings.
    station_names_var = x[STATION_NAME_VARNAME]
    new_dataset[STATION_NAME_VARNAME] = xr.DataArray(
        data=station_names_var.values.astype(np.str_),
        dims=[STATION_ID_DIMNAME],
        coords={STATION_ID_DIMNAME: new_dataset[STATION_ID_DIMNAME]},
        attrs=station_names_var.attrs,
    )
    return new_dataset
class EftsDataSet:
    """Convenience class for access to a Ensemble Forecast Time Series in netCDF file."""

    # Reference class convenient for access to a Ensemble Forecast Time Series in netCDF file.
    # Description
    # Reference class convenient for access to a Ensemble Forecast Time Series in netCDF file.

    # Fields
    # time_dim
    # a cached POSIXct vector, the values for the time dimension of the data set.

    # time_zone
    # the time zone for the time dimensions of this data set.

    # identifiers_dimensions
    # a cache, list of values of the primary data identifiers; e.g. station_name or station_id

    # stations_varname
    # name of the variable that stores the names of the stations for this data set.

    def __init__(self, data: Union[str, xr.Dataset]) -> None:
        """Create a new EftsDataSet object.

        Args:
            data: path to an STF 2.0 netCDF file, or an already-loaded xarray Dataset.
        """
        self.time_dim = None
        self.time_zone = "UTC"
        self.time_zone_timestamps = True  # Not sure about https://github.com/csiro-hydroinformatics/efts-io/issues/3
        self.STATION_DIMNAME = STATION_DIMNAME
        self.stations_varname = STATION_ID_VARNAME
        self.LEAD_TIME_DIMNAME = LEAD_TIME_DIMNAME
        self.ENS_MEMBER_DIMNAME = ENS_MEMBER_DIMNAME
        # self.identifiers_dimensions: list = []
        self.data: xr.Dataset
        if isinstance(data, str):
            new_dataset = load_from_stf2_file(data, self.time_zone_timestamps)
            self.data = new_dataset
        else:
            self.data = data

    @property
    def title(self) -> str:
        """Get or set the title attribute of the dataset."""
        return self.data.attrs.get(TITLE_ATTR_KEY, "")

    @title.setter
    def title(self, value: str) -> None:
        """Get or set the title attribute of the dataset."""
        self.data.attrs[TITLE_ATTR_KEY] = value

    @property
    def institution(self) -> str:
        """Get or set the institution attribute of the dataset."""
        return self.data.attrs.get(INSTITUTION_ATTR_KEY, "")

    @institution.setter
    def institution(self, value: str) -> None:
        """Get or set the institution attribute of the dataset."""
        self.data.attrs[INSTITUTION_ATTR_KEY] = value

    @property
    def source(self) -> str:
        """Get or set the source attribute of the dataset."""
        return self.data.attrs.get(SOURCE_ATTR_KEY, "")

    @source.setter
    def source(self, value: str) -> None:
        """Get or set the source attribute of the dataset."""
        self.data.attrs[SOURCE_ATTR_KEY] = value

    @property
    def catchment(self) -> str:
        """Get or set the catchment attribute of the dataset."""
        return self.data.attrs.get(CATCHMENT_ATTR_KEY, "")

    @catchment.setter
    def catchment(self, value: str) -> None:
        """Get or set the catchment attribute of the dataset."""
        self.data.attrs[CATCHMENT_ATTR_KEY] = value

    @property
    def stf_convention_version(self) -> float:
        """Get or set the STF_convention_version attribute of the dataset."""
        # NOTE(review): the declared return type is float but the fallback is "";
        # in practice the attribute is stored as the string "2.0" — confirm intent.
        return self.data.attrs.get(STF_CONVENTION_VERSION_ATTR_KEY, "")

    @stf_convention_version.setter
    def stf_convention_version(self, value: float) -> None:
        """Get or set the STF_convention_version attribute of the dataset."""
        self.data.attrs[STF_CONVENTION_VERSION_ATTR_KEY] = value

    @property
    def stf_nc_spec(self) -> str:
        """Get or set the STF_nc_spec attribute of the dataset."""
        return self.data.attrs.get(STF_NC_SPEC_ATTR_KEY, "")

    @stf_nc_spec.setter
    def stf_nc_spec(self, value: str) -> None:
        """Get or set the STF_nc_spec attribute of the dataset."""
        self.data.attrs[STF_NC_SPEC_ATTR_KEY] = value

    @property
    def comment(self) -> str:
        """Get or set the comment attribute of the dataset."""
        return self.data.attrs.get(COMMENT_ATTR_KEY, "")

    @comment.setter
    def comment(self, value: str) -> None:
        """Get or set the comment attribute of the dataset."""
        self.data.attrs[COMMENT_ATTR_KEY] = value

    @property
    def history(self) -> str:
        """Gets/sets the history attribute of the dataset."""
        return self.data.attrs.get(HISTORY_ATTR_KEY, "")

    @history.setter
    def history(self, value: str) -> None:
        """Gets/sets the history attribute of the dataset."""
        self.data.attrs[HISTORY_ATTR_KEY] = value

    def append_history(self, message: str, timestamp: Optional[datetime] = None) -> None:
        """Append a new entry to the `history` attribute with a timestamp.

        Args:
            message: The message to append.
            timestamp: If not provided, the current UTC time is used.
        """
        if timestamp is None:
            # Fix: `datetime` is the class imported via `from datetime import datetime`,
            # so the original `datetime.now(datetime.timezone.utc)` raised
            # AttributeError; import `timezone` explicitly instead.
            from datetime import timezone
            timestamp = datetime.now(timezone.utc).isoformat()
        current_history = self.data.attrs.get(HISTORY_ATTR_KEY, "")
        if current_history:
            self.data.attrs[HISTORY_ATTR_KEY] = f"{current_history}\n{timestamp} - {message}"
        else:
            self.data.attrs[HISTORY_ATTR_KEY] = f"{timestamp} - {message}"

    def to_netcdf(self, path: str, version: Optional[str] = "2.0") -> None:
        """Write the data set to a netCDF file.

        Args:
            path: output file path.
            version: "2.0" writes an STF 2.0 compliant file; None writes a plain
                xarray netCDF dump. Any other value raises ValueError.
        """
        if version is None:
            self.data.to_netcdf(path)
        elif version == "2.0":
            self.save_to_stf2(path)
        else:
            raise ValueError("Only version 2.0 is supported for now")

    def set_mandatory_global_attributes(
        self,
        title: str = "not provided",
        institution: str = "not provided",
        catchment: str = "not provided",
        source: str = "not provided",
        comment: str = "not provided",
        history: str = "not provided",
        append_history: bool = False,  # noqa: FBT001, FBT002
    ) -> None:
        """Sets mandatory global attributes for an EFTS dataset.

        Args:
            title, institution, catchment, source, comment, history: STF 2.0
                mandatory global attributes.
            append_history: if True, `history` is appended (with a timestamp) to
                any existing history; otherwise it replaces it.
        """
        self.title = title
        self.institution = institution
        self.catchment = catchment
        self.source = source
        self.comment = comment
        if append_history:
            self.append_history(history)
        else:
            self.history = history
        self.stf_convention_version = "2.0"
        self.stf_nc_spec = STF_2_0_URL

    def writeable_to_stf2(self) -> bool:
        """Check if the dataset can be written to a netCDF file compliant with STF 2.0 specification.

        This method checks if the underlying xarray dataset or dataarray has the required dimensions and global attributes as specified by the STF 2.0 convention.

        Returns:
            bool: True if the dataset can be written to a STF 2.0 compliant netCDF file, False otherwise.
        """
        from efts_io.conventions import exportable_to_stf2
        return exportable_to_stf2(self.data)

    def save_to_stf2(
        self,
        path: str,
        variable_name: Optional[str] = None,
        var_type: StfVariable = StfVariable.STREAMFLOW,
        data_type: StfDataType = StfDataType.OBSERVED,
        ens: bool = False,  # noqa: FBT001, FBT002
        timestep: str = "days",
        data_qual: Optional[xr.DataArray] = None,
    ) -> None:
        """Save to an STF 2.0 compliant netCDF file.

        Args:
            path: output file path.
            variable_name: which data variable to write; mandatory when the inner
                data is a Dataset (several variables to choose from).
            var_type: STF variable type (e.g. streamflow).
            data_type: STF data type (e.g. observed).
            ens: whether the data is an ensemble.
            timestep: time step unit label, e.g. "days".
            data_qual: optional data quality codes matching the data variable.
        """
        from efts_io._ncdf_stf2 import write_nc_stf2
        if isinstance(self.data, xr.Dataset):
            if variable_name is None:
                # Fix: original message was redundant and misspelled
                # ("an explicit variable name must be explicitely specified").
                raise ValueError("Inner data is a DataSet, so a variable name must be explicitly specified.")
            d = self.data[variable_name]
        # elif isinstance(self.data, xr.DataArray):
        #     d = self.data
        else:
            raise TypeError(f"Unsupported data type {type(self.data)}")
        write_nc_stf2(
            out_nc_file=path,  # : str,
            dataset=self.data,
            data=d,  # : xr.DataArray,
            var_type=var_type,  # : int = 1,
            data_type=data_type,  # : int = 3,
            stf_nc_vers=2,  # : int = 2,
            ens=ens,  # : bool = False,
            timestep=timestep,  # :str="days",
            data_qual=data_qual,  # : Optional[xr.DataArray] = None,
            overwrite=True,  # :bool=True,
            # loc_info=loc_info, # : Optional[Dict[str, Any]] = None,
        )

    def create_data_variables(self, data_var_def: Dict[str, Dict[str, Any]]) -> None:
        """Create NaN-filled data variables in the data set.

        Each value of `data_var_def` is a variable definition dict with keys:
        'name', 'longname', 'units', 'dim_type', 'missval', 'precision', 'attributes'.
        'dim_type' selects the geometry: "4" -> [lead_time,station,ens_member,time],
        "3" -> [station,ens_member,time], "2" -> [station,time].
        """
        ens_fcast_data_var_def = [x for x in data_var_def.values() if x["dim_type"] == "4"]
        ens_data_var_def = [x for x in data_var_def.values() if x["dim_type"] == "3"]
        point_data_var_def = [x for x in data_var_def.values() if x["dim_type"] == "2"]

        four_dims_names = (LEAD_TIME_DIMNAME, STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME)
        three_dims_names = (STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME)
        two_dims_names = (STATION_DIMNAME, TIME_DIMNAME)

        four_dims_shape = tuple(self.data.sizes[dimname] for dimname in four_dims_names)
        three_dims_shape = tuple(self.data.sizes[dimname] for dimname in three_dims_names)
        two_dims_shape = tuple(self.data.sizes[dimname] for dimname in two_dims_names)
        for vardefs, dims_shape, dims_names in [
            (ens_fcast_data_var_def, four_dims_shape, four_dims_names),
            (ens_data_var_def, three_dims_shape, three_dims_names),
            (point_data_var_def, two_dims_shape, two_dims_names),
        ]:
            for x in vardefs:
                varname = x["name"]
                self.data[varname] = xr.DataArray(
                    name=varname,
                    data=nan_full(dims_shape),
                    coords=self.data.coords,
                    dims=dims_names,
                    attrs={
                        "longname": x["longname"],
                        UNITS_ATTR_KEY: x[UNITS_ATTR_KEY],
                        "missval": x["missval"],
                        "precision": x["precision"],
                        **x["attributes"],
                    },
                )

    def get_all_series(
        self,
        variable_name: str = "rain_obs",
        dimension_id: Optional[str] = None,  # noqa: ARG002
    ) -> xr.DataArray:
        """Return a multivariate time series, where each column is the series for one of the identifiers."""
        # Return a multivariate time series, where each column is the series for one of the identifiers (self, e.g. rainfall station identifiers):
        return self.data[variable_name]
        # Legacy R implementation kept for reference:
        # stopifnot(variable_name %in% names(ncfile$var))
        # td = self.get_time_dim()
        # if dimension_id is None: dimension_id = self.get_stations_varname()
        # identifiers = self._get_values(dimension_id)
        # ncdims = self.get_variable_dim_names(variable_name)
        # could be e.g.: double q_obs[lead_time,station,ens_member,time] float
        # rain_obs[station,time] lead_time,station,ens_member,time reordered
        # according to the variable present dimensions:
        # tsstart = splice_named_var(c(1, 1, 1, 1), ncdims)
        # tscount = splice_named_var(c(1, length(identifiers), 1, length(td)), ncdims)
        # rawData = ncdf4::ncvar_get(ncfile, variable_name, start = tsstart, count = tscount,
        # collapse_degen = FALSE)
        # dim_names(rawData) = ncdims
        # # [station,time] to [time, station] for xts creation
        # # NOTE: why can this not be dimension_id instead of STATION_DIMNAME?
        # tsData = reduce_dimensions(rawData,c(TIME_DIMNAME, STATION_DIMNAME))
        # v = xts(x = tsData, order.by = td, tzone = tz(td))
        # colnames(v) = identifiers
        # return(v)

    def get_dim_names(self) -> List[str]:
        """Gets the name of all dimensions in the data set."""
        return list(self.data.sizes.keys())
        # Note: self._dim_size will return a list of str in the future
        # return [x for x in self._dim_size.keys()]

    def get_ensemble_for_stations(
        self,
        variable_name: str = "rain_sim",
        identifier: Optional[str] = None,
        dimension_id: str = ENS_MEMBER_DIMNAME,
        start_time: Optional[pd.Timestamp] = None,
        lead_time_count: Optional[int] = None,
    ) -> xr.DataArray:
        """Not yet implemented."""
        # Return a time series, representing a single ensemble member forecast for all stations over the lead time
        raise NotImplementedError

    def get_ensemble_forecasts(
        self,
        variable_name: str = "rain_sim",
        identifier: Optional[str] = None,
        dimension_id: Optional[str] = None,
        start_time: Optional[pd.Timestamp] = None,
        lead_time_count: Optional[int] = None,
    ) -> xr.DataArray:
        """Gets an ensemble forecast for a variable. Not fully implemented."""
        # Return a time series, ensemble of forecasts over the lead time
        if dimension_id is None:
            dimension_id = self.get_stations_varname()
        td = self.get_time_dim()
        if start_time is None:
            start_time = td[0]
        n_ens = self.get_ensemble_size()
        raise NotImplementedError(
            "get_ensemble_forecasts: not yet implemented",
        )
        # NOTE(review): everything below is unreachable scaffolding for the future
        # implementation; `index_for_identifier`/`index_for_time` are not defined yet.
        index_id = self.index_for_identifier(identifier, dimension_id)
        check_index_found(index_id, identifier, dimension_id)
        if lead_time_count is None:
            lead_time_count = self.get_lead_time_count()
        indx_time = self.index_for_time(start_time)
        # float rain_sim[lead_time,station,ens_member,time]
        ens_data = self.data.get(variable_name)[
            indx_time,
            :n_ens,
            index_id,
            :lead_time_count,
        ]
        # ensData = self.data.get(variable_name), start = [1, index_id, 1, indTime],
        # count = c(lead_time_count, 1, nEns, 1), collapse_degen = FALSE)
        # tu = self.get_lead_time_unit()
        # if tu == "days":
        #     timeAxis = start_time + pd.Timedelta(ncfile$dim$lead_time$vals)
        # } else {
        #     timeAxis = start_time + lubridate::dhours(1) * ncfile$dim$lead_time$vals
        # }
        # out = xts(x = ensData[, 1, , 1], order.by = timeAxis, tzone = tz(start_time))
        return ens_data  # noqa: RET504

    # def get_ensemble_forecasts_for_station(
    #     self,
    #     variable_name: str = "rain_sim",
    #     identifier: Optional[str] = None,
    #     dimension_id: Optional[str] = None,
    # ):
    #     """Return an array, representing all ensemble member forecasts for a single stations over all lead times."""
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     raise NotImplementedError

    # def get_ensemble_series(
    #     self,
    #     variable_name: str = "rain_ens",
    #     identifier: Optional[str] = None,
    #     dimension_id: Optional[str] = None,
    # ):
    #     """Return an ensemble of point time series for a station identifier."""
    #     # Return an ensemble of point time series for a station identifier
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     raise NotImplementedError

    def _dim_size(self, dimname: str) -> int:
        """Return the length of the named dimension."""
        return self.data.sizes[dimname]

    def get_ensemble_size(self) -> int:
        """Return the length of the ensemble size dimension."""
        return self._dim_size(self.ENS_MEMBER_DIMNAME)

    def get_lead_time_count(self) -> int:
        """Length of the lead time dimension."""
        return self._dim_size(self.LEAD_TIME_DIMNAME)

    def get_lead_time_values(self) -> np.ndarray:
        """Return the values of the lead time dimension."""
        return self.data[self.LEAD_TIME_DIMNAME].values

    def put_lead_time_values(self, values: Iterable[float]) -> None:
        """Set the values of the lead time dimension."""
        self.data[self.LEAD_TIME_DIMNAME].values = np.array(values)

    def get_single_series(
        self,
        variable_name: str = "rain_obs",
        identifier: Optional[str] = None,
        dimension_id: Optional[str] = None,
    ) -> xr.DataArray:
        """Return a single point time series for a station identifier."""
        # Return a single point time series for a station identifier. Falls back on def get_all_series if the argument "identifier" is missing
        if dimension_id is None:
            dimension_id = self.get_stations_varname()
        return self.data[variable_name].sel({dimension_id: identifier})

    def get_station_count(self) -> int:
        """Return the number of stations in the data set."""
        # Fix: the original computed the size but never returned it (always None).
        return self._dim_size(self.STATION_DIMNAME)

    def get_stations_varname(self) -> str:
        """Return the name of the variable that has the station identifiers."""
        # Gets the name of the variable that has the station identifiers
        # TODO: station is integer normally in STF (Euargh)
        return STATION_ID_VARNAME

    def get_time_dim(self) -> np.ndarray:
        """Return the time dimension variable as a vector of date-time stamps."""
        # Gets the time dimension variable as a vector of date-time stamps
        return self.data.time.values  # but loosing attributes.

    # def get_time_unit(self) -> str:
    #     """Return the time units of a read time series."""
    #     # Gets the time units of a read time series, i.e. "hours since 2015-10-04 00:00:00 +1030". Returns the string "hours"
    #     return "dummy"

    # def get_time_zone(self) -> str:
    #     # Gets the time zone to use for the read time series
    #     return "dummy"

    # def get_utc_offset(self, as_string: bool = True):
    #     # Gets the time zone to use for the read time series, i.e. "hours since 2015-10-04 00:00:00 +1030". Returns the string "+1030" or "-0845" if as_string is TRUE, or a lubridate Duration object if FALSE
    #     return None

    # def _get_values(self, variable_name: str):
    #     # Gets (and cache in memory) all the values in a variable. Should be used only for dimension variables
    #     from efts_io.conventions import conventional_varnames
    #     if variable_name not in conventional_varnames:
    #         raise ValueError(
    #             variable_name + " cannot be directly retrieved. Must be in " + ", ".join(conventional_varnames),
    #         )
    #     return self.data[variable_name].values

    # def get_variable_dim_names(self, variable_name):
    #     # Gets the names of the dimensions that define the geometry of a given variable
    #     return [x for x in self.data[[variable_name]].coords.keys()]

    # def get_variable_names(self):
    #     # Gets the name of all variables in the data set
    #     return [x for x in self.data.variables.keys()]

    # def index_for_identifier(self, identifier, dimension_id=None):
    #     # Gets the index at which an identifier is found in a dimension variable
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     identValues = self._get_values(dimension_id)
    #     if identifier is None:
    #         raise Exception("Identifier cannot be NA")
    #     return _first_where(identifier == identValues)

    # def index_for_time(self, dateTime):
    #     # Gets the index at which a date-time is found in the main time axis of this data set
    #     return _first_where(self.data.time == dateTime)

    # def put_ensemble_forecasts(
    #     self,
    #     x,
    #     variable_name="rain_sim",
    #     identifier: str = None,
    #     dimension_id=None,
    #     start_time=None,
    # ):
    #     # Puts one or more ensemble forecast into a netCDF file
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     raise NotImplementedError

    # def put_ensemble_forecasts_for_station(
    #     self,
    #     x,
    #     variable_name="rain_sim",
    #     identifier: str = None,
    #     dimension_id=ENS_MEMBER_DIMNAME,
    #     start_time=None,
    # ):
    #     # Puts a single ensemble member forecasts for all stations into a netCDF file
    #     raise NotImplementedError

    # def put_ensemble_series(
    #     self,
    #     x,
    #     variable_name="rain_ens",
    #     identifier: str = None,
    #     dimension_id=None,
    # ):
    #     # Puts an ensemble of time series, e.g. replicate rainfall series
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     raise NotImplementedError

    # def put_single_series(
    #     self,
    #     x,
    #     variable_name="rain_obs",
    #     identifier: str = None,
    #     dimension_id=None,
    #     start_time=None,
    # ):
    #     # Puts a time series, or part thereof
    #     if dimension_id is None:
    #         dimension_id = self.get_stations_varname()
    #     raise NotImplementedError

    # def put_values(self, x, variable_name):
    #     # Puts all the values in a variable. Should be used only for dimension variables
    #     raise NotImplementedError

    # def set_time_zone(self, tzone_id):
    #     # Sets the time zone to use for the read time series
    #     raise NotImplementedError

    # def summary(self):
    #     # Print a summary of this EFTS netCDF file
    #     raise NotImplementedError

    # See Also
    # See create_efts and open_efts for examples on how to read or write EFTS netCDF files using this dataset.
663#' Creates a EftsDataSet for access to a netCDF EFTS data set
664#'
665#' Creates a EftsDataSet for access to a netCDF EFTS data set
666#'
667#' @param ncfile name of the netCDF file, or an object of class 'ncdf4'
668#' @param writein if TRUE the data set is opened in write mode
669#' @export
670#' @import ncdf4
671#' @examples
672#' library(efts)
673#' ext_data = system.file('extdata', package='efts')
674#' ens_fcast_file = file.path(ext_data, 'Upper_Murray_sample_ensemble_rain_fcast.nc')
675#' stopifnot(file.exists(ens_fcast_file))
676#' snc = open_efts(ens_fcast_file)
677#' (variable_names = snc$get_variable_names())
678#' (stations_ids = snc$get_values(STATION_ID_DIMNAME))
679#' nEns = snc$get_ensemble_size()
680#' nLead = snc$get_lead_time_count()
681#' td = snc$get_time_dim()
682#' stopifnot('rain_fcast_ens' %in% variable_names)
683#'
684#' ens_fcast_rainfall = snc$get_ensemble_forecasts('rain_fcast_ens',
685#' stations_ids[1], start_time=td[2])
686#' names(ens_fcast_rainfall) = as.character(1:ncol(ens_fcast_rainfall))
687#' plot(ens_fcast_rainfall, legend.loc='right')
688#'
689#' snc$close()
690#'
691#' @return A EftsDataSet object
692#' @importFrom methods is
def open_efts(ncfile:Any, writein:bool=False) -> EftsDataSet: # noqa: ARG001, FBT001, FBT002
    """Open an EFTS NetCDF file.

    Args:
        ncfile: path to a netCDF file, or an already loaded xarray Dataset,
            forwarded to the EftsDataSet constructor.
        writein: currently unused; kept for signature compatibility with the
            original R `efts::open_efts` function.

    Returns:
        EftsDataSet: a wrapper object giving convenient access to the data set.
    """
    # Legacy R implementation kept for reference:
    # raise NotImplemented("open_efts")
    # if isinstance(ncfile, str):
    # nc = ncdf4::nc_open(ncfile, readunlim = FALSE, write = writein)
    # } else if (methods::is(ncfile, "ncdf4")) {
    # nc = ncfile
    # }
    return EftsDataSet(ncfile)
def nan_full(shape: Union[Tuple, int]) -> np.ndarray:
    """Create an array of the requested shape, filled entirely with NaN.

    Args:
        shape: target shape; a bare int is treated as a 1-D length.

    Returns:
        np.ndarray: float array where every element is NaN.
    """
    dims = (shape,) if isinstance(shape, int) else shape
    return np.full(shape=dims, fill_value=np.nan)
def xr_efts(
    issue_times: Iterable[ConvertibleToTimestamp],
    station_ids: Iterable[str],
    lead_times: Optional[Iterable[int]] = None,
    lead_time_tstep: str = "hours",
    ensemble_size: int = 1,
    # variables
    station_names: Optional[Iterable[str]] = None,
    latitudes: Optional[Iterable[float]] = None,
    longitudes: Optional[Iterable[float]] = None,
    areas: Optional[Iterable[float]] = None,
    nc_attributes: Optional[Dict[str, str]] = None,
) -> xr.Dataset:
    """Build an in-memory xarray Dataset with the EFTS structure.

    Args:
        issue_times: forecast issue times (main time axis).
        station_ids: station identifiers; also attached as a coordinate on the station dimension.
        lead_times: forecast lead time offsets; defaults to a single value [0].
        lead_time_tstep: unit of the lead time step (e.g. "hours").
        ensemble_size: number of ensemble members.
        station_names: optional station names; default is the string form of each station id.
        latitudes, longitudes, areas: optional per-station geolocation variables; default NaN.
        nc_attributes: global attributes; defaults to the STF 2.0 mandatory set.

    Returns:
        xr.Dataset: dataset with time/station/ens_member/lead_time dimensions,
            per-station variables, and CF-style attributes on each coordinate.
    """
    if lead_times is None:
        lead_times = [0]
    station_count = len(station_ids)
    # Per-station variables default to NaN (or stringified ids for names).
    if latitudes is None:
        latitudes = nan_full(station_count)
    if longitudes is None:
        longitudes = nan_full(station_count)
    if areas is None:
        areas = nan_full(station_count)
    if station_names is None:
        station_names = [f"{i}" for i in station_ids]
    if not nc_attributes:
        nc_attributes = _stf2_mandatory_global_attributes()
    coords = {
        TIME_DIMNAME: issue_times,
        STATION_DIMNAME: np.arange(start=1, stop=station_count + 1, step=1),
        ENS_MEMBER_DIMNAME: np.arange(start=1, stop=ensemble_size + 1, step=1),
        LEAD_TIME_DIMNAME: lead_times,
        # New coordinate can also be attached to an existing dimension:
        # https://docs.xarray.dev/en/latest/generated/xarray.DataArray.assign_coords.html#xarray.DataArray.assign_coords
        STATION_ID_VARNAME: (STATION_DIMNAME, station_ids),
    }
    data_vars = {
        STATION_NAME_VARNAME: (STATION_DIMNAME, station_names),
        LAT_VARNAME: (STATION_DIMNAME, latitudes),
        LON_VARNAME: (STATION_DIMNAME, longitudes),
        AREA_VARNAME: (STATION_DIMNAME, areas),
    }
    d = xr.Dataset(
        data_vars=data_vars,
        coords=coords,
        attrs=nc_attributes,
    )
    # Credits to the work reported in https://github.com/pydata/xarray/issues/2028#issuecomment-1265252754
    d = d.set_xindex(STATION_ID_VARNAME)
    d.time.attrs = {
        STANDARD_NAME_ATTR_KEY: TIME_DIMNAME,
        LONG_NAME_ATTR_KEY: TIME_DIMNAME,
        # TIME_STANDARD_KEY: "UTC",
        AXIS_ATTR_KEY: "t",
        # UNITS_ATTR_KEY: "days since 2000-11-14 23:00:00.0 +0000",
    }
    d.lead_time.attrs = {
        STANDARD_NAME_ATTR_KEY: "lead time",
        LONG_NAME_ATTR_KEY: "forecast lead time",
        AXIS_ATTR_KEY: "v",
        UNITS_ATTR_KEY: f"{lead_time_tstep} since time",
    }
    d.ens_member.attrs = {
        STANDARD_NAME_ATTR_KEY: ENS_MEMBER_DIMNAME,
        LONG_NAME_ATTR_KEY: "ensemble member",
        UNITS_ATTR_KEY: "member id",
        AXIS_ATTR_KEY: "u",
    }
    d.station_id.attrs = {LONG_NAME_ATTR_KEY: "station or node identification code"}
    d.station_name.attrs = {LONG_NAME_ATTR_KEY: "station or node name"}
    d.lat.attrs = {LONG_NAME_ATTR_KEY: "latitude", UNITS_ATTR_KEY: "degrees_north", AXIS_ATTR_KEY: "y"}
    d.lon.attrs = {LONG_NAME_ATTR_KEY: "longitude", UNITS_ATTR_KEY: "degrees_east", AXIS_ATTR_KEY: "x"}
    d.area.attrs = {
        LONG_NAME_ATTR_KEY: "station area",
        UNITS_ATTR_KEY: "km^2",
        STANDARD_NAME_ATTR_KEY: AREA_VARNAME,
    }
    return d
def _stf2_mandatory_global_attributes(
    title: str = "not provided",
    institution: str = "not provided",
    catchment: str = "not provided",
    source: str = "not provided",
    comment: str = "not provided",
    history: str = "not provided",
) -> Dict[str, str]:
    """Create a dictionary of mandatory global attributes for an EFTS dataset.

    All arguments default to the placeholder "not provided"; the STF convention
    version and specification URL entries are always included.
    """
    attrs: Dict[str, str] = {
        TITLE_ATTR_KEY: title,
        INSTITUTION_ATTR_KEY: institution,
        CATCHMENT_ATTR_KEY: catchment,
        SOURCE_ATTR_KEY: source,
        COMMENT_ATTR_KEY: comment,
        HISTORY_ATTR_KEY: history,
    }
    # Identify the file as following the STF 2.0 conventions.
    attrs[STF_CONVENTION_VERSION_ATTR_KEY] = "2.0"
    attrs[STF_NC_SPEC_ATTR_KEY] = STF_2_0_URL
    return attrs
807#' Creates a EftsDataSet for write access to a netCDF EFTS data set
808#'
809#' Creates a EftsDataSet for write access to a netCDF EFTS data set
810#'
811#' @param fname file name to create to. The file must not exist already.
812#' @param time_dim_info a list with the units and values defining the time dimension of the data set
813#' @param data_var_definitions a data frame, acceptable by \code{\link{create_variable_definitions}}, or list of netCDF variable definitions, e.g.
814#' \code{list(rain_sim=list(name='rain_sim', longname='ECMWF Rainfall ensemble forecasts', units='mm', missval=-9999.0, precision='double', attributes=list(type=2, type_description='accumulated over the preceding interval')))}
815#' @param stations_ids station identifiers, coercible to an integer vector (note: may change to be a more flexible character storage)
816#' @param station_names optional; names of the stations
817#' @param nc_attributes a named list of characters, attributes for the whole file,
818#' including mandatory ones: title, institution, source, catchment, comment.
819#' You may use \code{\link{create_global_attributes}} as a starting template.
820#' @param lead_length length of the lead forecasting time series.
821#' @param optional_vars a data frame defining optional netCDF variables. For a templated default see
822#' \code{\link{default_optional_variable_definitions_v2_0}} and
823#' \url{https://github.com/jmp75/efts/blob/107c553045a37e6ef36b2eababf6a299e7883d50/docs/netcdf_for_water_forecasting.md#optional-variables}
824#' @param lead_time_tstep string specifying the time step of the forecast lead length.
825#' @param ensemble_length number of ensembles, i.e. number of forecasts for each point on the main time axis of the data set
826#' @examples
827#'
828#' # NOTE
829#' # The sample code below is purposely generic; to produce
830#' # a data set conforming with the conventions devised for
831#' # ensemble streamflow forecast you will need to
832#' # follow the additional guidelines at
833#' # https://github.com/jmp75/efts/blob/master/docs/netcdf_for_water_forecasting.md
834#'
835#' fname = tempfile()
836#'
837#' stations_ids = c(123,456)
838#' nEns = 3
839#' nLead = 4
840#' nTimeSteps = 12
841#'
842#' timeAxisStart = ISOdate(year=2010, month=08, day=01, hour = 14, min = 0, sec = 0, tz = 'UTC')
843#' time_dim_info = create_time_info(from=timeAxisStart,
844#' n=nTimeSteps, time_step = "hours since")
845#'
846#' # It is possible to define variables for three combinations of dimensions.
847#' # dimensions '4' ==> [lead_time,station,ens_member,time]
848#' # dimensions '3' ==> [station,ens_member,time]
849#' # dimensions '2' ==> [station,time]
850#'
851#' variable_names = c('var1_fcast_ens','var2_fcast_ens', 'var1_obs',
852#' 'var2_obs', 'var1_ens','var2_ens')
853#'
854#' va = create_var_attribute_definition(
855#' type = 2L,
856#' type_description = "accumulated over the preceding interval",
857#' dat_type = "der",
858#' dat_type_description = paste(rep(c("var1", "var2"), 3), "synthetic test data"),
859#' location_type = "Point")
860#'
861#'
862#' (varDef = create_variable_definition_dataframe(
863#' variable_names=variable_names,
864#' long_names = paste(variable_names, 'synthetic data'),
865#' dimensions = c(4L,4L,2L,2L,3L,3L),
866#' var_attributes = va))
867#'
868#' glob_attr = create_global_attributes(
869#' title="data set title",
870#' institution="my org",
871#' catchment="Upper_Murray",
872#' source="A journal reference, URL",
873#' comment="example for vignette")
874#'
875#' (opt_metadatavars = default_optional_variable_definitions_v2_0())
876#'
877#' snc = create_efts(
878#' fname=fname,
879#' time_dim_info=time_dim_info,
880#' data_var_definitions=varDef,
881#' stations_ids=stations_ids,
882#' nc_attributes=glob_attr,
883#' optional_vars = opt_metadatavars,
884#' lead_length=nLead,
885#' ensemble_length=nEns,
886#' lead_time_tstep = "hours")
887#'
888#' # Following is code that was used to create unit tests for EFTS.
 889 #' # This is kept in this example to provide a sample of how to write data of various dimensions.
890#' td = snc$get_time_dim()
891#' m = matrix(ncol=nEns, nrow=nLead)
892#' for (rnum in 1:nLead) {
893#' for (cnum in 1:nEns) {
894#' m[rnum,cnum] = rnum*0.01 + cnum*0.1
895#' }
896#' }
897#' # [,1] [,2] [,3]
898#' # [1,] 0.11 0.21 0.31
899#' # [2,] 0.12 0.22 0.32
900#' # [3,] 0.13 0.23 0.33
901#' # [4,] 0.14 0.24 0.34
902#' for (i in 1:length(td)) {
903#' for (j in 1:length(stations_ids)) {
904#' station = stations_ids[j]
905#' var1Values = i + 0.1*j + m
906#' var2Values = 2*var1Values
907#' dtime = td[i]
908#' snc$put_ensemble_forecasts(var1Values, variable_name = variable_names[1],
909#' identifier = station, start_time = dtime)
910#' snc$put_ensemble_forecasts(var2Values, variable_name = variable_names[2],
911#' identifier = station, start_time = dtime)
912#' }
913#' }
914#'
915#' timeSteps = 1:length(td)
916#' for (j in 1:length(stations_ids)) {
917#' var3Values = timeSteps + 0.1*j
918#' var4Values = var3Values + 0.01*timeSteps + 0.001*j
919#'
920#' station = stations_ids[j]
921#' snc$put_single_series(var3Values, variable_name = variable_names[3], identifier = station)
922#' snc$put_single_series(var4Values, variable_name = variable_names[4], identifier = station)
923#' }
924#'
925#' for (j in 1:length(stations_ids)) {
926#'
927#' var5Xts = matrix(rep(1:nEns, each=nTimeSteps) + timeSteps + 0.1*j, ncol=nEns)
928#'
929#' # [time,ens_member] to [ens_member,time], as expected by put_ensemble_series
930#' var5Values = t(var5Xts)
931#' var6Values = 0.25 * var5Values
932#'
933#' station = stations_ids[j]
934#' snc$put_ensemble_series(var5Values, variable_name = variable_names[5], identifier = station)
935#' snc$put_ensemble_series(var6Values, variable_name = variable_names[6], identifier = station)
936#' }
937#'
938#' # We can get/put values for some metadata variables:
939#' snc$get_values("x")
940#' snc$put_values(c(1.1, 2.2), "x")
941#' snc$put_values(letters[1:2], STATION_NAME_VARNAME)
942#'
943#' # Direct get/set access to data variables, however, is prevented;
944#' # the following would thus cause an error:
945#' # snc$get_values("var1_fcast_ens")
946#'
947#' snc$close()
948#' # Cleaning up temp file:
949#' if (file.exists(fname))
950#' file.remove(fname)
951#'
952#'
953#'
954#' @export
955#' @import ncdf4
956#' @importFrom utils packageDescription
957#' @importFrom methods new
958#' @return A EftsDataSet object
def create_efts(
    fname: str,
    time_dim_info: Dict,
    data_var_definitions: List[Dict[str, Any]],
    stations_ids: List[int],
    station_names: Optional[List[str]] = None,  # noqa: ARG001
    nc_attributes: Optional[Dict[str, str]] = None,
    optional_vars: Optional[Dict[str, Any]] = None,
    lead_length: int = 48,
    ensemble_length: int = 50,
    lead_time_tstep: str = "hours",
) -> EftsDataSet:
    """Create a new EFTS dataset.

    Args:
        fname: File name to create. The file must not already exist.
        time_dim_info: Dictionary with the units and values defining the time dimension.
        data_var_definitions: List of dictionaries defining the data variables.
        stations_ids: Station identifiers.
        station_names: Optional station names. NOTE(review): currently accepted but
            unused by this implementation (see `noqa: ARG001`).
        nc_attributes: Global attributes for the whole file, including the mandatory
            ones (title, institution, source, catchment, comment).
        optional_vars: Definitions of optional netCDF variables.
        lead_length: Length of the lead forecasting time series.
        ensemble_length: Number of ensemble members, i.e. forecasts per time step.
        lead_time_tstep: Time step of the forecast lead length, e.g. "hours".

    Returns:
        An EftsDataSet wrapping an in-memory xarray Dataset.

    Raises:
        ValueError: If `stations_ids` or `nc_attributes` is None.
        FileExistsError: If `fname` already exists.
        TypeError: If `data_var_definitions` is a pandas DataFrame.
    """
    from efts_io.conventions import mandatory_global_attributes

    if stations_ids is None:
        raise ValueError(
            "You must provide station identifiers when creating a new EFTS netCDF data set",
        )

    if nc_attributes is None:
        # Space after "including" so the joined attribute list reads correctly.
        raise ValueError(
            "You must provide a suitable list for nc_attributes, including " + ", ".join(mandatory_global_attributes),
        )

    # check_global_attributes(nc_attributes)

    if os.path.exists(fname):
        raise FileExistsError("File already exists: " + fname)

    if isinstance(data_var_definitions, pd.DataFrame):
        raise TypeError(
            "data_var_definitions should be a list of dictionaries, not a pandas DataFrame",
        )

    var_defs = create_efts_variables(
        data_var_definitions,
        time_dim_info,
        num_stations=len(stations_ids),
        lead_length=lead_length,
        ensemble_length=ensemble_length,
        optional_vars=optional_vars,
        lead_time_tstep=lead_time_tstep,
    )

    d = xr.Dataset(
        data_vars=var_defs["datavars"],
        coords=var_defs["metadatavars"],
        attrs={"description": "TODO: put the right attributes"},
    )

    ## attributes for dimensions variables
    # The previous implementation of this helper was a no-op stub, so the
    # dimension/coordinate attributes were never written. This mirrors the
    # ncatt_put calls of the legacy R `createSchema` (commented out below).
    def add_dim_attribute(ds: xr.Dataset, varname: str, attr_key: str, attr_value: str) -> None:
        # Skip silently when the variable is absent (e.g. optional lat/lon).
        if varname in ds.variables:
            ds[varname].attrs[attr_key] = attr_value

    add_dim_attribute(d, TIME_DIMNAME, STANDARD_NAME_ATTR_KEY, TIME_DIMNAME)
    add_dim_attribute(d, TIME_DIMNAME, TIME_STANDARD_ATTR_KEY, "UTC")
    add_dim_attribute(d, TIME_DIMNAME, AXIS_ATTR_KEY, "t")
    add_dim_attribute(d, ENS_MEMBER_DIMNAME, STANDARD_NAME_ATTR_KEY, ENS_MEMBER_DIMNAME)
    add_dim_attribute(d, ENS_MEMBER_DIMNAME, AXIS_ATTR_KEY, "u")
    add_dim_attribute(d, LEAD_TIME_DIMNAME, STANDARD_NAME_ATTR_KEY, LEAD_TIME_DIMNAME)
    add_dim_attribute(d, LEAD_TIME_DIMNAME, AXIS_ATTR_KEY, "v")
    add_dim_attribute(d, LAT_VARNAME, AXIS_ATTR_KEY, "y")
    add_dim_attribute(d, LON_VARNAME, AXIS_ATTR_KEY, "x")

    return EftsDataSet(d)
1043# ########################################
1044# # Below are functions not exported
1045# ########################################
1047# infoList(theList) {
1048# paste(paste(names(theList), theList, sep = ": "), collapse = ", ")
1049# }
1051# createSchema(fname, varDefs, data_var_definitions, nc_attributes, optional_vars,
1052# stations_ids, lead_length, ensemble_length, station_names=NA) {
1054# allVars = c(varDefs$datavars, varDefs$metadatavars)
1055# nc = ncdf4::nc_create(fname, vars = allVars)
1057# ## attributes for data variables
1058# lapply(data_var_definitions, put_variable_attributes, nc)
1060# ## attributes for dimensions variables
1061# ncdf4::ncatt_put(nc, TIME_DIMNAME, STANDARD_NAME_KEY, TIME_DIMNAME)
1062# ncdf4::ncatt_put(nc, TIME_DIMNAME, TIME_STANDARD_KEY, "UTC")
1063# ncdf4::ncatt_put(nc, TIME_DIMNAME, AXIS_ATTR_KEY, "t")
1064# ncdf4::ncatt_put(nc, ENS_MEMBER_DIMNAME, STANDARD_NAME_KEY, ENS_MEMBER_DIMNAME)
1065# ncdf4::ncatt_put(nc, ENS_MEMBER_DIMNAME, AXIS_ATTR_KEY, "u")
1066# ncdf4::ncatt_put(nc, LEAD_TIME_DIMNAME, STANDARD_NAME_KEY, LEAD_TIME_DIMNAME)
1067# ncdf4::ncatt_put(nc, LEAD_TIME_DIMNAME, AXIS_ATTR_KEY, "v")
1068# ncdf4::ncatt_put(nc, LAT_VARNAME, AXIS_ATTR_KEY, "y")
1069# ncdf4::ncatt_put(nc, lon_varname, AXIS_ATTR_KEY, "x")
1071# ## attributes for optional metadata variables
1072# if(!is.None(optional_vars))
1073# {
1074# var_names = rownames(optional_vars)
1075# if(STANDARD_NAME_KEY %in% colnames(optional_vars)){
1076# for (v in var_names) {
1077# sn = optional_vars[v, STANDARD_NAME_KEY]
1078# if(!is.na(sn)) ncdf4::ncatt_put(nc, v, STANDARD_NAME_KEY, sn)
1079# }
1080# }
1081# if(x_varname %in% var_names){
1082# ncdf4::ncatt_put(nc, x_varname, AXIS_ATTR_KEY, "x")
1083# }
1084# if(y_varname %in% var_names){
1085# ncdf4::ncatt_put(nc, y_varname, AXIS_ATTR_KEY, "y")
1086# }
1087# }
1089# ## Add global attributes
1090# ncdf4::ncatt_put(nc, 0, STF_CONVENTION_VERSION_ATTR_KEY, 2)
1091# ncdf4::ncatt_put(nc, 0, STF_NC_SPEC_ATTR_KEY, "https://github.com/jmp75/efts/blob/107c553045a37e6ef36b2eababf6a299e7883d50/docs/netcdf_for_water_forecasting.md")
1092# ncdf4::ncatt_put(nc, 0, HISTORY_ATTR_KEY,
1093# paste(
1094# as.character(lubridate::now(tzone="UTC")),
1095# "UTC",
1096# "file created with the R package efts", packageDescription("efts")$Version
1097# ) %>% infoList)
1099# if(!is.None(nc_attributes)) {
1100# for (k in names(nc_attributes)) {
1101# pad_global_attribute(nc, k, nc_attributes[k])
1102# }
1103# }
1105# ## populate metadata variables
1106# ncdf4::ncvar_put(nc, STATION_ID_VARNAME, stations_ids)
1107# ncdf4::ncvar_put(nc, LEAD_TIME_DIMNAME, 1:lead_length)
1108# ncdf4::ncvar_put(nc, ENS_MEMBER_DIMNAME, 1:ensemble_length)
1109# if (!is.None(station_names)) {
1110# ncdf4::ncvar_put(nc, STATION_NAME_VARNAME, station_names)
1111# }
1112# # One seems to need to close/reopen the newly created file, otherwise some
1113# # ncvar_get operations will fail with a cryptic message. I follow the
1114# # advice in this and associated posts
1115# # https://www.unidata.ucar.edu/mailing_lists/archives/netcdfgroup/2012/msg00270.html
1116# ncdf4::nc_close(nc)
1117# nc = ncdf4::nc_open(fname, write = TRUE, readunlim = FALSE)
1118# return(nc)
1119# }