Coverage for src/efts_io/_internals.py: 20.00%

16 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-24 10:14 +1000

1# check_index_found(index_id, identifier, dimension_id) { 

2# if (length(index_id) == 0) 

3# stop(paste0("identifier '", identifier, "' not found in the dimension '", 

4# dimension_id, "'")) 

5# } 

6 

7# STATION_DIMNAME = "station" 

8# LEAD_TIME_DIMNAME = "lead_time" 

9# TIME_DIMNAME = "time" 

10# ENS_MEMBER_DIMNAME = "ens_member" 

11# STR_LEN_DIMNAME = "str_len" 

12 

13# # int station_id[station] 

14# STATION_ID_VARNAME = "station_id" 

15# # char station_name[str_len,station] 

16# STATION_NAME_VARNAME = "station_name" 

17# # float lat[station] 

18# LAT_VARNAME = "lat" 

19# # float lon[station] 

20# lon_varname = "lon" 

21# # float x[station] 

22# x_varname = "x" 

23# # float y[station] 

24# y_varname = "y" 

25# # float area[station] 

26# area_varname = "area" 

27# # float elevation[station] 

28# elevation_varname = "elevation" 

29 

30# conventional_varnames = c( 

31# STATION_DIMNAME , 

32# LEAD_TIME_DIMNAME , 

33# TIME_DIMNAME , 

34# ENS_MEMBER_DIMNAME , 

35# STR_LEN_DIMNAME , 

36# STATION_ID_VARNAME , 

37# STATION_NAME_VARNAME , 

38# LAT_VARNAME , 

39# lon_varname , 

40# x_varname , 

41# y_varname , 

42# area_varname , 

43# elevation_varname 

44# ) 

45 

46# mandatory_global_attributes = c("title", INSTITUTION_ATTR_KEY, SOURCE_ATTR_KEY, CATCHMENT_ATTR_KEY, COMMENT_ATTR_KEY) 

47 

48 

49# get_default_dim_order() { 

50# return(c(LEAD_TIME_DIMNAME, STATION_DIMNAME, ENS_MEMBER_DIMNAME, TIME_DIMNAME)) 

51# } 

52 

53# splice_named_var(d, ncdims = character()) { 

54# default_order = get_default_dim_order() 

55# d = as.integer(d) 

56# stopifnot(length(d) == 4) 

57# stopifnot(is.vector(d)) 

58# # lead_time,station,ens_member,time 

59# names(d) = default_order 

60# if (length(ncdims) > 0) { 

61# if (!all(ncdims %in% default_order)) { 

62# stop(paste0("Invalid dimensions for a data variable: ", paste(ncdims, 

63# collapse = ","))) 

64# } else { 

65# d = d[ncdims] 

66# names(d) = ncdims 

67# } 

68# } 

69# return(d) 

70# } 

71 

72 

73# dim_names(x) { 

74# attr(x, 'dim_names') 

75# } 

76 

77# "dim_names<-"(x, value) { 

78# d = dim(x) 

79# if(is.array(x)) { 

80# if(length(d) != length(value)) stop("dim names is not equal to the number of dimensions of the array") 

81# if(length(unique(value)) != length(d)) stop("specified dim names are not unique") 

82# } else if (is.vector(x)){ 

83# stopifnot(length(value) == 1) 

84# } else { stop('not an array nor a vector - cannot set dim_names')} 

85# attr(x, 'dim_names') = value 

86# return(x) 

87# } 

88 

89# reduce_dimensions(x, subset_dim_names){ 

90# dimsize_input = dim(x) 

91# dn = dim_names(x) 

92# if(is.null(dn)) stop('the input array must have a valid dim_names attribute') 

93# if(length(dn) != length(dimsize_input)) stop('the input array and its dim_names attribute are differing in length') 

94 

95# names(dimsize_input) = dn 

96# if(missing(subset_dim_names) || is.na(subset_dim_names)) 

97# subset_dim_names = dn[dimsize_input > 1] 

98 

99# diffdim = setdiff(subset_dim_names, dn) 

100# if (length(diffdim)>0) stop(paste0('Dimension names to slice but not found in array dim names: ', paste(diffdim, collapse=', '))) 

101 

102# dropped_dims = setdiff(dn,subset_dim_names) 

103# if( any(dimsize_input[dropped_dims] > 1)) stop('Cannot drop non-degenerate when subsetting') 

104 

105# w = match(subset_dim_names,dn) 

106# other = match(setdiff(dn, subset_dim_names),dn) 

107 

108# x_reordered = aperm(x, c(w, other)) 

109 

110# reordered_dim_names = dn[c(w, other)] 

111# reordered_dim_sizes = dim(x_reordered) 

112 

113# new_dim_sizes = reordered_dim_sizes[1:length(w)] 

114# new_dim_names = reordered_dim_names[1:length(w)] 

115 

116# y = drop(x_reordered) 

117# # We want however to maintain degenerate 

118# # dimensions that have been explicitly asked for, 

119# # and that would have been otherwise dropped 

120# y = array(y, new_dim_sizes) 

121 

122# dim_names(y) = new_dim_names 

123# return(y) 

124# } 

125 

126#' @import magrittr 

127from typing import Any, Dict, Tuple 

128 

129import xarray as xr 

130 

131from efts_io.conventions import FILLVALUE_ATTR_KEY, UNITS_ATTR_KEY 

132 

133 

def create_data_variable(data_var_def: Dict[str, Any], dimensions: Tuple[str, Tuple]) -> xr.Variable:
    """Create an uninitialised data variable from a variable definition.

    Args:
        data_var_def: Variable definition. The keys ``"name"`` and
            ``UNITS_ATTR_KEY`` are required. ``"longname"`` (defaults to the
            value of ``"name"``), ``"precision"`` (defaults to ``"double"``)
            and ``"missval"`` (defaults to ``-9999``) are optional.
        dimensions: Iterable of ``(dimension_name, values)`` pairs, in the
            order the dimensions should appear in the variable. Only the
            length of ``values`` is used, to size the corresponding axis.
            NOTE(review): the annotation describes a single pair, but the
            body iterates over pairs - confirm against callers before
            tightening the type hint.

    Returns:
        An ``xr.Variable`` whose dims are the given dimension names, whose
        shape is given by the length of each dimension's values, with the
        fill value stored in the encoding and the descriptive metadata
        stored in the attributes. The data buffer is allocated with
        ``np.empty``, so its content is arbitrary until assigned.

    Raises:
        KeyError: If ``"name"`` or ``UNITS_ATTR_KEY`` is missing from
            ``data_var_def``.
        TypeError: If any dimension name is not a string.
    """
    import numpy as np
    import xarray as xr

    # Required keys are looked up with [] so that a missing key fails loudly.
    varname = data_var_def["name"]
    units = data_var_def[UNITS_ATTR_KEY]
    longname = data_var_def.get("longname", varname)
    precision = data_var_def.get("precision", "double")
    missval = data_var_def.get("missval", -9999)

    dimnames = [d[0] for d in dimensions]
    # Validate every name, not just the first one; this also makes an empty
    # `dimensions` yield a zero-dimensional variable instead of an IndexError.
    if not all(isinstance(name, str) for name in dimnames):
        raise TypeError("Dimension names must be strings.")
    shape = tuple(len(d[1]) for d in dimensions)

    return xr.Variable(
        dims=dimnames,
        data=np.empty(shape, dtype=float),  # TODO: should this use precision?
        encoding={FILLVALUE_ATTR_KEY: missval},
        attrs={
            "longname": longname,
            UNITS_ATTR_KEY: units,
            "missval": missval,
            "precision": precision,
        },
    )

160 

161 # xr.Variable(dims=dimensions, data, attrs=None, encoding=None, fastpath=False) 

162 # vardef = ncdf4::ncvar_def(name = varname, units = a[UNITS_ATTR_KEY], dim = dimensions, 

163 # longname = ifelse("longname" %in% names(a), a["longname"], varname) 

164 # precision = ifelse("precision" %in% names(a), a["precision"], "double") 

165 # missval = ifelse("missval" %in% names(a), a["missval"]], -9999) 

166 # vardef = ncdf4::ncvar_def(name = varname, units = a[UNITS_ATTR_KEY], dim = dimensions, 

167 # missval = missval, longname = longname, prec = precision)