Loading src/pythermogis/workflow/utc/utc.py +71 −80 Original line number Diff line number Diff line Loading @@ -9,7 +9,15 @@ from pythermogis.transmissivity.calculate_thick_perm_trans import ( calculate_transmissivity, ) from pythermogis.workflow.utc.configuration import UTCConfiguration from pythermogis.workflow.utc.doublet import DoubletInput, calculate_doublet_performance from pythermogis.workflow.utc.doublet import ( DoubletInput, DoubletOutput, calculate_doublet_performance, ) OUTPUT_NAMES = list(DoubletOutput._fields) NAN_OUTPUTS = DoubletOutput(*[np.nan] * len(OUTPUT_NAMES)) ZERO_OUTPUTS = DoubletOutput(*[0.0] * len(OUTPUT_NAMES)) def run_utc_workflow(config: UTCConfiguration) -> xr.DataTree: Loading Loading @@ -77,76 +85,6 @@ def compute_results_for_aquifer( config: UTCConfiguration, temperature_model: xr.Dataset, ): output_names = [ "power", "hppower", "capex", "var_opex", "fixed_opex", "opex", "utc", "npv", "hprod", "cop", "cophp", "pres", "flow", "welld", "breakthrough", "cooling", "production_temp", "injection_temp", ] def cell_calculation( unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ): inputs = [ unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ] if any(np.isnan(v) for v in inputs): return tuple(np.nan for _ in output_names) if transmissivity_with_ntg < config.kh_cutoff: return tuple(0 for _ in output_names) doublet_input = DoubletInput( unknown_input_value=float(unknown_input_value), thickness=float(thickness), transmissivity=float(transmissivity), transmissivity_with_ntg=float(transmissivity_with_ntg), ntg=float(ntg), depth=float(depth), porosity=float(porosity / 100), temperature=float(temperature), ) try: result = calculate_doublet_performance(config, doublet_input) except ValueError: return tuple(np.nan for _ in output_names) if result.utc > 1000: result = result._replace(utc=1000) return ( tuple(result) if result is not None else tuple(np.nan for _ in output_names) ) # apply hc mask hc_mask = aquifer_ds["hc_accum"] != 0 vars_to_mask = [ Loading @@ -170,11 +108,11 @@ def compute_results_for_aquifer( for p_value in config.p_values: # calc transmissivity(_with_ntg) hydro = compute_hydro_properties_mc(aquifer_ds, p_value) aquifer_ds["transmissivity"] = hydro["transmissivity"] aquifer_ds["transmissivity_with_ntg"] = hydro["transmissivity_with_ntg"] aquifer_ds["thickness"] = hydro["thickness"] aquifer_ds["permeability"] = hydro["permeability"] stoch_results = apply_stochastics(aquifer_ds, p_value) aquifer_ds["transmissivity"] = stoch_results["transmissivity"] aquifer_ds["transmissivity_with_ntg"] = stoch_results["transmissivity_with_ntg"] aquifer_ds["thickness"] = stoch_results["thickness"] aquifer_ds["permeability"] = stoch_results["permeability"] results = xr.apply_ufunc( cell_calculation, Loading @@ -186,14 +124,15 @@ def compute_results_for_aquifer( aquifer_ds["depth"], aquifer_ds["porosity"], aquifer_ds["temperature"], kwargs={"config": config}, input_core_dims=[[]] * 8, output_core_dims=[[] for _ in output_names], output_core_dims=[[] for _ in OUTPUT_NAMES], vectorize=True, dask="parallelized", output_dtypes=[np.float64] * len(output_names), output_dtypes=[np.float64] * len(OUTPUT_NAMES), ) for name, result in zip(output_names, results, strict=True): for name, result in zip(OUTPUT_NAMES, results, strict=True): aquifer_ds[f"{name}_p{p_value}"] = result aquifer_ds.compute() Loading @@ -203,7 +142,7 @@ def compute_results_for_aquifer( return aquifer_ds def compute_hydro_properties_mc( def apply_stochastics( aquifer_ds: xr.Dataset, p_value: float, n_samples: int = 10_000, Loading Loading @@ -239,3 +178,55 @@ def compute_hydro_properties_mc( }, coords=aquifer_ds.coords, ) def cell_calculation( unknown_input_value: float, thickness: float, transmissivity: float, transmissivity_with_ntg: float, ntg: float, depth: float, porosity: float, temperature: float, config: UTCConfiguration, ) -> DoubletOutput: inputs = [ unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ] if any(np.isnan(v) for v in inputs): return NAN_OUTPUTS if transmissivity_with_ntg < config.kh_cutoff: return ZERO_OUTPUTS doublet_input = DoubletInput( unknown_input_value=unknown_input_value, thickness=thickness, transmissivity=transmissivity, transmissivity_with_ntg=transmissivity_with_ntg, ntg=ntg, depth=depth, porosity=porosity / 100, temperature=temperature, ) try: result = calculate_doublet_performance(config, doublet_input) except ValueError: return NAN_OUTPUTS if result is None: return NAN_OUTPUTS if result.utc > 1000: result = result._replace(utc=1000) return result Loading
src/pythermogis/workflow/utc/utc.py +71 −80 Original line number Diff line number Diff line Loading @@ -9,7 +9,15 @@ from pythermogis.transmissivity.calculate_thick_perm_trans import ( calculate_transmissivity, ) from pythermogis.workflow.utc.configuration import UTCConfiguration from pythermogis.workflow.utc.doublet import DoubletInput, calculate_doublet_performance from pythermogis.workflow.utc.doublet import ( DoubletInput, DoubletOutput, calculate_doublet_performance, ) OUTPUT_NAMES = list(DoubletOutput._fields) NAN_OUTPUTS = DoubletOutput(*[np.nan] * len(OUTPUT_NAMES)) ZERO_OUTPUTS = DoubletOutput(*[0.0] * len(OUTPUT_NAMES)) def run_utc_workflow(config: UTCConfiguration) -> xr.DataTree: Loading Loading @@ -77,76 +85,6 @@ def compute_results_for_aquifer( config: UTCConfiguration, temperature_model: xr.Dataset, ): output_names = [ "power", "hppower", "capex", "var_opex", "fixed_opex", "opex", "utc", "npv", "hprod", "cop", "cophp", "pres", "flow", "welld", "breakthrough", "cooling", "production_temp", "injection_temp", ] def cell_calculation( unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ): inputs = [ unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ] if any(np.isnan(v) for v in inputs): return tuple(np.nan for _ in output_names) if transmissivity_with_ntg < config.kh_cutoff: return tuple(0 for _ in output_names) doublet_input = DoubletInput( unknown_input_value=float(unknown_input_value), thickness=float(thickness), transmissivity=float(transmissivity), transmissivity_with_ntg=float(transmissivity_with_ntg), ntg=float(ntg), depth=float(depth), porosity=float(porosity / 100), temperature=float(temperature), ) try: result = calculate_doublet_performance(config, doublet_input) except ValueError: return tuple(np.nan for _ in output_names) if result.utc > 1000: result = result._replace(utc=1000) return ( tuple(result) if result is not None else tuple(np.nan for _ in output_names) ) # apply hc mask hc_mask = aquifer_ds["hc_accum"] != 0 vars_to_mask = [ Loading @@ -170,11 +108,11 @@ def compute_results_for_aquifer( for p_value in config.p_values: # calc transmissivity(_with_ntg) hydro = compute_hydro_properties_mc(aquifer_ds, p_value) aquifer_ds["transmissivity"] = hydro["transmissivity"] aquifer_ds["transmissivity_with_ntg"] = hydro["transmissivity_with_ntg"] aquifer_ds["thickness"] = hydro["thickness"] aquifer_ds["permeability"] = hydro["permeability"] stoch_results = apply_stochastics(aquifer_ds, p_value) aquifer_ds["transmissivity"] = stoch_results["transmissivity"] aquifer_ds["transmissivity_with_ntg"] = stoch_results["transmissivity_with_ntg"] aquifer_ds["thickness"] = stoch_results["thickness"] aquifer_ds["permeability"] = stoch_results["permeability"] results = xr.apply_ufunc( cell_calculation, Loading @@ -186,14 +124,15 @@ def compute_results_for_aquifer( aquifer_ds["depth"], aquifer_ds["porosity"], aquifer_ds["temperature"], kwargs={"config": config}, input_core_dims=[[]] * 8, output_core_dims=[[] for _ in output_names], output_core_dims=[[] for _ in OUTPUT_NAMES], vectorize=True, dask="parallelized", output_dtypes=[np.float64] * len(output_names), output_dtypes=[np.float64] * len(OUTPUT_NAMES), ) for name, result in zip(output_names, results, strict=True): for name, result in zip(OUTPUT_NAMES, results, strict=True): aquifer_ds[f"{name}_p{p_value}"] = result aquifer_ds.compute() Loading @@ -203,7 +142,7 @@ def compute_results_for_aquifer( return aquifer_ds def compute_hydro_properties_mc( def apply_stochastics( aquifer_ds: xr.Dataset, p_value: float, n_samples: int = 10_000, Loading Loading @@ -239,3 +178,55 @@ def compute_hydro_properties_mc( }, coords=aquifer_ds.coords, ) def cell_calculation( unknown_input_value: float, thickness: float, transmissivity: float, transmissivity_with_ntg: float, ntg: float, depth: float, porosity: float, temperature: float, config: UTCConfiguration, ) -> DoubletOutput: inputs = [ unknown_input_value, thickness, transmissivity, transmissivity_with_ntg, ntg, depth, porosity, temperature, ] if any(np.isnan(v) for v in inputs): return NAN_OUTPUTS if transmissivity_with_ntg < config.kh_cutoff: return ZERO_OUTPUTS doublet_input = DoubletInput( unknown_input_value=unknown_input_value, thickness=thickness, transmissivity=transmissivity, transmissivity_with_ntg=transmissivity_with_ntg, ntg=ntg, depth=depth, porosity=porosity / 100, temperature=temperature, ) try: result = calculate_doublet_performance(config, doublet_input) except ValueError: return NAN_OUTPUTS if result is None: return NAN_OUTPUTS if result.utc > 1000: result = result._replace(utc=1000) return result