|
|
import time
|
|
|
from pathlib import Path
|
|
|
|
|
|
import numpy as np
|
|
|
import xarray as xr
|
|
|
from scipy.special import ndtri
|
|
|
|
|
|
from pythermogis.transmissivity.calculate_thick_perm_trans import (
|
|
|
calculate_transmissivity,
|
|
|
)
|
|
|
from pythermogis.workflow.utc.configuration import UTCConfiguration
|
|
|
from pythermogis.workflow.utc.doublet import (
|
|
|
DoubletInput,
|
|
|
DoubletOutput,
|
|
|
calculate_doublet_performance,
|
|
|
)
|
|
|
|
|
|
OUTPUT_NAMES = list(DoubletOutput.__annotations__.keys())
|
|
|
NAN_OUTPUTS = DoubletOutput(*[np.nan] * len(OUTPUT_NAMES)) # type: ignore[arg-type]
|
|
|
ZERO_OUTPUTS = DoubletOutput(*[0.0] * len(OUTPUT_NAMES)) # type: ignore[arg-type]
|
|
|
|
|
|
|
|
|
def run_utc_workflow(config: UTCConfiguration) -> xr.DataTree:
|
|
|
root = xr.DataTree()
|
|
|
base = Path(config.input_data_dir)
|
|
|
|
|
|
temperature_model = xr.open_dataset(
|
|
|
base / "NetherlandsTemperatureModel_extended.nc"
|
|
|
).transpose("y", "x", "z")
|
|
|
|
|
|
for aquifer in config.aquifers:
|
|
|
aquifer_ds = xr.Dataset()
|
|
|
|
|
|
for gridinfo in config.property_grid_infos:
|
|
|
if gridinfo.optional:
|
|
|
continue
|
|
|
|
|
|
filename = f"{aquifer}{gridinfo.postfix}"
|
|
|
fullpath = base / filename
|
|
|
|
|
|
if not fullpath.exists():
|
|
|
raise FileNotFoundError(f"Required grid not found: {fullpath}")
|
|
|
|
|
|
ds = xr.open_dataset(fullpath)
|
|
|
|
|
|
varnames = [v for v in ds.data_vars if v != "oblique_stereographic"]
|
|
|
if len(varnames) != 1:
|
|
|
raise ValueError(
|
|
|
f"Expected exactly one property variable in"
|
|
|
f" {fullpath}, found {varnames}"
|
|
|
)
|
|
|
|
|
|
varname = varnames[0]
|
|
|
|
|
|
aquifer_ds[gridinfo.name] = ds[varname]
|
|
|
|
|
|
root[aquifer] = xr.DataTree(dataset=aquifer_ds, name=aquifer)
|
|
|
|
|
|
for aquifer in root.children:
|
|
|
print(f"\nProcessing aquifer: {aquifer}")
|
|
|
|
|
|
start = time.perf_counter()
|
|
|
|
|
|
aquifer_ds = root[aquifer].to_dataset()
|
|
|
updated_ds = compute_results_for_aquifer(
|
|
|
aquifer_ds, aquifer, config, temperature_model
|
|
|
)
|
|
|
root[aquifer] = xr.DataTree(dataset=updated_ds, name=aquifer)
|
|
|
|
|
|
end = time.perf_counter()
|
|
|
elapsed = end - start
|
|
|
|
|
|
print(f"Time required for aquifer {aquifer}: {elapsed:.2f} seconds.")
|
|
|
|
|
|
if config.results_dir != "":
|
|
|
zarr_path = f"{config.results_dir}/ROSL_ROSLU_test.zarr"
|
|
|
root.to_zarr(zarr_path, mode="w")
|
|
|
|
|
|
return root
|
|
|
|
|
|
|
|
|
def compute_results_for_aquifer(
|
|
|
aquifer_ds: xr.Dataset,
|
|
|
aquifer_name: str,
|
|
|
config: UTCConfiguration,
|
|
|
temperature_model: xr.Dataset,
|
|
|
):
|
|
|
# apply hc mask
|
|
|
hc_mask = aquifer_ds["hc_accum"] != 0
|
|
|
vars_to_mask = [
|
|
|
"thickness",
|
|
|
"ntg",
|
|
|
"depth",
|
|
|
"porosity",
|
|
|
]
|
|
|
|
|
|
for v in vars_to_mask:
|
|
|
aquifer_ds[v] = aquifer_ds[v].where(hc_mask)
|
|
|
|
|
|
# calc temperature grid
|
|
|
mid_depth = -(aquifer_ds["depth"] + 0.5 * aquifer_ds["thickness"])
|
|
|
temp_xy_aligned = temperature_model["temperature"].interp(
|
|
|
x=aquifer_ds.x, y=aquifer_ds.y, method="linear"
|
|
|
)
|
|
|
temperature = temp_xy_aligned.interp(z=mid_depth, method="linear")
|
|
|
|
|
|
aquifer_ds["temperature"] = temperature
|
|
|
|
|
|
for p_value in config.p_values:
|
|
|
# calc transmissivity(_with_ntg)
|
|
|
stoch_results = apply_stochastics(aquifer_ds, p_value)
|
|
|
aquifer_ds["transmissivity"] = stoch_results["transmissivity"]
|
|
|
aquifer_ds["transmissivity_with_ntg"] = stoch_results["transmissivity_with_ntg"]
|
|
|
aquifer_ds["thickness"] = stoch_results["thickness"]
|
|
|
aquifer_ds["permeability"] = stoch_results["permeability"]
|
|
|
|
|
|
results = xr.apply_ufunc(
|
|
|
cell_calculation,
|
|
|
1.0e30,
|
|
|
aquifer_ds["thickness"],
|
|
|
aquifer_ds["transmissivity"],
|
|
|
aquifer_ds["transmissivity_with_ntg"],
|
|
|
aquifer_ds["ntg"],
|
|
|
aquifer_ds["depth"],
|
|
|
aquifer_ds["porosity"],
|
|
|
aquifer_ds["temperature"],
|
|
|
kwargs={"config": config},
|
|
|
input_core_dims=[[]] * 8,
|
|
|
output_core_dims=[[] for _ in OUTPUT_NAMES],
|
|
|
vectorize=True,
|
|
|
dask="parallelized",
|
|
|
output_dtypes=[np.float64] * len(OUTPUT_NAMES),
|
|
|
)
|
|
|
|
|
|
for name, result in zip(OUTPUT_NAMES, results, strict=True):
|
|
|
aquifer_ds[f"{name}_p{p_value}"] = result
|
|
|
|
|
|
aquifer_ds.compute()
|
|
|
|
|
|
print(f"Computed results for {aquifer_name}")
|
|
|
|
|
|
return aquifer_ds
|
|
|
|
|
|
|
|
|
def apply_stochastics(
|
|
|
aquifer_ds: xr.Dataset,
|
|
|
p_value: float,
|
|
|
n_samples: int = 10_000,
|
|
|
) -> xr.Dataset:
|
|
|
|
|
|
q = 1.0 - p_value / 100.0
|
|
|
z = ndtri(q)
|
|
|
|
|
|
thickness_p, permeability_p, transmissivity_p = calculate_transmissivity(
|
|
|
aquifer_ds["thickness"].values,
|
|
|
aquifer_ds["thickness_sd"].values,
|
|
|
np.log(aquifer_ds["permeability"].values),
|
|
|
aquifer_ds["permeability_lnsd"].values,
|
|
|
q,
|
|
|
z,
|
|
|
n_samples,
|
|
|
)
|
|
|
|
|
|
transmissivity_with_ntg = transmissivity_p * aquifer_ds["ntg"] / 1000.0
|
|
|
transmissivity_with_ntg = transmissivity_with_ntg.where(
|
|
|
transmissivity_with_ntg >= 0
|
|
|
)
|
|
|
|
|
|
return xr.Dataset(
|
|
|
{
|
|
|
"thickness": (aquifer_ds["thickness"].dims, thickness_p),
|
|
|
"permeability": (aquifer_ds["thickness"].dims, permeability_p),
|
|
|
"transmissivity": (aquifer_ds["thickness"].dims, transmissivity_p),
|
|
|
"transmissivity_with_ntg": transmissivity_with_ntg,
|
|
|
},
|
|
|
coords=aquifer_ds.coords,
|
|
|
)
|
|
|
|
|
|
|
|
|
def cell_calculation(
|
|
|
unknown_input_value: float,
|
|
|
thickness: float,
|
|
|
transmissivity: float,
|
|
|
transmissivity_with_ntg: float,
|
|
|
ntg: float,
|
|
|
depth: float,
|
|
|
porosity: float,
|
|
|
temperature: float,
|
|
|
config: UTCConfiguration,
|
|
|
) -> DoubletOutput:
|
|
|
inputs = [
|
|
|
unknown_input_value,
|
|
|
thickness,
|
|
|
transmissivity,
|
|
|
transmissivity_with_ntg,
|
|
|
ntg,
|
|
|
depth,
|
|
|
porosity,
|
|
|
temperature,
|
|
|
]
|
|
|
if any(np.isnan(v) for v in inputs):
|
|
|
return NAN_OUTPUTS
|
|
|
|
|
|
if transmissivity_with_ntg < config.kh_cutoff:
|
|
|
return ZERO_OUTPUTS
|
|
|
|
|
|
doublet_input = DoubletInput(
|
|
|
unknown_input_value=unknown_input_value,
|
|
|
thickness=thickness,
|
|
|
transmissivity=transmissivity,
|
|
|
transmissivity_with_ntg=transmissivity_with_ntg,
|
|
|
ntg=ntg,
|
|
|
depth=depth,
|
|
|
porosity=porosity / 100,
|
|
|
temperature=temperature,
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
result = calculate_doublet_performance(config, doublet_input)
|
|
|
except ValueError:
|
|
|
return NAN_OUTPUTS
|
|
|
|
|
|
if result is None:
|
|
|
return NAN_OUTPUTS
|
|
|
|
|
|
if result.utc > 1000:
|
|
|
result = result._replace(utc=1000)
|
|
|
|
|
|
return result |