Coverage for src/geosdhydro/_internal/swift.py: 96.73%
125 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-09-10 21:20 +1000
1"""Convert shapefile data to SWIFT JSON catchment structure."""
3import json
4from typing import Any, Dict, List, Optional, Tuple # noqa: UP035
6import geopandas as gpd
7import pandas as pd
# Default column names expected in the input shapefile attribute table.
# ShapefileToSwiftConverter falls back to these when a caller passes a
# falsy (empty/None) field name for the corresponding argument.
_default_linkid_field = "LinkID"
_default_fromnodeid_field = "FromNodeID"
_default_tonodeid_field = "ToNodeID"
_default_spathlen_field = "SPathLen"
_default_darea2_field = "DArea2"
_default_geometry_field = "geometry"
# List of dtypes known to be safely convertible to float64.
# 8-bit integer representations are left out: too small a range in this case.
_safe_dtypes = [
    "float16", "float32", "float64",  # Float types
    "int16", "int32",  # Integer types that fit in float64
    "uint16", "uint32",  # Unsigned int types that fit
    # We may lose precision for very large int64/uint64 values,
    # but only at values > 2**53, so we allow them.
    "int64", "uint64",
]


def _is_convertible_to_float64(df: pd.DataFrame, column_name: str) -> bool:
    """Check if column has a dtype that is known to be safely convertible to float64.

    Args:
        df: DataFrame or GeoDataFrame containing the column
        column_name: Name of the column to check

    Returns:
        bool: True if the column exists and has a compatible dtype for float64 conversion
    """
    if column_name not in df.columns:
        return False
    # Hoist the dtype lookup; "float64" itself is in _safe_dtypes, so a
    # single scan of the safe list suffices (the original also tested
    # "float64" separately, which was redundant).
    dtype = df[column_name].dtype
    return any(pd.api.types.is_dtype_equal(dtype, candidate) for candidate in _safe_dtypes)
class ShapefileToSwiftConverter:
    """Converts shapefile data to SWIFT JSON catchment structure."""

    def __init__(
        self,
        gdf: gpd.GeoDataFrame,
        include_coordinates: bool = False,  # noqa: FBT001, FBT002
        linkid_field: str = "LinkID",
        fromnodeid_field: str = "FromNodeID",
        tonodeid_field: str = "ToNodeID",
        spathlen_field: str = "SPathLen",
        darea2_field: str = "DArea2",
        geometry_field: str = "geometry",
        linkname_field: Optional[str] = None,
        subarea_name_field: Optional[str] = None,
        node_names: Optional[Dict[str, str]] = None,
    ):
        """Initialize converter with geopandas dataframe.

        Args:
            gdf: GeoDataFrame loaded from shapefile containing link data
            include_coordinates: Whether to include lat/lon in node definitions
            linkid_field: Name of the column containing Link IDs
            fromnodeid_field: Name of the column containing From Node IDs
            tonodeid_field: Name of the column containing To Node IDs
            spathlen_field: Name of the column containing Stream Path Lengths (in meters)
            darea2_field: Name of the column containing Subarea Drainage Area (in square meters)
            geometry_field: Name of the column containing geometry data
            linkname_field: Name of the column containing Link Names (optional)
            subarea_name_field: Name of the column containing SubArea Names (optional)
            node_names: Optional mapping of node IDs to names (optional)

        Raises:
            ValueError: If required columns are missing or link IDs are duplicated.
            TypeError: If a numeric column cannot be safely converted to float64.
        """
        self.gdf = gdf
        self.include_coordinates = include_coordinates
        # Fall back to the module-level defaults when a falsy field name is passed.
        self._linkid_field = linkid_field or _default_linkid_field
        self._fromnodeid_field = fromnodeid_field or _default_fromnodeid_field
        self._tonodeid_field = tonodeid_field or _default_tonodeid_field
        self._spathlen_field = spathlen_field or _default_spathlen_field
        self._darea2_field = darea2_field or _default_darea2_field
        self._geometry_field = geometry_field or _default_geometry_field
        self._linkname_field = linkname_field
        self._subarea_name_field = subarea_name_field
        self._node_names = node_names if node_names else {}
        self._check_geodf()
        # Default GR4J rainfall-runoff parameterisation applied to every subarea.
        self._runoff_model = {
            "PercFactor": 2.25,
            "R": 0.0,
            "RunoffModelType": "GR4J",
            "S": 0.0,
            "SurfaceRunoffRouting": {"SurfaceRunoffRoutingType": "NoRouting"},
            "UHExponent": 2.5,
            "x1": 350.0,
            "x2": 0.0,
            "x3": 40.0,
            "x4": 0.5,
        }
        self.routing_model = {"ChannelRoutingType": "NoRouting"}

    @property
    def gdf(self) -> gpd.GeoDataFrame:
        """The geodataframe from which we build the json file."""
        return self._gdf

    @gdf.setter
    def gdf(self, value: gpd.GeoDataFrame) -> None:
        self._gdf = value

    @property
    def include_coordinates(self) -> bool:
        """Should the Latitude/Longitude coordinates be derived from the geometry and written in the json file."""
        return self._include_coordinates

    @include_coordinates.setter
    def include_coordinates(self, value: bool) -> None:
        self._include_coordinates = value

    @property
    def runoff_model(self) -> dict:
        """Dictionary for the rainfall-runoff model sections of the json file."""
        return self._runoff_model

    @runoff_model.setter
    def runoff_model(self, value: dict) -> None:
        self._runoff_model = value

    @property
    def routing_model(self) -> dict:
        """Dictionary for the routing model sections of the json file."""
        return self._routing_model

    @routing_model.setter
    def routing_model(self, value: dict) -> None:
        self._routing_model = value

    def _check_geodf(self) -> None:
        """Check the GeoDataFrame for required columns and types.

        Raises:
            ValueError: If a required column is missing or link IDs are duplicated.
            TypeError: If a numeric column has an unsafe dtype.
        """
        required_columns_names = [
            self._linkid_field,
            self._fromnodeid_field,
            self._tonodeid_field,
            self._spathlen_field,
            self._darea2_field,
            self._geometry_field,
        ]

        if set(required_columns_names) - set(self.gdf.columns):
            raise ValueError(f"The GeoDataFrame does not contain all the required columns: {required_columns_names}")

        # IDs should be strings, even if legacy are ints.
        for id_column in (self._linkid_field, self._fromnodeid_field, self._tonodeid_field):
            self.gdf[id_column] = self.gdf[id_column].astype(str)

        # TODO test geometry column, but I could not figure out how.
        # self._geometry_field: gpd.array.GeometryDtype,

        # Check numeric columns, then normalise them to float64.
        numeric_columns = [self._spathlen_field, self._darea2_field]
        for column in numeric_columns:
            if not _is_convertible_to_float64(self.gdf, column):
                raise TypeError(f"Column '{column}' has type {self.gdf[column].dtype} which cannot be safely converted to float64. Supported types are: {_safe_dtypes}.")

            if self.gdf[column].dtype != "float64":
                self.gdf[column] = self.gdf[column].astype("float64")

        # Duplicate link IDs would produce ambiguous SWIFT elements.
        # Report the configured link ID column name, not a hard-coded 'LinkID'.
        link_id_counts = self.gdf[self._linkid_field].value_counts()
        duplicates = link_id_counts[link_id_counts > 1]
        if not duplicates.empty:
            duplicate_indices = self.gdf[self.gdf[self._linkid_field].isin(duplicates.index)].index.tolist()
            raise ValueError(f"Column '{self._linkid_field}' contains duplicate values: {duplicates.index.tolist()} at indices {duplicate_indices}.")

    def convert(self) -> Dict[str, Any]:
        """Convert shapefile data to SWIFT JSON format.

        Returns:
            Dictionary containing Links, Nodes, and SubAreas sections
        """
        return {"Links": self._create_links(), "Nodes": self._create_nodes(), "SubAreas": self._create_subareas()}

    def save_to_file(self, filepath: str, indent: int = 2) -> None:
        """Save converted data to JSON file.

        Args:
            filepath: Path where to save the JSON file
            indent: Number of spaces for JSON indentation (default: 2)
        """
        # Explicit encoding so output is portable across platforms.
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(self.convert(), f, indent=indent)

    def _create_links(self) -> List[Dict[str, Any]]:
        """Create links section of JSON from dataframe."""
        links = []
        # Link names fall back to the link ID when no name column is configured.
        linkname_field = self._linkname_field if self._linkname_field else self._linkid_field
        for _, row in self.gdf.iterrows():
            link = {
                "ChannelRouting": self.routing_model,
                "DownstreamNodeID": str(row[self._tonodeid_field]),
                "ID": str(row[self._linkid_field]),
                "Length": float(row[self._spathlen_field]),
                "ManningsN": 1.0,
                "Name": str(row[linkname_field]),
                "Slope": 1.0,
                "UpstreamNodeID": str(row[self._fromnodeid_field]),
                "f": 1.0,
            }
            links.append(link)
        return links

    def _get_node_coordinates(self) -> Dict[str, Tuple[float, float]]:
        """Extract node coordinates from geometry data.

        Note: keys are string node IDs (IDs are normalised to str by
        _check_geodf); the previous Dict[int, ...] annotation was wrong.

        Returns:
            Dictionary mapping node_id to (longitude, latitude) tuple
        """
        node_coords = {}
        for _, row in self.gdf.iterrows():
            geom = row[self._geometry_field]
            coords = list(geom.coords)

            # Start point for FromNodeID
            start_lon, start_lat = coords[0]
            node_coords[row[self._fromnodeid_field]] = (start_lon, start_lat)

            # End point for ToNodeID
            end_lon, end_lat = coords[-1]
            node_coords[row[self._tonodeid_field]] = (end_lon, end_lat)

        return node_coords

    def _node_name(self, node_id: str) -> str:
        """Generate a name for a node based on its ID.

        Uses the user-supplied mapping when it has an entry for this node,
        otherwise a generated "Node_<id>" name.
        """
        # Bug fix: the original tested `self._node_name` (the bound method,
        # always truthy) instead of the `self._node_names` mapping.
        if self._node_names and node_id in self._node_names:
            return self._node_names[node_id]
        return f"Node_{node_id}"

    def _create_nodes(self) -> List[Dict[str, Any]]:
        """Create nodes section of JSON from dataframe."""
        from_nodes = set(self.gdf[self._fromnodeid_field])
        to_nodes = set(self.gdf[self._tonodeid_field])
        unique_nodes = from_nodes.union(to_nodes)

        # Get coordinates if requested
        node_coords = self._get_node_coordinates() if self.include_coordinates else {}

        nodes = []
        for node_id in sorted(unique_nodes):
            node: Dict[str, Any] = {
                "ErrorCorrection": {"ErrorCorrectionType": "NoErrorCorrection"},
                "ID": str(node_id),
                "Name": self._node_name(node_id),
                "Reservoir": {"ReservoirType": "NoReservoir"},
            }

            # Add coordinates if available
            if self.include_coordinates and node_id in node_coords:
                lon, lat = node_coords[node_id]
                node["Longitude"] = lon
                node["Latitude"] = lat

            nodes.append(node)
        return nodes

    def _create_subareas(self) -> List[Dict[str, Any]]:
        """Create subareas section of JSON from dataframe.

        Rows with a non-positive drainage area produce no subarea.
        """
        subareas = []
        has_name_field = self._subarea_name_field is not None

        def subarea_name(row: pd.Series) -> str:
            if has_name_field:
                return str(row[self._subarea_name_field])
            return f"Subarea_{row[self._linkid_field]}"

        for _, row in self.gdf.iterrows():
            if row[self._darea2_field] > 0:
                subarea = {
                    # DArea2 is in square meters; SWIFT expects km^2.
                    "AreaKm2": float(row[self._darea2_field]) / 1_000_000,
                    "ID": str(row[self._linkid_field]),
                    "LinkID": str(row[self._linkid_field]),
                    "Name": subarea_name(row),
                    "RunoffModel": self.runoff_model,
                }
                subareas.append(subarea)
        return subareas