Coverage for tests/test_read_file.py: 32.48%

103 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-24 10:14 +1000

1import os 

2from typing import Optional 

3 

4# import netCDF4 

5import numpy as np 

6import pandas as pd 

7 

8from efts_io.dimensions import create_time_info, create_timestamps 

9from efts_io.wrapper import EftsDataSet 

10 

# Root of the package checkout, resolved relative to this test module.
pkg_dir = os.path.join(os.path.dirname(__file__), "..")

# Identifiers shared by the read / round-trip tests below.
variable_names = ["variable_1", "variable_2"]
station_ids_ints = [123, 456]

# Ensemble size and number of lead-time steps of the synthetic forecasts.
nEns = 3
nLead = 4

# x holds 1..nEns*nLead laid out as (nLead, nEns); y is x shifted by one full block.
x = np.arange(1, (nEns * nLead) + 1).reshape((nLead, nEns))
y = x + nEns * nLead

# First time stamp of the main time axis used by the round-trip tests.
timeAxisStart = pd.Timestamp(year=2010, month=8, day=1, hour=12, minute=0, second=0, tz="UTC")

# Forecast issue time used when writing and reading ensemble forecasts.
tested_fcast_issue_time = timeAxisStart + pd.Timedelta(6, unit="h")

# Short aliases for the first and second (variable, station) pairs.
v1, v2 = variable_names[0], variable_names[1]
s1, s2 = station_ids_ints[0], station_ids_ints[1]

36 

37 

def dhours(i):
    """Return a ``pd.Timedelta`` spanning ``i`` hours."""
    return pd.Timedelta(i, unit="h")

40 

41 

def ddays(i):
    """Return a ``pd.Timedelta`` spanning ``i`` days, expressed as ``24 * i`` hours."""
    n_hours = i * 24
    return pd.Timedelta(n_hours, unit="h")

44 

45import pytest 

46 

@pytest.mark.skip(reason="Ported from the R package, but may not be relevant or the best approach anymore")
def test_read_thing():
    """Open the sample hourly file and spot-check dimensions and forecast values."""
    sample_path = os.path.join(pkg_dir, "tests", "data", "hourly_test.nc")
    assert os.path.exists(sample_path)

    dataset = EftsDataSet(sample_path)
    expected_dims = {"ens_member", "lead_time", "station", "str_len", "time"}
    assert set(dataset.get_dim_names()) == expected_dims

    # Both forecasts are retrieved before any value is checked.
    first_fcast = dataset.get_ensemble_forecasts(
        variable_name=v1,
        identifier=s1,
        start_time=tested_fcast_issue_time,
    )
    second_fcast = dataset.get_ensemble_forecasts(
        variable_name=v2,
        identifier=s2,
        start_time=tested_fcast_issue_time,
    )
    assert first_fcast[1, 1] == 6
    assert second_fcast[1, 1] == 18
    # Check the lead time axis (not ported yet, kept for reference):
    # fcast_timeaxis = index(r1)
    # assert (fcast_timeaxis[0], tested_fcast_issue_time + lead_ts(lead_time_step_start_offset))
    # assert (fcast_timeaxis[1], tested_fcast_issue_time + lead_ts(lead_time_step_start_offset + lead_time_step_delta))

69 

70 

def _do_time_axis_test(
    tstart: pd.Timestamp,
    time_step: str = "days since",
    time_step_delta: int = 1,
    n: int = 10,
    tz_str: Optional[str] = None,
    expected_offset: Optional[pd.DateOffset] = None,
):
    """Assert that a generated time axis equals a regularly spaced expectation.

    A time dimension description is built with ``create_time_info`` and turned
    into time stamps with ``create_timestamps``; the result is compared to
    ``tstart + expected_offset * i`` for ``i`` in ``range(n)``.

    Args:
        tstart: first time stamp of the axis.
        time_step: time step specification forwarded to ``create_time_info``.
        time_step_delta: step multiplier forwarded to ``create_time_info``.
        n: number of time stamps to generate and compare.
        tz_str: time zone name forwarded to ``create_timestamps``.
        expected_offset: spacing expected between consecutive time stamps.
            Required despite the ``None`` default, which is only kept for
            signature compatibility.

    Raises:
        ValueError: if ``expected_offset`` is ``None``.
        AssertionError: if the generated time stamps differ from the expectation.
    """
    # Without this guard a missing offset surfaces later as an obscure
    # "unsupported operand type(s) for *: 'NoneType' and 'int'" TypeError.
    if expected_offset is None:
        raise ValueError("expected_offset must be provided to build the expected time stamps")

    time_dim_info = create_time_info(
        start=tstart,
        n=n,
        time_step=time_step,
        time_step_delta=time_step_delta,
    )
    timestamps = create_timestamps(time_dim_info, tz_str)
    expected_timestamps = [tstart + expected_offset * i for i in range(n)]
    assert np.all(timestamps == expected_timestamps), (
        f"time axis mismatch for time_step={time_step!r}, time_step_delta={time_step_delta}"
    )

88 

89 

def test_time_axis():
    """Check generated time axes for hourly and daily steps of one and three units."""
    tz_str = "UTC"
    n = 10
    tstart = pd.Timestamp(year=2010, month=8, day=1, hour=23, minute=0, second=0, tz=tz_str)

    # (time step specification, step multiplier, expected spacing)
    cases = [
        ("hours since", 1, pd.DateOffset(hours=1)),
        ("hours since", 3, pd.DateOffset(hours=3)),
        ("days since", 1, pd.DateOffset(days=1)),
        ("days since", 3, pd.DateOffset(days=3)),
        # TODO
        # ("weeks since", 1, pd.DateOffset(weeks=1)),
        # ("months since", 1, pd.DateOffset(months=1)),
    ]
    for step_spec, step_delta, spacing in cases:
        _do_time_axis_test(
            tstart,
            step_spec,
            step_delta,
            n,
            tz_str=tz_str,
            expected_offset=spacing,
        )

120 

121 

# The R original put these tests in a tryCatch, to maximise the chances of
# cleaning up temporary files; here the caller owns the temporary location.
def doTests(
    tempNcFname,
    lead_time_tstep="hours",
    time_step="hours since",
    time_step_delta=1,
    lead_time_step_start_offset=1,
    lead_time_step_delta=1,
):
    """Write ensemble forecasts to an EFTS data set, save it, reopen it and check it.

    The module-level fixtures (``x``, ``y``, ``v1``/``s1``, ``v2``/``s2``,
    ``timeAxisStart`` and ``tested_fcast_issue_time``) provide the data, the
    identifiers and the issue time used for the put/get calls.

    Args:
        tempNcFname: path the data set is saved to with ``to_netcdf`` and then
            reopened from with ``open_efts``.
        lead_time_tstep: lead time unit, ``"hours"`` or ``"days"``.
        time_step: time step specification forwarded to ``create_time_info``.
        time_step_delta: step multiplier forwarded to ``create_time_info``.
        lead_time_step_start_offset: offset of the first lead time, in units
            of ``lead_time_tstep``.
        lead_time_step_delta: spacing between consecutive lead times, in units
            of ``lead_time_tstep``.

    Raises:
        ValueError: if ``lead_time_tstep`` is neither ``"hours"`` nor ``"days"``.
        AssertionError: if a retrieved value or lead time differs from the
            expectation.
    """
    # Validate first: previously an unsupported unit only failed at the very
    # end, with an UnboundLocalError, after the file had already been written.
    if lead_time_tstep not in ("hours", "days"):
        raise ValueError(
            f"Unsupported lead_time_tstep {lead_time_tstep!r}; expected 'hours' or 'days'",
        )
    lead_ts = dhours if lead_time_tstep == "hours" else ddays

    from efts_io.attributes import create_global_attributes

    # NOTE(review): these keys were referenced without any visible import;
    # they are assumed to live in efts_io.conventions - confirm.
    from efts_io.conventions import (
        LOCATION_TYPE_ATTR_KEY,
        TYPE_ATTR_KEY,
        TYPE_DESCRIPTION_ATTR_KEY,
        UNITS_ATTR_KEY,
    )
    from efts_io.dimensions import create_time_info
    from efts_io.variables import create_variable_definitions
    from efts_io.wrapper import open_efts, xr_efts

    # Description of the parameter combination, used as the assertion message
    # so that a failure identifies the case that triggered it.
    case_params = (
        f"lts={lead_time_tstep},ts={time_step},tsdelta={time_step_delta}"
        f",ltsoffset={lead_time_step_start_offset},ltsdelta={lead_time_step_delta}"
    )

    time_dim_info = create_time_info(
        start=timeAxisStart,
        n=10,
        time_step=time_step,
        time_step_delta=time_step_delta,
    )

    n = len(variable_names)
    varsDef = pd.DataFrame.from_dict(
        {
            "name": variable_names,
            "longname": ["long name for " + name for name in variable_names],
            UNITS_ATTR_KEY: np.repeat("mm", n),
            "missval": np.repeat(-999, n),
            "precision": np.repeat("double", n),
            TYPE_ATTR_KEY: np.repeat(2, n),
            "dimensions": np.repeat("4", n),
            TYPE_DESCRIPTION_ATTR_KEY: np.repeat("accumulated over the previous time step", n),
            LOCATION_TYPE_ATTR_KEY: np.repeat("Point", n),
        },
    )

    glob_attr = create_global_attributes(
        title="title test",
        institution="test",
        source="test",
        catchment="dummy",
        comment="none",
    )

    var_defs_dict = create_variable_definitions(varsDef)

    # Lead times start at the offset and advance by the delta, which is what
    # the lead-time assertions at the end of this function expect. The former
    # "(offset .. offset + nLead) * delta" form only agreed with them when the
    # delta was 1.
    lead_times_offsets = lead_time_step_start_offset + np.arange(nLead) * lead_time_step_delta

    tz_str = "UTC"

    issue_times = create_timestamps(time_dim_info, tz_str)

    # TODO: expand to test non-integer station_ids
    station_ids = [str(i) for i in station_ids_ints]
    ensemble_size = nEns
    station_names = ["station_" + str(i) for i in station_ids_ints]
    xr_data = xr_efts(
        issue_times,
        station_ids,
        lead_times_offsets,
        lead_time_tstep,
        ensemble_size,
        station_names,
        nc_attributes=glob_attr,
    )

    # Former file-based creation, kept for reference:
    # snc = create_efts(
    #     tempNcFname,
    #     time_dim_info,
    #     var_defs_dict,
    #     station_ids_ints,
    #     nc_attributes=glob_attr,
    #     lead_length=nLead,
    #     ensemble_length=nEns,
    #     lead_time_tstep=lead_time_tstep,
    # )
    snc = EftsDataSet(xr_data)

    snc.create_data_variables(var_defs_dict)

    snc.put_ensemble_forecasts(
        x,
        variable_name=v1,
        identifier=s1,
        start_time=tested_fcast_issue_time,
    )
    snc.put_ensemble_forecasts(
        y,
        variable_name=v2,
        identifier=s2,
        start_time=tested_fcast_issue_time,
    )

    r1 = snc.get_ensemble_forecasts(
        variable_name=v1,
        identifier=s1,
        start_time=tested_fcast_issue_time,
    )
    r2 = snc.get_ensemble_forecasts(
        variable_name=v2,
        identifier=s2,
        start_time=tested_fcast_issue_time,
    )
    # NOTE(review): the module-level x[1, 1] is 5 (and y[1, 1] is 17) for the
    # row-major fixture; 6 and 18 correspond to a column-major fill. Confirm
    # the layout returned by get_ensemble_forecasts.
    assert r1[1, 1] == 6, case_params
    assert r2[1, 1] == 18, case_params
    snc.write()

    snc.to_netcdf(tempNcFname)

    # Reopen from disk and repeat the checks on the persisted data.
    snc = open_efts(tempNcFname)
    r1 = snc.get_ensemble_forecasts(
        variable_name=v1,
        identifier=s1,
        start_time=tested_fcast_issue_time,
    )
    r2 = snc.get_ensemble_forecasts(
        variable_name=v2,
        identifier=s2,
        start_time=tested_fcast_issue_time,
    )
    assert r1[1, 1] == 6, case_params
    assert r2[1, 1] == 18, case_params
    # Check the lead time axis:
    fcast_timeaxis = r1.lead_time
    assert fcast_timeaxis[0] == tested_fcast_issue_time + lead_ts(
        lead_time_step_start_offset,
    ), case_params
    assert fcast_timeaxis[1] == tested_fcast_issue_time + lead_ts(
        lead_time_step_start_offset + lead_time_step_delta,
    ), case_params
    snc.close()

283 

284 

285import tempfile 

286import pytest 

287 

@pytest.mark.skip(reason="Ported from the R package, but may not be relevant or the best approach anymore")
def test_round_trip():
    """Run the write/read round trip for daily, hourly and three-hourly lead times.

    ``doTests`` reads the module-level ``tested_fcast_issue_time``. Assigning a
    local variable of the same name, as this test used to do, never reached
    it, so the global is rebound for each case and restored afterwards.
    """
    global tested_fcast_issue_time
    saved_issue_time = tested_fcast_issue_time
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            tested_fcast_issue_time = timeAxisStart + ddays(2)
            # Covers https://github.com/jmp75/efts/issues/6
            tempNcFname = os.path.join(temp_dir, "days.nc")
            doTests(
                tempNcFname,
                lead_time_tstep="days",
                time_step="days since",
                time_step_delta=1,
                lead_time_step_start_offset=1,
                lead_time_step_delta=1,
            )

            tested_fcast_issue_time = timeAxisStart + dhours(6)

            tempNcFname = os.path.join(temp_dir, "hourly.nc")
            doTests(
                tempNcFname,
                lead_time_tstep="hours",
                time_step="hours since",
                time_step_delta=1,
                lead_time_step_start_offset=1,
                lead_time_step_delta=1,
            )

            tempNcFname = os.path.join(temp_dir, "three_hourly.nc")
            doTests(
                tempNcFname,
                lead_time_tstep="hours",
                time_step="hours since",
                time_step_delta=1,
                lead_time_step_start_offset=1,
                lead_time_step_delta=3,
            )
    finally:
        # Leave the shared fixture as it was found, whatever the outcome.
        tested_fcast_issue_time = saved_issue_time

324 

325 

if __name__ == "__main__":
    # Script entry point for ad-hoc runs: only the round-trip check is enabled;
    # uncomment the other calls as needed.
    # test_time_axis()
    # test_read_thing()
    test_round_trip()