Coverage for tests/test_swift.py: 100.00%

134 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-09-10 21:20 +1000

1import geopandas as gpd 

2import numpy as np 

3import pandas as pd 

4import pytest 

5from shapely.geometry import LineString 

6 

7from geosdhydro import ShapefileToSwiftConverter 

8 

9 

def test_one_link_two_nodes_one_subarea() -> None:
    """Check the minimal network: one link joining two nodes, with one subarea.

    The single row has a positive ``DArea2`` (given in m²), so exactly one
    subarea is expected, attached to the link and reported in km².
    All identifiers in the output are expected to be strings.
    """
    network = gpd.GeoDataFrame(
        {
            "LinkID": [1],
            "FromNodeID": [1],
            "ToNodeID": [2],
            "SPathLen": [1000.0],
            "DArea2": [5000000.0],  # 5 km² expressed in m²
            "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
        },
    )

    swift = ShapefileToSwiftConverter(network).convert()
    links, nodes, subareas = swift["Links"], swift["Nodes"], swift["SubAreas"]

    # Overall shape of the output.
    assert len(links) == 1
    assert len(nodes) == 2
    assert len(subareas) == 1

    # Every expected attribute of the only link.
    only_link = links[0]
    expected_link = {
        "ID": "1",
        "UpstreamNodeID": "1",
        "DownstreamNodeID": "2",
        "Length": 1000.0,
        "Name": "1",
    }
    for key, value in expected_link.items():
        assert only_link[key] == value

    # Both end nodes are present.
    assert {node["ID"] for node in nodes} == {"1", "2"}

    # Every expected attribute of the only subarea.
    only_subarea = subareas[0]
    expected_subarea = {
        "ID": "1",
        "LinkID": "1",
        "AreaKm2": 5.0,
        "Name": "Subarea_1",
    }
    for key, value in expected_subarea.items():
        assert only_subarea[key] == value

49 

50 

def test_one_link_two_nodes_no_subarea() -> None:
    """Check that a link with a negative ``DArea2`` yields no subarea.

    The link and both of its nodes must still be produced; only the
    ``SubAreas`` list is expected to be empty.
    """
    network = gpd.GeoDataFrame(
        {
            "LinkID": [1],
            "FromNodeID": [1],
            "ToNodeID": [2],
            "SPathLen": [1500.0],
            "DArea2": [-1.0],  # negative area marks "no subarea" for this link
            "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
        },
    )

    swift = ShapefileToSwiftConverter(network).convert()
    links, nodes, subareas = swift["Links"], swift["Nodes"], swift["SubAreas"]

    # Overall shape of the output: no subarea entry at all.
    assert len(links) == 1
    assert len(nodes) == 2
    assert len(subareas) == 0

    # The link itself is unaffected by the missing subarea.
    only_link = links[0]
    expected_link = {
        "ID": "1",
        "UpstreamNodeID": "1",
        "DownstreamNodeID": "2",
        "Length": 1500.0,
    }
    for key, value in expected_link.items():
        assert only_link[key] == value

    # Both end nodes are present.
    assert {node["ID"] for node in nodes} == {"1", "2"}

82 

83 

def test_coordinates_included() -> None:
    """Check node coordinates when ``include_coordinates=True`` is requested.

    The ``FromNodeID`` node is expected to carry the first vertex of the link
    geometry and the ``ToNodeID`` node the last one, exposed under the
    ``Longitude`` (x) and ``Latitude`` (y) keys.
    """
    start_xy, end_xy = (1.1, 1.2), (2.1, 2.2)
    network = gpd.GeoDataFrame(
        {
            "LinkID": [1],
            "FromNodeID": [1],
            "ToNodeID": [2],
            "SPathLen": [1000.0],
            "DArea2": [5000000.0],
            "geometry": [LineString([start_xy, end_xy])],
        },
    )

    swift = ShapefileToSwiftConverter(network, include_coordinates=True).convert()

    # Index the nodes by ID so each end can be checked independently of order.
    node_lookup = {node["ID"]: node for node in swift["Nodes"]}

    # Upstream node: start point of the line.
    upstream = node_lookup["1"]
    start_lon, start_lat = start_xy
    assert "Longitude" in upstream
    assert "Latitude" in upstream
    assert upstream["Longitude"] == start_lon
    assert upstream["Latitude"] == start_lat

    # Downstream node: end point of the line.
    downstream = node_lookup["2"]
    end_lon, end_lat = end_xy
    assert downstream["Longitude"] == end_lon
    assert downstream["Latitude"] == end_lat

114 

115 

def test_complex_catchment_structure() -> None:
    """Check a branching catchment: 5 links, 6 nodes and 4 subareas.

    Links 2, 3 and 4 all drain into node 2, link 5 drains into node 5 and
    link 1 carries everything from node 2 to node 1. Link 4 has a negative
    ``DArea2`` and is therefore the only link expected to have no subarea.
    """
    # One tuple per link:
    # (LinkID, FromNodeID, ToNodeID, SPathLen, DArea2, start xy, end xy)
    reaches = [
        (1, 2, 1, 1000.0, 3000000.0, (2.1, 2.2), (1.1, 1.2)),
        (2, 3, 2, 1500.0, 4000000.0, (3.1, 3.2), (2.1, 2.2)),
        (3, 4, 2, 2000.0, 2500000.0, (4.1, 4.2), (2.1, 2.2)),
        (4, 5, 2, 800.0, -1.0, (5.1, 5.2), (2.1, 2.2)),  # negative area: no subarea
        (5, 6, 5, 1200.0, 3500000.0, (6.1, 6.2), (5.1, 5.2)),
    ]
    network = gpd.GeoDataFrame(
        {
            "LinkID": [row[0] for row in reaches],
            "FromNodeID": [row[1] for row in reaches],
            "ToNodeID": [row[2] for row in reaches],
            "SPathLen": [row[3] for row in reaches],
            "DArea2": [row[4] for row in reaches],
            "geometry": [LineString([row[5], row[6]]) for row in reaches],
        },
    )

    swift = ShapefileToSwiftConverter(network).convert()

    # Overall shape of the output.
    assert len(swift["Links"]) == 5
    assert len(swift["Nodes"]) == 6
    assert len(swift["SubAreas"]) == 4  # every link except link 4

    # All six nodes are present.
    assert {node["ID"] for node in swift["Nodes"]} == {"1", "2", "3", "4", "5", "6"}

    # Subareas exist for links 1, 2, 3 and 5 only.
    links_with_subarea = {subarea["LinkID"] for subarea in swift["SubAreas"]}
    assert links_with_subarea == {"1", "2", "3", "5"}

    # Explicitly spell out the negative-area case.
    assert "4" not in links_with_subarea

153 

154 

155 

def test_invalid_spathlen_type() -> None:
    """Check that a text-typed ``SPathLen`` column is rejected with ``TypeError``.

    The length is supplied as the string ``"1000.0"``: it looks like a number,
    but the column does not hold numeric values, and building the converter
    is expected to raise.
    """
    network = gpd.GeoDataFrame(
        {
            "LinkID": [1],
            "FromNodeID": [1],
            # NOTE(review): ToNodeID is also a string here; presumably harmless
            # because IDs come back as strings in the other tests - confirm.
            "ToNodeID": ["2"],
            "SPathLen": ["1000.0"],  # numeric-looking text, not a numeric value
            "DArea2": [5000000.0],
            "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
        },
    )

    # The failure is expected from the constructor; convert() is never called.
    with pytest.raises(TypeError):
        ShapefileToSwiftConverter(network)

172 

173 

def test_invalid_spathlenname_type() -> None:
    """Check that a missing ``SPathLen`` column is rejected with ``ValueError``.

    The length values are valid, but they sit in a column called
    ``SPathLen_WrongName``, so the default column name cannot be found.
    """
    network = gpd.GeoDataFrame(
        {
            "LinkID": [1],
            "FromNodeID": [1],
            "ToNodeID": ["2"],
            "SPathLen_WrongName": [1000.0],  # correct values, unexpected column name
            "DArea2": [5000000.0],
            "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
        },
    )

    # A ValueError (not a TypeError) is expected: the column is absent, not mistyped.
    with pytest.raises(ValueError):  # noqa: PT011
        ShapefileToSwiftConverter(network)

189 

def test_duplicate_link_ids() -> None:
    """Check that duplicated ``LinkID`` values are rejected with ``ValueError``.

    Link IDs 1 and 2 each appear more than once. Building the converter is
    expected to raise, and the message is expected to name the column and
    list the duplicated IDs as strings, in the order ``['2', '1']``.
    """
    data = {
        "LinkID": [1, 2, 1, 3, 2, 2],  # LinkID 1 and 2 are duplicated
        "FromNodeID": [1, 2, 1, 3, 2, 2],
        "ToNodeID": [2, 3, 2, 4, 3, 3],
        "SPathLen": [1000.0, 1500.0, 1000.0, 2000.0, 1500.0, 1500.0],
        "DArea2": [5000000.0, 4000000.0, 5000000.0, 3000000.0, 4000000.0, 4000000.0],
        "geometry": [
            LineString([(1.1, 1.2), (2.1, 2.2)]),
            LineString([(2.1, 2.2), (3.1, 3.2)]),
            LineString([(1.1, 1.2), (2.1, 2.2)]),
            LineString([(3.1, 3.2), (4.1, 4.2)]),
            LineString([(2.1, 2.2), (3.1, 3.2)]),
            LineString([(2.1, 2.2), (3.1, 3.2)]),
        ],
    }
    gdf = gpd.GeoDataFrame(data)

    # `match` is a regular expression searched within the exception text, so
    # the square brackets of the literal message are escaped. This pins the
    # message in the same statement that pins the exception type, instead of
    # a broad `raises(ValueError)` followed by a separate substring check.
    with pytest.raises(
        ValueError,
        match=r"Column 'LinkID' contains duplicate values: \['2', '1'\] at indices",
    ):
        ShapefileToSwiftConverter(gdf)

216 

def test_valid_numeric_types() -> None:
    """Check that several numeric dtypes are accepted for ``SPathLen`` and ``DArea2``.

    The same one-link network is built once per numeric type (Python float,
    NumPy float32, int32 and uint16). For each one the converter must build
    without error and report the same length and the same area in km².
    Every assertion names the type under test so that a failure inside the
    loop identifies the offending dtype.
    """
    for t in [float, np.float32, np.int32, np.uint16]:
        type_name = t.__name__
        data = {
            "LinkID": [1],
            "FromNodeID": [1],
            "ToNodeID": [2],
            "SPathLen": [t(1000.0)],
            "DArea2": [t(64000.0)],  # small enough to fit in uint16
            "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
        }
        gdf = gpd.GeoDataFrame(data)

        # Should successfully create the converter without errors.
        converter = ShapefileToSwiftConverter(gdf)
        result = converter.convert()

        links = result["Links"]
        assert len(links) == 1, f"unexpected number of links for {type_name}"

        # The reported length must not depend on the input dtype.
        assert links[0]["Length"] == 1000.0, f"unexpected link length for {type_name}"

        # The area is given in m² and reported in km².
        subareas = result["SubAreas"]
        assert len(subareas) == 1, f"unexpected number of subareas for {type_name}"
        assert subareas[0]["AreaKm2"] == 0.064, f"unexpected subarea area for {type_name}"

246 

247 

def test_invalid_numeric_types() -> None:
    """Check that non-numeric ``SPathLen`` / ``DArea2`` values raise ``TypeError``.

    Four independent one-link networks are built, each with a single
    offending value; building the converter is expected to fail for each.
    """

    def single_link_frame(spathlen: object, darea2: object) -> gpd.GeoDataFrame:
        """Build the one-link network with the given length and area values."""
        return gpd.GeoDataFrame(
            {
                "LinkID": [1],
                "FromNodeID": [1],
                "ToNodeID": [2],
                "SPathLen": [spathlen],
                "DArea2": [darea2],
                "geometry": [LineString([(1.1, 1.2), (2.1, 2.2)])],
            },
        )

    # Case 1: SPathLen is text that is not a number at all.
    text_length = single_link_frame("not-a-number", 5000000.0)
    with pytest.raises(TypeError):
        ShapefileToSwiftConverter(text_length)

    # Case 2: DArea2 is a boolean.
    boolean_area = single_link_frame(1000.0, True)
    with pytest.raises(TypeError):
        ShapefileToSwiftConverter(boolean_area)

    # Case 3: DArea2 holds an arbitrary object (a pandas Series) in its cell.
    object_area = single_link_frame(1000.0, pd.Series([1, 2, 3]))
    with pytest.raises(TypeError):
        ShapefileToSwiftConverter(object_area)

    # Case 4: SPathLen is a date/time value.
    datetime_length = single_link_frame(np.datetime64("2023-01-01"), 5000000.0)
    with pytest.raises(TypeError):
        ShapefileToSwiftConverter(datetime_length)

299 

def test_element_names() -> None:
    """Check default and custom names for links, nodes and subareas.

    First pass, no naming options: links are expected to be named after
    their ID, nodes ``Node_<ID>`` and subareas ``Subarea_<ID>``.
    Second pass: link and subarea names are read from two extra columns and
    node names from an ID-to-name mapping passed to the converter.
    """
    data = {
        "LinkID": [1, 2, 3, 4, 5],
        "FromNodeID": [2, 3, 4, 5, 6],
        "ToNodeID": [1, 2, 2, 2, 5],
        "SPathLen": [1000.0, 1500.0, 2000.0, 800.0, 1200.0],
        "DArea2": [3000000.0, 4000000.0, 2500000.0, -1.0, 3500000.0],  # Link 4 has negative area
        "geometry": [
            LineString([(2.1, 2.2), (1.1, 1.2)]),  # Link 1: node 2 -> node 1
            LineString([(3.1, 3.2), (2.1, 2.2)]),  # Link 2: node 3 -> node 2
            LineString([(4.1, 4.2), (2.1, 2.2)]),  # Link 3: node 4 -> node 2
            LineString([(5.1, 5.2), (2.1, 2.2)]),  # Link 4: node 5 -> node 2
            LineString([(6.1, 6.2), (5.1, 5.2)]),  # Link 5: node 6 -> node 5
        ],
    }

    # --- Default names -------------------------------------------------
    converter = ShapefileToSwiftConverter(gpd.GeoDataFrame(data))
    result = converter.convert()

    assert all(link["Name"] == str(link["ID"]) for link in result["Links"])
    assert all(node["Name"] == f"Node_{node['ID']}" for node in result["Nodes"])
    assert all(subarea["Name"] == f"Subarea_{subarea['ID']}" for subarea in result["SubAreas"])

    # --- Custom names --------------------------------------------------
    custom_linkname_fieldname = "LinkName"
    custom_subarea_name_fieldname = "SubAreaName"

    # Build the customised table once, from a copy, so the base data used
    # above is left untouched. Names are numbered by row position (0-4).
    named_data = {
        **data,
        custom_linkname_fieldname: [f"CustomLinkName_{i}" for i in range(5)],
        custom_subarea_name_fieldname: [f"CustomSubAreaName_{i}" for i in range(5)],
    }
    gdf = gpd.GeoDataFrame(named_data)

    # Node names are supplied as a mapping keyed by node ID as a string.
    node_names = {str(i): f"CustomNodeName_{i}" for i in range(1, 7)}

    converter = ShapefileToSwiftConverter(
        gdf,
        linkname_field=custom_linkname_fieldname,
        subarea_name_field=custom_subarea_name_fieldname,
        node_names=node_names,
    )
    result = converter.convert()

    # Link IDs run 1-5 while the name suffixes run 0-4, hence the "- 1".
    assert all(link["Name"] == f"CustomLinkName_{int(link['ID'])-1}" for link in result["Links"])

    assert all(node["Name"] == f"CustomNodeName_{node['ID']}" for node in result["Nodes"])

    # Subarea IDs are compared to the row position in the same way as links.
    assert all(subarea["Name"] == f"CustomSubAreaName_{int(subarea['ID'])-1}" for subarea in result["SubAreas"])