TNO Intern

Commit 0a30a27e authored by Florian Knappers's avatar Florian Knappers
Browse files

update to the tests

parent 7ae11906
Loading
Loading
Loading
Loading
+16 −4
Original line number Diff line number Diff line
@@ -34,6 +34,10 @@ class ThermoGISDoubletResults(BaseModel):
    inj_temp: FloatOrArray
    prd_temp: FloatOrArray
    transmissivity_with_ntg: FloatOrArray
    thickness: FloatOrArray
    transmissivity: FloatOrArray
    permeability: FloatOrArray | None
    temperature: FloatOrArray

    def to_dataset(self) -> xr.Dataset:
        return xr.Dataset({field: getattr(self, field) for field in self.model_fields})
@@ -67,9 +71,10 @@ class ThermoGISDoublet:
            mask_value = 0.0 if p_values is not None else np.nan

        if p_values is not None:
            thickness, transmissivity = self._resolve_stochastic_properties(p_values)
            thickness, permeability, transmissivity = self._resolve_stochastic_properties(p_values)
        else:
            thickness = self.aquifer.thickness
            permeability = self.aquifer.permeability
            transmissivity = self.aquifer.transmissivity

        transmissivity_with_ntg = (transmissivity * self.aquifer.ntg) / 1e3
@@ -110,6 +115,9 @@ class ThermoGISDoublet:
                if isinstance(transmissivity_with_ntg, xr.DataArray)
                else transmissivity_with_ntg
            )
            thickness = thickness.load() if isinstance(thickness, xr.DataArray) else thickness
            permeability = permeability.load() if isinstance(permeability, xr.DataArray) else permeability
            transmissivity = transmissivity.load() if isinstance(transmissivity, xr.DataArray) else transmissivity

        logger.info("Doublet simulation took %.1f seconds", timeit.default_timer() - start)

@@ -129,14 +137,18 @@ class ThermoGISDoublet:
            inj_temp=output_arrays[12],
            prd_temp=output_arrays[13],
            transmissivity_with_ntg=transmissivity_with_ntg,
            thickness=thickness,
            transmissivity=transmissivity,
            permeability=permeability,
            temperature=self.aquifer.temperature,
        )

    def _resolve_stochastic_properties(self, p_values: list[float]) -> tuple[xr.DataArray, xr.DataArray]:
    def _resolve_stochastic_properties(self, p_values: list[float]) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
        p_values_da = xr.DataArray(
            data=p_values, dims=["p_value"], coords={"p_value": p_values}
        )

        thickness, _, transmissivity = xr.apply_ufunc(
        thickness, permeability, transmissivity = xr.apply_ufunc(
            generate_thickness_permeability_transmissivity_for_pvalues,
            self.aquifer.thickness_mean,
            self.aquifer.thickness_sd,
@@ -149,7 +161,7 @@ class ThermoGISDoublet:
            dask="parallelized",
        )

        return thickness, transmissivity
        return thickness, permeability, transmissivity


def _run_doublet_at_location(
+29 −1
Original line number Diff line number Diff line
@@ -40,3 +40,31 @@ class Settings:
    @property
    def surface_temperature(self) -> float:
        return self.settings.getSurfaceTemperature()

    def set_use_heatpump(self, value: bool) -> None:
        """
        Info:
            Weather to use a heatpump or not. The heatpump is used...
            #TODO: explain what the heatpump does

        Default value:
            False

        Unit:
            -
        """
        self.settings.setUseHeatpump(value)

    def set_well_stimulation(self, value: bool) -> None:
        """
        Info:
            Weather to use a heatpump or not. The heatpump is used...
            #TODO: explain what the heatpump does

        Default value:
            False

        Unit:
            -
        """
        self.settings.setUseWellStimulation(value)
 No newline at end of file
+167 −158
Original line number Diff line number Diff line
@@ -4,92 +4,85 @@ from pathlib import Path
from unittest.case import TestCase

import numpy as np
import pytest
import xarray as xr
from pygridsio import read_grid, resample_grid

from pytg3.aquifer import Aquifer, StochasticAquifer
from pytg3.doublet import ThermoGISDoublet
from pythermogis.doublet import (
    calculate_doublet_performance,
    calculate_doublet_performance_stochastic,
)
from pythermogis.properties import instantiate_thermogis_parameters

test_files_path = Path(__file__).parent.parent / "resources"


def print_min_max(variable: xr.DataArray):
    print(f"{variable.name}, {np.min(variable):.2f}, {np.max(variable):.2f}")


@pytest.mark.skip(
    "Useful for understanding the range of possible input values, "
    "not for pipeline testing"
)
def test_define_calculation_limits():
    recalculate_results = False
    if recalculate_results:
        # generate simulation samples across desired reservoir properties
        nsamples = 1000
        thickness_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
        porosity_samples = np.random.uniform(low=0.01, high=0.99, size=nsamples)
        ntg_samples = np.random.uniform(low=0.01, high=0.99, size=nsamples)
        depth_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
        permeability_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
        reservoir_properties = xr.Dataset(
            {
                "thickness": (["sample"], thickness_samples),
                "porosity": (["sample"], porosity_samples),
                "ntg": (["sample"], ntg_samples),
                "depth": (["sample"], depth_samples),
                "permeability": (["sample"], permeability_samples),
            },
            coords={"sample": np.arange(nsamples)},
        )

        results = calculate_doublet_performance(
            reservoir_properties, chunk_size=100, print_execution_duration=True
        )
        results.to_netcdf(test_files_path / "calculation_limits_result.nc")
    else:
        results = xr.load_dataset(test_files_path / "calculation_limits_result.nc")

    # drop the values which returned nan:
    results = results.dropna(dim="sample", subset=["power"])

    # print the min and max values of all the inputs
    for var in ["thickness", "porosity", "ntg", "depth", "permeability", "temperature"]:
        print_min_max(results[var])
from pythermogis.pytg3.settings import Settings


test_files_path = Path(__file__).parent / "resources"


# def print_min_max(variable: xr.DataArray):
#     print(f"{variable.name}, {np.min(variable):.2f}, {np.max(variable):.2f}")


# @pytest.mark.skip(
#     "Useful for understanding the range of possible input values, "
#     "not for pipeline testing"
# )
# def test_define_calculation_limits():
#     recalculate_results = False
#     if recalculate_results:
#         # generate simulation samples across desired reservoir properties
#         nsamples = 1000
#         thickness_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
#         porosity_samples = np.random.uniform(low=0.01, high=0.99, size=nsamples)
#         ntg_samples = np.random.uniform(low=0.01, high=0.99, size=nsamples)
#         depth_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
#         permeability_samples = np.random.uniform(low=1, high=10e3, size=nsamples)
#         reservoir_properties = xr.Dataset(
#             {
#                 "thickness": (["sample"], thickness_samples),
#                 "porosity": (["sample"], porosity_samples),
#                 "ntg": (["sample"], ntg_samples),
#                 "depth": (["sample"], depth_samples),
#                 "permeability": (["sample"], permeability_samples),
#             },
#             coords={"sample": np.arange(nsamples)},
#         )
#
#         results = calculate_doublet_performance(
#             reservoir_properties, chunk_size=100, print_execution_duration=True
#         )
#         results.to_netcdf(test_files_path / "calculation_limits_result.nc")
#     else:
#         results = xr.load_dataset(test_files_path / "calculation_limits_result.nc")
#
#     # drop the values which returned nan:
#     results = results.dropna(dim="sample", subset=["power"])
#
#     # print the min and max values of all the inputs
#     for var in ["thickness", "porosity", "ntg", "depth", "permeability", "temperature"]:
#         print_min_max(results[var])


def test_deterministic_doublet():
    reservoir_properties = xr.Dataset(
        {
            "thickness": 10,
            "porosity": 0.2,
            "ntg": 0.5,
            "depth": 2000,
            "permeability": 200,
        }
    aquifer = Aquifer(
        thickness=10,
        porosity=0.2,
        ntg=0.5,
        depth=2000,
        permeability=200
    )

    results1 = calculate_doublet_performance(reservoir_properties)
    results2 = calculate_doublet_performance(reservoir_properties)
    results1 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()
    results2 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()

    xr.testing.assert_equal(results1, results2)


def test_deterministic_doublet_mask_value():
    reservoir_properties = xr.Dataset(
        {
            "thickness": 10,
            "porosity": 0.2,
            "ntg": 0.5,
            "depth": 2000,
            "permeability": 200,
            "mask": 0.0,
        }
    aquifer = Aquifer(
        thickness=10, porosity=0.2, ntg=0.5, depth=2000, permeability=200, mask=0.0
    )

    results = calculate_doublet_performance(reservoir_properties, mask_value=-9999.0)
    results = ThermoGISDoublet(aquifer=aquifer).simulate(mask_value=-9999.0).to_dataset()

    assert results.power.data == -9999.0

@@ -105,29 +98,35 @@ def test_deterministic_doublet_temperature_provided():
            "permeability": 200,
        }
    )
    aquifer = Aquifer(
        thickness=10,
        porosity=0.2,
        ntg=0.5,
        depth=2000,
        temperature=89,
        permeability=200,
    )

    results1 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()
    results2 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()

    results1 = calculate_doublet_performance(reservoir_properties)
    results2 = calculate_doublet_performance(reservoir_properties)
    # assert that results are equal, showing no stochastic variations
    xr.testing.assert_equal(results1, results2)


def test_deterministic_doublet_transmissivity_provided():
    reservoir_properties = xr.Dataset(
        {
            "thickness": 10,
            "porosity": 0.2,
            "ntg": 0.5,
            "depth": 2000,
            "temperature": 89,
            "transmissivity": 2000,
            "permeability": 200,
        }
    aquifer = Aquifer(
        thickness=10,
        porosity=0.2,
        ntg=0.5,
        depth=2000,
        temperature=89,
        transmissivity=2000,
        permeability=200,
    )

    results1 = calculate_doublet_performance(reservoir_properties)
    results2 = calculate_doublet_performance(reservoir_properties)
    # assert that results are equal, showing no stochastic variations
    results1 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()
    results2 = ThermoGISDoublet(aquifer=aquifer).simulate().to_dataset()

    xr.testing.assert_equal(results1, results2)


@@ -148,10 +147,10 @@ class PyThermoGISSpeedTest(TestCase):
        shutil.rmtree(self.test_files_out_path)

    def test_doublet_speed(self):
        input_grids = self.read_input_grids()
        aquifer = self.setup_aquifer()
        p_values = [50]

        data = input_grids["thickness_mean"].data.flatten()
        data = aquifer.thickness_mean.data.flatten()
        non_nan_data = data[~np.isnan(data)]
        print(
            f"Running Performance calculation for ROSL_ROSLU, Pvalues: "
@@ -162,48 +161,58 @@ class PyThermoGISSpeedTest(TestCase):
        # and all provided P_values: The Equivalent calculation with 10 cores takes
        # 43 seconds in the Java code
        start = timeit.default_timer()
        calculate_doublet_performance_stochastic(
            input_grids, chunk_size=5, p_values=p_values
        )
        ThermoGISDoublet(aquifer=aquifer).simulate(chunk_size=5, p_values=p_values)

        time_elapsed = timeit.default_timer() - start
        print(f"Python calculation took {time_elapsed:.1f} seconds.")

    def read_input_grids(self):
    def setup_aquifer(self):
        new_cellsize = 10000  # in m
        input_grids = resample_grid(

        thickness_mean = resample_grid(
            read_grid(self.test_files_out_path / "ROSL_ROSLU__thick.zmap"),
            new_cellsize=new_cellsize,
        ).to_dataset(name="thickness_mean")
        input_grids["thickness_sd"] = resample_grid(
        )
        thickness_sd = resample_grid(
            read_grid(self.test_files_out_path / "ROSL_ROSLU__thick_sd.zmap"),
            new_cellsize=new_cellsize,
        )
        input_grids["ntg"] = resample_grid(
        ntg = resample_grid(
            read_grid(self.test_files_out_path / "ROSL_ROSLU__ntg.zmap"),
            new_cellsize=new_cellsize,
        )
        input_grids["porosity"] = (
        porosity = (
            resample_grid(
                read_grid(self.test_files_out_path / "ROSL_ROSLU__poro.zmap"),
                new_cellsize=new_cellsize,
            )
            / 100
        )
        input_grids["depth"] = resample_grid(
        depth = resample_grid(
            read_grid(self.test_files_out_path / "ROSL_ROSLU__top.zmap"),
            new_cellsize=new_cellsize,
        )
        input_grids["ln_permeability_mean"] = np.log(
        ln_permeability_mean = np.log(
            resample_grid(
                read_grid(self.test_files_out_path / "ROSL_ROSLU__perm.zmap"),
                new_cellsize=new_cellsize,
            )
        )
        input_grids["ln_permeability_sd"] = resample_grid(
        ln_permeability_sd = resample_grid(
            read_grid(self.test_files_out_path / "ROSL_ROSLU__ln_perm_sd.zmap"),
            new_cellsize=new_cellsize,
        )
        return input_grids

        aquifer = StochasticAquifer(
            thickness_mean=thickness_mean,
            thickness_sd=thickness_sd,
            ntg=ntg,
            porosity=porosity,
            depth=depth,
            ln_permeability_mean=ln_permeability_mean,
            ln_permeability_sd=ln_permeability_sd,
        )
        return aquifer


class PyThermoGISScenarios(TestCase):
@@ -235,7 +244,7 @@ class PyThermoGISScenarios(TestCase):
        self.input_data = self.test_files_out_path / "doublet"

        # Read in the input grids and set the p-values
        self.input_grids = self.read_input_grids()
        self.aquifer = self.setup_aquifer()
        self.p_values = [10, 50, 90]

    def tearDown(self):
@@ -246,13 +255,11 @@ class PyThermoGISScenarios(TestCase):
        # Java code, when running on the same set of input data for the base case
        # scenario. Run calculation across all dimensions of input_grids,
        # and all provided P_values
        output_grids = calculate_doublet_performance_stochastic(
            self.input_grids, p_values=self.p_values
        )
        results = ThermoGISDoublet(aquifer=self.aquifer).simulate(p_values=self.p_values).to_dataset()

        # Assert values are the same as the benchmark grids generated by the Java code
        for p_value in self.p_values:
            output_grids_p_value = output_grids.sel(p_value=p_value)
            output_grids_p_value = results.sel(p_value=p_value)
            output_grids_p_value = output_grids_p_value.drop_vars("p_value")
            self.assert_output_and_benchmark_is_close(
                self.test_benchmark_output_basecase, output_grids_p_value, p_value, ""
@@ -262,21 +269,17 @@ class PyThermoGISScenarios(TestCase):
        # This tests that the python API produces (approximately) the same output as the
        # Java code, when running on the same set of input data for the heat pump
        # scenario. Instantiate UTC properties, and set useHeatPump to True
        tg_properties = instantiate_thermogis_parameters()
        tg_properties.setUseHeatpump(True)
        utc_properties = tg_properties.setupUTCParameters()
        settings = Settings()
        settings.set_use_heatpump(True)

        # Run calculation across all dimensions of input_grids,
        # and all provided P_values
        output_grids = calculate_doublet_performance_stochastic(
            self.input_grids,
            utc_properties=utc_properties,
            p_values=self.p_values,
        )
        results = ThermoGISDoublet(aquifer=self.aquifer, settings=settings).simulate(p_values=self.p_values).to_dataset()


        # Assert values are the same as the benchmark grids generated by the Java code
        for p_value in self.p_values:
            output_grids_p_value = output_grids.sel(p_value=p_value)
            output_grids_p_value = results.sel(p_value=p_value)
            output_grids_p_value = output_grids_p_value.drop_vars("p_value")
            self.assert_output_and_benchmark_is_close(
                self.test_benchmark_output_HP, output_grids_p_value, p_value, "_HP"
@@ -286,21 +289,18 @@ class PyThermoGISScenarios(TestCase):
        # This tests that the python API produces (approximately) the same output as the
        # Java code, when running on the same set of input data for the stimulation
        # scenario. Instantiate UTC properties, and set useStim to True
        tg_properties = instantiate_thermogis_parameters()
        tg_properties.setUseWellStimulation(True)
        utc_properties = tg_properties.setupUTCParameters()
        settings = Settings()
        settings.set_well_stimulation(True)


        # Run calculation across all dimensions of input_grids,
        # and all provided P_values
        output_grids = calculate_doublet_performance_stochastic(
            self.input_grids,
            utc_properties=utc_properties,
            p_values=self.p_values,
        )
        results = ThermoGISDoublet(aquifer=self.aquifer, settings=settings).simulate(p_values=self.p_values).to_dataset()


        # Assert values are the same as the benchmark grids generated by the Java code
        for p_value in self.p_values:
            output_grids_p_value = output_grids.sel(p_value=p_value)
            output_grids_p_value = results.sel(p_value=p_value)
            output_grids_p_value = output_grids_p_value.drop_vars("p_value")
            self.assert_output_and_benchmark_is_close(
                self.test_benchmark_output_STIM, output_grids_p_value, p_value, "_STIM"
@@ -311,20 +311,18 @@ class PyThermoGISScenarios(TestCase):
        # Java code, when running on the same set of input data for the heat pump and
        # stimulation scenario
        # Instantiate UTC properties, and set useStim to True
        tg_properties = instantiate_thermogis_parameters()
        tg_properties.setUseWellStimulation(True)
        tg_properties.setUseHeatpump(True)
        utc_properties = tg_properties.setupUTCParameters()
        settings = Settings()
        settings.set_use_heatpump(True)
        settings.set_well_stimulation(True)

        # Run calculation across all dimensions of input_grids,
        # and all provided P_values
        output_grids = calculate_doublet_performance_stochastic(
            self.input_grids, utc_properties=utc_properties, p_values=self.p_values
        )
        results = ThermoGISDoublet(aquifer=self.aquifer, settings=settings).simulate(p_values=self.p_values).to_dataset()


        # Assert values are the same as the benchmark grids generated by the Java code
        for p_value in self.p_values:
            output_grids_p_value = output_grids.sel(p_value=p_value)
            output_grids_p_value = results.sel(p_value=p_value)
            output_grids_p_value = output_grids_p_value.drop_vars("p_value")
            self.assert_output_and_benchmark_is_close(
                self.test_benchmark_output_HP_STIM,
@@ -354,22 +352,22 @@ class PyThermoGISScenarios(TestCase):
        # Assert: asserting that it actually ran
        self.assertTrue(True)

    def test_doublet_missing_input(self):
        input_data = xr.Dataset(
            {
                "thickness_sd": ((), 50),
                "ntg": ((), 0.5),
                "porosity": ((), 0.5),
                "depth": ((), 5000),
                "ln_permeability_mean": ((), 5),
                "ln_permeability_sd": ((), 0.5),
            }
        )

        # Act & Assert
        with self.assertRaises(ValueError):
            calculate_doublet_performance_stochastic(input_data)

    # def test_doublet_missing_input(self):
    #     input_data = xr.Dataset(
    #         {
    #             "thickness_sd": ((), 50),
    #             "ntg": ((), 0.5),
    #             "porosity": ((), 0.5),
    #             "depth": ((), 5000),
    #             "ln_permeability_mean": ((), 5),
    #             "ln_permeability_sd": ((), 0.5),
    #         }
    #     )
    #
    #     # Act & Assert
    #     with self.assertRaises(ValueError):
    #         calculate_doublet_performance_stochastic(input_data)
    #
    def assert_output_and_benchmark_is_close(
        self, benchmark_path, output_grids, p_value, scenario
    ):
@@ -435,29 +433,40 @@ class PyThermoGISScenarios(TestCase):
            atol=30,
        )

    def read_input_grids(self):
        input_grids = read_grid(
    def setup_aquifer(self):
        thickness_mean = read_grid(
            self.input_data / "InputData" / "simplified__thick.zmap"
        ).to_dataset(name="thickness_mean")
        input_grids["thickness_sd"] = read_grid(
        )
        thickness_sd = read_grid(
            self.input_data / "InputData" / "simplified__thick_sd.zmap"
        )
        input_grids["ntg"] = read_grid(
        ntg = read_grid(
            self.input_data / "InputData" / "simplified__ntg.zmap"
        )
        input_grids["porosity"] = (
        porosity = (
            read_grid(self.input_data / "InputData" / "simplified__phi.zmap") / 100
        )
        input_grids["depth"] = read_grid(
        depth = read_grid(
            self.input_data / "InputData" / "simplified__top.zmap"
        )
        input_grids["mask"] = read_grid(
        mask = read_grid(
            self.input_data / "InputData" / "simplified__hc_accum.zmap"
        )
        input_grids["ln_permeability_mean"] = np.log(
        ln_permeability_mean = np.log(
            read_grid(self.input_data / "InputData" / "simplified__k.zmap")
        )
        input_grids["ln_permeability_sd"] = read_grid(
        ln_permeability_sd = read_grid(
            self.input_data / "InputData" / "simplified__k_lnsd.zmap"
        )
        return input_grids
        aquifer = StochasticAquifer(
            thickness_mean=thickness_mean,
            thickness_sd=thickness_sd,
            ntg=ntg,
            porosity=porosity,
            depth=depth,
            mask=mask,
            ln_permeability_mean=ln_permeability_mean,
            ln_permeability_sd=ln_permeability_sd,
        )

        return aquifer