# Coverage for src/geosdhydro/_internal/swift.py: 96.73% (125 statements, coverage.py v7.10.0)

1"""Convert shapefile data to SWIFT JSON catchment structure.""" 

import json
from typing import Any, Dict, List, Optional, Tuple  # noqa: UP035

import geopandas as gpd
import pandas as pd

_default_linkid_field = "LinkID"
_default_fromnodeid_field = "FromNodeID"
_default_tonodeid_field = "ToNodeID"
_default_spathlen_field = "SPathLen"
_default_darea2_field = "DArea2"
_default_geometry_field = "geometry"


# List of dtypes known to be safely convertible to float64.
_safe_dtypes = [
    # 8-bit integer types are left out: their range is too small for this use case.
    "float16", "float32", "float64",  # Float types
    "int16", "int32",  # Signed integer types that fit in float64
    "uint16", "uint32",  # Unsigned integer types that fit
    # int64/uint64 may lose precision, but only for values above 2**53
    # (the width of the float64 significand), so they are allowed.
    "int64", "uint64",
]
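
# A quick check of the precision claim above:
#   assert float(2**53) == float(2**53 + 1)  # 2**53 + 1 is not representable in float64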

def _is_convertible_to_float64(df: pd.DataFrame, column_name: str) -> bool:
    """Check if a column has a dtype that is known to be safely convertible to float64.

    Args:
        df: DataFrame or GeoDataFrame containing the column.
        column_name: Name of the column to check.

    Returns:
        bool: True if the column has a compatible dtype for float64 conversion.
    """
    if column_name not in df.columns:
        return False

    # Check if the column dtype is in our safe list.
    return pd.api.types.is_dtype_equal(df[column_name].dtype, "float64") or \
        any(pd.api.types.is_dtype_equal(df[column_name].dtype, dtype) for dtype in _safe_dtypes)
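
# Example (a sketch): numeric columns are accepted, object-dtype ones rejected.
#
#   df = pd.DataFrame({"a": pd.Series([1, 2], dtype="int32"), "b": ["x", "y"]})
#   _is_convertible_to_float64(df, "a")  # True
#   _is_convertible_to_float64(df, "b")  # False: object dtype is not in _safe_dtypes
#   _is_convertible_to_float64(df, "c")  # False: column does not exist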

class ShapefileToSwiftConverter:
    """Converts shapefile data to SWIFT JSON catchment structure."""

    def __init__(
        self,
        gdf: gpd.GeoDataFrame,
        include_coordinates: bool = False,  # noqa: FBT001, FBT002
        linkid_field: str = "LinkID",
        fromnodeid_field: str = "FromNodeID",
        tonodeid_field: str = "ToNodeID",
        spathlen_field: str = "SPathLen",
        darea2_field: str = "DArea2",
        geometry_field: str = "geometry",
        linkname_field: Optional[str] = None,
        subarea_name_field: Optional[str] = None,
        node_names: Optional[Dict[str, str]] = None,
    ):
        """Initialize converter with a geopandas dataframe.

        Args:
            gdf: GeoDataFrame loaded from a shapefile containing link data.
            include_coordinates: Whether to include lat/lon in node definitions.
            linkid_field: Name of the column containing link IDs.
            fromnodeid_field: Name of the column containing from-node IDs.
            tonodeid_field: Name of the column containing to-node IDs.
            spathlen_field: Name of the column containing stream path lengths (in meters).
            darea2_field: Name of the column containing subarea drainage areas (in square meters).
            geometry_field: Name of the column containing geometry data.
            linkname_field: Name of the column containing link names (optional).
            subarea_name_field: Name of the column containing subarea names (optional).
            node_names: Mapping of node IDs to node names (optional).
        """
        self.gdf = gdf
        self.include_coordinates = include_coordinates
        self._linkid_field = linkid_field if linkid_field else _default_linkid_field
        self._fromnodeid_field = fromnodeid_field if fromnodeid_field else _default_fromnodeid_field
        self._tonodeid_field = tonodeid_field if tonodeid_field else _default_tonodeid_field
        self._spathlen_field = spathlen_field if spathlen_field else _default_spathlen_field
        self._darea2_field = darea2_field if darea2_field else _default_darea2_field
        self._geometry_field = geometry_field if geometry_field else _default_geometry_field
        self._linkname_field = linkname_field
        self._subarea_name_field = subarea_name_field
        self._node_names = node_names if node_names else {}
        self._check_geodf()

        # Default GR4J rainfall-runoff model parameters.
        self._runoff_model = {
            "PercFactor": 2.25,
            "R": 0.0,
            "RunoffModelType": "GR4J",
            "S": 0.0,
            "SurfaceRunoffRouting": {"SurfaceRunoffRoutingType": "NoRouting"},
            "UHExponent": 2.5,
            "x1": 350.0,
            "x2": 0.0,
            "x3": 40.0,
            "x4": 0.5,
        }
        self.routing_model = {"ChannelRoutingType": "NoRouting"}

    @property
    def gdf(self) -> gpd.GeoDataFrame:
        """The geodataframe from which we build the JSON file."""
        return self._gdf

    @gdf.setter
    def gdf(self, value: gpd.GeoDataFrame) -> None:
        self._gdf = value

    @property
    def include_coordinates(self) -> bool:
        """Whether latitude/longitude coordinates are derived from the geometry and written to the JSON file."""
        return self._include_coordinates

    @include_coordinates.setter
    def include_coordinates(self, value: bool) -> None:
        self._include_coordinates = value

    @property
    def runoff_model(self) -> dict:
        """Dictionary for the rainfall-runoff model sections of the JSON file."""
        return self._runoff_model

    @runoff_model.setter
    def runoff_model(self, value: dict) -> None:
        self._runoff_model = value

    @property
    def routing_model(self) -> dict:
        """Dictionary for the routing model sections of the JSON file."""
        return self._routing_model

    @routing_model.setter
    def routing_model(self, value: dict) -> None:
        self._routing_model = value
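
    # Both model dictionaries are plain dicts and can be replaced or tweaked
    # before calling convert(). A sketch, using parameter names from the
    # defaults set in __init__:
    #
    #   converter = ShapefileToSwiftConverter(gdf)
    #   converter.runoff_model = {**converter.runoff_model, "x1": 500.0}
    #   converter.routing_model = {"ChannelRoutingType": "NoRouting"}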

    def _check_geodf(self) -> None:
        """Check the GeoDataFrame for required columns and types."""
        required_columns_names = [
            self._linkid_field,
            self._fromnodeid_field,
            self._tonodeid_field,
            self._spathlen_field,
            self._darea2_field,
            self._geometry_field,
        ]

        if set(required_columns_names).intersection(set(self.gdf.columns)) != set(required_columns_names):
            raise ValueError(f"The GeoDataFrame does not contain all the required columns: {required_columns_names}")

        # IDs should be strings, even if legacy data stores them as ints.
        self.gdf[self._linkid_field] = self.gdf[self._linkid_field].astype(str)
        self.gdf[self._fromnodeid_field] = self.gdf[self._fromnodeid_field].astype(str)
        self.gdf[self._tonodeid_field] = self.gdf[self._tonodeid_field].astype(str)

        # TODO: check the geometry column (gpd.array.GeometryDtype); not yet clear how to test this.
        # Check numeric columns.
        numeric_columns = [self._spathlen_field, self._darea2_field]
        for column in numeric_columns:
            if not _is_convertible_to_float64(self.gdf, column):
                raise TypeError(
                    f"Column '{column}' has type {self.gdf[column].dtype} which cannot be safely "
                    f"converted to float64. Supported types are: {_safe_dtypes}.",
                )

            # Convert to float64 if not already.
            if self.gdf[column].dtype != "float64":
                self.gdf[column] = self.gdf[column].astype("float64")

        # Check for duplicate link ID values.
        link_id_counts = self.gdf[self._linkid_field].value_counts()
        duplicates = link_id_counts[link_id_counts > 1]
        if not duplicates.empty:
            duplicate_indices = self.gdf[self.gdf[self._linkid_field].isin(duplicates.index)].index.tolist()
            raise ValueError(
                f"Column '{self._linkid_field}' contains duplicate values: "
                f"{duplicates.index.tolist()} at indices {duplicate_indices}.",
            )

    def convert(self) -> Dict[str, Any]:
        """Convert shapefile data to SWIFT JSON format.

        Returns:
            Dictionary containing the Links, Nodes, and SubAreas sections.
        """
        return {"Links": self._create_links(), "Nodes": self._create_nodes(), "SubAreas": self._create_subareas()}

    def save_to_file(self, filepath: str, indent: int = 2) -> None:
        """Save converted data to a JSON file.

        Args:
            filepath: Path where the JSON file is saved.
            indent: Number of spaces for JSON indentation (default: 2).
        """
        with open(filepath, "w") as f:
            json.dump(self.convert(), f, indent=indent)

    def _create_links(self) -> List[Dict[str, Any]]:
        """Create the links section of the JSON from the dataframe."""
        links = []
        linkname_field = self._linkname_field if self._linkname_field else self._linkid_field
        for _, row in self.gdf.iterrows():
            link = {
                "ChannelRouting": self.routing_model,
                "DownstreamNodeID": str(row[self._tonodeid_field]),
                "ID": str(row[self._linkid_field]),
                "Length": float(row[self._spathlen_field]),
                "ManningsN": 1.0,
                "Name": str(row[linkname_field]),
                "Slope": 1.0,
                "UpstreamNodeID": str(row[self._fromnodeid_field]),
                "f": 1.0,
            }
            links.append(link)
        return links

    def _get_node_coordinates(self) -> Dict[str, Tuple[float, float]]:
        """Extract node coordinates from geometry data.

        Returns:
            Dictionary mapping node ID to a (longitude, latitude) tuple.
        """
        node_coords = {}
        for _, row in self.gdf.iterrows():
            geom = row[self._geometry_field]
            coords = list(geom.coords)

            # Start point of the line gives the FromNodeID coordinates.
            start_lon, start_lat = coords[0]
            node_coords[row[self._fromnodeid_field]] = (start_lon, start_lat)

            # End point of the line gives the ToNodeID coordinates.
            end_lon, end_lat = coords[-1]
            node_coords[row[self._tonodeid_field]] = (end_lon, end_lat)

        return node_coords

    def _node_name(self, node_id: str) -> str:
        """Return the configured name for a node, or a default name based on its ID."""
        if self._node_names and node_id in self._node_names:
            return self._node_names[node_id]
        return f"Node_{node_id}"

    def _create_nodes(self) -> List[Dict[str, Any]]:
        """Create the nodes section of the JSON from the dataframe."""
        from_nodes = set(self.gdf[self._fromnodeid_field])
        to_nodes = set(self.gdf[self._tonodeid_field])
        unique_nodes = from_nodes.union(to_nodes)

        # Get coordinates if requested.
        node_coords = self._get_node_coordinates() if self.include_coordinates else {}

        nodes = []
        for node_id in sorted(unique_nodes):
            node: Dict[str, Any] = {
                "ErrorCorrection": {"ErrorCorrectionType": "NoErrorCorrection"},
                "ID": str(node_id),
                "Name": self._node_name(node_id),
                "Reservoir": {"ReservoirType": "NoReservoir"},
            }

            # Add coordinates if available.
            if self.include_coordinates and node_id in node_coords:
                lon, lat = node_coords[node_id]
                node["Longitude"] = lon
                node["Latitude"] = lat

            nodes.append(node)
        return nodes

    def _create_subareas(self) -> List[Dict[str, Any]]:
        """Create the subareas section of the JSON from the dataframe."""
        subareas = []
        has_name_field = self._subarea_name_field is not None

        def subarea_name(row: pd.Series) -> str:
            if has_name_field:
                return str(row[self._subarea_name_field])
            return f"Subarea_{row[self._linkid_field]}"

        for _, row in self.gdf.iterrows():
            # Only links with a positive drainage area produce a subarea.
            if row[self._darea2_field] > 0:
                subarea = {
                    # DArea2 is in square meters; convert to square kilometers.
                    "AreaKm2": float(row[self._darea2_field]) / 1_000_000,
                    "ID": str(row[self._linkid_field]),
                    "LinkID": str(row[self._linkid_field]),
                    "Name": subarea_name(row),
                    "RunoffModel": self.runoff_model,
                }
                subareas.append(subarea)
        return subareas
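

if __name__ == "__main__":
    # A minimal smoke test on a synthetic two-link network (illustrative
    # values only, not a real catchment); prints the resulting SWIFT JSON.
    from shapely.geometry import LineString

    demo_gdf = gpd.GeoDataFrame(
        {
            "LinkID": ["1", "2"],
            "FromNodeID": ["2", "3"],
            "ToNodeID": ["1", "2"],
            "SPathLen": [1000.0, 2000.0],
            "DArea2": [5_000_000.0, 0.0],
        },
        geometry=[
            LineString([(147.0, -35.0), (147.1, -35.1)]),
            LineString([(147.1, -35.1), (147.2, -35.2)]),
        ],
    )
    demo = ShapefileToSwiftConverter(demo_gdf, include_coordinates=True)
    print(json.dumps(demo.convert(), indent=2))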