Coverage for src/efts_io/_internals.py: 20.00%
16 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-24 10:14 +1000
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-24 10:14 +1000
1# check_index_found(index_id, identifier, dimension_id) {
2# if (length(index_id) == 0)
3# stop(paste0("identifier '", identifier, "' not found in the dimension '",
4# dimension_id, "'"))
5# }
7# STATION_DIMNAME = "station"
8# LEAD_TIME_DIMNAME = "lead_time"
9# TIME_DIMNAME = "time"
10# ENS_MEMBER_DIMNAME = "ens_member"
11# STR_LEN_DIMNAME = "str_len"
13# # int station_id[station]
14# STATION_ID_VARNAME = "station_id"
15# # char station_name[str_len,station]
16# STATION_NAME_VARNAME = "station_name"
17# # float lat[station]
18# LAT_VARNAME = LAT_VARNAME
19# # float lon[station]
20# lon_varname = "lon"
21# # float x[station]
22# x_varname = "x"
23# # float y[station]
24# y_varname = "y"
25# # float area[station]
26# area_varname = "area"
27# # float elevation[station]
28# elevation_varname = "elevation"
30# conventional_varnames = c(
31# STATION_DIMNAME ,
32# LEAD_TIME_DIMNAME ,
33# TIME_DIMNAME ,
34# ENS_MEMBER_DIMNAME ,
35# STR_LEN_DIMNAME ,
36# STATION_ID_VARNAME ,
37# STATION_NAME_VARNAME ,
38# LAT_VARNAME ,
39# lon_varname ,
40# x_varname ,
41# y_varname ,
42# area_varname ,
43# elevation_varname
44# )
46# mandatory_global_attributes = c("title", INSTITUTION_ATTR_KEY, SOURCE_ATTR_KEY, CATCHMENT_ATTR_KEY, COMMENT_ATTR_KEY)
49# get_default_dim_order() {
50# return(c(LEAD_TIME_DIMNAME, STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME))
51# }
53# splice_named_var(d, ncdims = character()) {
54# default_order = get_default_dim_order()
55# d = as.integer(d)
56# stopifnot(length(d) == 4)
57# stopifnot(is.vector(d))
58# # lead_time,station,ens_member,time
59# names(d) = default_order
60# if (length(ncdims) > 0) {
61# if (!all(ncdims %in% default_order)) {
62# stop(paste0("Invalid dimensions for a data variable: ", paste(ncdims,
63# collapse = ",")))
64# } else {
65# d = d[ncdims]
66# names(d) = ncdims
67# }
68# }
69# return(d)
70# }
73# dim_names(x) {
74# attr(x, 'dim_names')
75# }
77# "dim_names<-"(x, value) {
78# d = dim(x)
79# if(is.array(x)) {
80# if(length(d) != length(value)) stop("dim names is not equal to the number of dimensions of the array")
81# if(length(unique(value)) != length(d)) stop("specified dim names are not unique")
82# } else if (is.vector(x)){
83# stopifnot(length(value) == 1)
84# } else { stop('not an array nor a vector - cannot set dim_names')}
85# attr(x, 'dim_names') = value
86# return(x)
87# }
89# reduce_dimensions(x, subset_dim_names){
90# dimsize_input = dim(x)
91# dn = dim_names(x)
92# if(is.null(dn)) stop('the input array must have a valid dim_names attribute')
93# if(length(dn) != length(dimsize_input)) stop('the input array and its dim_names attribute are differing in length')
95# names(dimsize_input) = dn
96# if(missing(subset_dim_names) || is.na(subset_dim_names))
97# subset_dim_names = dn[dimsize_input > 1]
99# diffdim = setdiff(subset_dim_names, dn)
100# if (length(diffdim)>0) stop(paste0('Dimension names to slice but not found in array dim names: ', paste(diffdim, collpase=', ')))
102# dropped_dims = setdiff(dn,subset_dim_names)
103# if( any(dimsize_input[dropped_dims] > 1)) stop('Cannot drop non-degenerate when subsetting')
105# w = match(subset_dim_names,dn)
106# other = match(setdiff(dn, subset_dim_names),dn)
108# x_reordered = aperm(x, c(w, other))
110# reordered_dim_names = dn[c(w, other)]
111# reordered_dim_sizes = dim(x_reordered)
113# new_dim_sizes = reordered_dim_sizes[1:length(w)]
114# new_dim_names = reordered_dim_names[1:length(w)]
116# y = drop(x_reordered)
117# # We want however to maintain degenerate
118# # dimensions that have been explicitly asked for,
119# # and that would have been otherwise dropped
120# y = array(y, new_dim_sizes)
122# dim_names(y) = new_dim_names
123# return(y)
124# }
126#' @import magrittr
127from typing import Any, Dict, Tuple
129import xarray as xr
131from efts_io.conventions import FILLVALUE_ATTR_KEY, UNITS_ATTR_KEY
134def create_data_variable(data_var_def: Dict[str, Any], dimensions: Tuple[str, Tuple]) -> xr.Variable:
135 import numpy as np
136 import xarray as xr
138 a = data_var_def
139 # (c("name", UNITS_ATTR_KEY) %in% names(a)) %>% all %>% stopifnot
140 varname = a["name"]
141 longname = a.get("longname", varname)
142 precision = a.get("precision", "double")
143 missval = a.get("missval", -9999)
145 dimnames = [d[0] for d in dimensions]
146 if not isinstance(dimnames[0], str):
147 raise TypeError("Dimension names must be strings.")
148 shape = tuple(len(d[1]) for d in dimensions)
149 return xr.Variable(
150 dims=dimnames,
151 data=np.empty(shape, dtype=float), # TODO: should this use precision?
152 encoding={FILLVALUE_ATTR_KEY: missval},
153 attrs={
154 "longname": longname,
155 UNITS_ATTR_KEY: a[UNITS_ATTR_KEY],
156 "missval": missval,
157 "precision": precision,
158 },
159 )
161 # xr.Variable(dims=dimensions, data, attrs=None, encoding=None, fastpath=False)
162 # vardef = ncdf4::ncvar_def(name = varname, units = a[UNITS_ATTR_KEY], dim = dimensions,
163 # longname = ifelse("longname" %in% names(a), a["longname"], varname)
164 # precision = ifelse("precision" %in% names(a), a["precision"], "double")
165 # missval = ifelse("missval" %in% names(a), a["missval"]], -9999)
166 # vardef = ncdf4::ncvar_def(name = varname, units = a[UNITS_ATTR_KEY], dim = dimensions,
167 # missval = missval, longname = longname, prec = precision)