TNO Intern

Commit 9797bbd2 authored by Florian Knappers's avatar Florian Knappers
Browse files

restructure more tests

parent fd65645a
Loading
Loading
Loading
Loading
+241 −0
Original line number Diff line number Diff line
@@ -3,9 +3,13 @@ from unittest import TestCase
import numpy as np
import pytest
from jpype import JClass
import xarray as xr

from pythermogis.jvm import start_jvm
from pythermogis.mock import create_logger_mock
from doublet import calculate_doublet_performance_stochastic
from properties import instantiate_thermogis_parameters



class ThermoGISDoubletBenchmark(TestCase):
@@ -339,3 +343,240 @@ class ThermoGISDoubletBenchmark(TestCase):
        tg_parameters.setWellCostDepth(1050)
        tg_parameters.setWellCostDepth2(0.3)
        return tg_parameters


class PythermoGISDoubletBenchmark(TestCase):
    """
    This is a series of tests which produce the same results found in the Java code
    and in test_ThermoGISDoublet_Benchmark, but using the intended
     pythermogis implementation
    """

    def test_calculateDoubletPerformanceTest(self):
        tg_parameters = instantiate_thermogis_parameters()
        tg_parameters.setDhReturnTemp(40)
        tg_parameters.setUseKestinViscosity(True)
        utc_properties = tg_parameters.setupUTCParameters()

        input_data = xr.Dataset(
            {
                "thickness_mean": ((), 100),
                "thickness_sd": ((), 0.0),
                "ntg": ((), 1.0),
                "porosity": ((), 0.0),
                "depth": ((), 2000),
                "temperature": ((), 76),
                "ln_permeability_mean": ((), np.log(175)),
                "ln_permeability_sd": ((), 0.0),
            }
        )

        results = calculate_doublet_performance_stochastic(
            input_data, utc_properties=utc_properties, rng_seed=0
        )

        # Assert
        self.assertTrue(np.isclose(17500, results.transmissivity.data[0], 0.001))
        self.assertTrue(np.isclose(227.2757568359375, results.flow_rate.data[0], 1))
        self.assertTrue(np.isclose(60, results.pres.data[0], 0.001))
        self.assertTrue(np.isclose(6.616096470753937, results.utc.data[0], 0.001))
        self.assertTrue(np.isclose(1159.17968, results.welld.data[0], 0.001))
        self.assertTrue(np.isclose(13.627557754516602, results.cop.data[0], 0.001))
        self.assertTrue(np.isclose(8.636903762817383, results.power.data[0], 0.001))

    def test_calculateDoubletPerformance_directHeat(self):
        """
        This is a copy of a test from the Java ThermoGisDoubletTest.java script;
        to validate that this python implementation of the ThermoGIS Doublet
        returns the same values.
        """
        # Arrange
        # Instantiate the UTC properties class
        tg_properties = self.setup_template_tg_properties()
        tg_properties.setOpexBase(0)

        utc_properties = tg_properties.setupUTCParameters()

        input_data = xr.Dataset(
            {
                "thickness_mean": ((), 100),
                "thickness_sd": ((), 0.0),
                "ntg": ((), 1.0),
                "porosity": ((), 0.0),
                "depth": ((), 2000),
                "temperature": ((), 76),
                "ln_permeability_mean": ((), np.log(175)),
                "ln_permeability_sd": ((), 0.0),
            }
        )

        # Act
        results = calculate_doublet_performance_stochastic(
            input_data, utc_properties=utc_properties, rng_seed=0
        )

        # Assert
        self.assertTrue(np.isclose(17500, results.transmissivity.data[0], 0.001))
        self.assertTrue(np.isclose(227.2757568359375, results.flow_rate.data[0], 1))
        self.assertTrue(np.isclose(60, results.pres.data[0], 0.001))
        self.assertTrue(np.isclose(5.229816400909403, results.utc.data[0], 0.001))
        self.assertTrue(np.isclose(1159.1796, results.welld.data[0], 0.001))
        self.assertTrue(np.isclose(13.623167037963867, results.cop.data[0], 0.001))
        self.assertTrue(np.isclose(8.624696731567383, results.power.data[0], 0.001))

    def test_calculateDoubletPerformance_chiller(self):
        """
        This is a copy of a test from the Java ThermoGisDoubletTest.java script;
         to validate that this python implementation of the ThermoGIS Doublet
        returns the same values.
        """
        # Arrange
        # Instantiate the UTC properties class
        tg_parameters = self.setup_template_tg_properties()
        tg_parameters.setCapexConstant(0.5)
        tg_parameters.setCapexVariable(1100)
        tg_parameters.setHeatExchangerEfficiency(0.4)
        tg_parameters.setDhReturnTemp(60)
        tg_parameters.setHeatExchangerParasitic(0.1)

        utc_properties = tg_parameters.setupUTCParameters()

        input_data = xr.Dataset(
            {
                "thickness_mean": ((), 100),
                "thickness_sd": ((), 0.0),
                "ntg": ((), 1.0),
                "porosity": ((), 0.0),
                "depth": ((), 2000),
                "temperature": ((), 76),
                "ln_permeability_mean": ((), np.log(175)),
                "ln_permeability_sd": ((), 0.0),
            }
        )

        # Act
        results = calculate_doublet_performance_stochastic(
            input_data, utc_properties=utc_properties, rng_seed=0
        )

        # Assert
        self.assertTrue(np.isclose(17500, results.transmissivity.data[0], 0.001))
        self.assertTrue(np.isclose(227.2757568359375, results.flow_rate.data[0], 1))
        self.assertTrue(np.isclose(60, results.pres.data[0], 0.001))
        self.assertTrue(np.isclose(20.470115103822685, results.utc.data[0], 0.001))
        self.assertTrue(np.isclose(1227.1484375, results.welld.data[0], 0.001))
        self.assertTrue(np.isclose(1.8887300754346803, results.cop.data[0], 0.001))
        self.assertTrue(np.isclose(1.6594701766967774, results.power.data[0], 0.001))
        self.assertTrue(np.isclose(12.748051248109613, results.capex.data[0], 0.001))

    def test_calculateDoubletPerformance_ORC(self):
        """
        This is a copy of a test from the Java ThermoGisDoubletTest.java script;
         to validate that this python implementation of the ThermoGIS Doublet
        returns the same values.
        """
        # Arrange
        # Instantiate the UTC properties class
        tg_parameters = self.setup_template_tg_properties()
        tg_parameters.setCapexConstant(0)
        tg_parameters.setCapexVariable(2300)
        tg_parameters.setHeatExchangerEfficiency(0.6)
        tg_parameters.setUseORC(True)
        tg_parameters.setHeatExchangerBaseTemp(20)
        tg_parameters.setDhReturnTemp(60)
        utc_properties = tg_parameters.setupUTCParameters()

        input_data = xr.Dataset(
            {
                "thickness_mean": ((), 100),
                "thickness_sd": ((), 0.0),
                "ntg": ((), 1.0),
                "porosity": ((), 0.0),
                "depth": ((), 2000),
                "temperature": ((), 100),
                "ln_permeability_mean": ((), np.log(175)),
                "ln_permeability_sd": ((), 0.0),
            }
        )

        # Act
        results = calculate_doublet_performance_stochastic(
            input_data, utc_properties=utc_properties, rng_seed=0
        )

        # Assert
        self.assertTrue(np.isclose(17500, results.transmissivity.data[0], 0.001))
        self.assertTrue(np.isclose(293.6246643066406, results.flow_rate.data[0], 1))
        self.assertTrue(np.isclose(60, results.pres.data[0], 0.001))
        self.assertTrue(np.isclose(36.98296076530068, results.utc.data[0], 0.001))
        self.assertTrue(np.isclose(0.05274631495788107, results.power.data[0], 0.01))
        self.assertTrue(np.isclose(1306.4453125, results.welld.data[0], 0.001))
        self.assertTrue(np.isclose(0.06459403120103477, results.cop.data[0], 0.001))
        self.assertTrue(np.isclose(12.44409167482118, results.capex.data[0], 0.001))

    def test_calculateDoubletPerformance_directHeat_and_heatpump(self):
        """
        This is a copy of a test from the Java ThermoGisDoubletTest.java script;
        to validate that this python implementation of the ThermoGIS Doublet
        returns the same values.
        """
        # Arrange
        # Instantiate the UTC properties class
        tg_properties = self.setup_template_tg_properties()
        tg_properties.setOpexPower(100)
        tg_properties.setOpexBase(0)
        tg_properties.setGoalTemperature(70)
        tg_properties.setUseHeatpump(True)
        tg_properties.setDhReturnTemp(35)
        utc_properties = tg_properties.setupUTCParameters()

        input_data = xr.Dataset(
            {
                "thickness_mean": ((), 100),
                "thickness_sd": ((), 0.0),
                "ntg": ((), 1.0),
                "porosity": ((), 0.0),
                "depth": ((), 2000),
                "temperature": ((), 50),
                "ln_permeability_mean": ((), np.log(175)),
                "ln_permeability_sd": ((), 0.0),
            }
        )

        # Act
        results = calculate_doublet_performance_stochastic(
            input_data, utc_properties=utc_properties, rng_seed=0
        )
        power_hpelec = results.heat_pump_power / (results.cophp - 1)
        power_ratio = results.power / (results.power + power_hpelec)

        # Assert
        self.assertTrue(np.isclose(7.06616794, results.power + power_hpelec, 0.1))
        self.assertTrue(np.isclose(8.83718197747828, results.utc * power_ratio, 0.1))
        self.assertTrue(np.isclose(17499.99940, 17500, 0.001))
        self.assertTrue(np.isclose(152.5, results.flow_rate, 0.001))
        self.assertTrue(np.isclose(60, results.pres, 0.001))
        self.assertTrue(np.isclose(5.673, results.power, 0.001))
        self.assertTrue(np.isclose(955.3, results.welld, 0.001))
        self.assertTrue(np.isclose(3.89047122, results.cop, 0.001))
        self.assertTrue(np.isclose(3.405, results.cophp, 0.001))
        self.assertTrue(np.isclose(10.36858654, results.utc, 0.001))
        self.assertTrue(np.isclose(17.3787117, results.capex, 0.001))

    def setup_template_tg_properties(self):
        tg_parameters = instantiate_thermogis_parameters()
        tg_parameters.setSalinityGradient(47.0)
        tg_parameters.setDrillingTime(1)
        tg_parameters.setTaxRate(20)
        tg_parameters.setEquityReturn(15)
        tg_parameters.setOpexBase(10000)
        tg_parameters.setOpexPower(50)
        tg_parameters.setWellCostScaling(1)
        tg_parameters.setOpexEnergy(0)
        tg_parameters.setWellCostConstant(0.375)
        tg_parameters.setWellCostDepth(1050)
        tg_parameters.setWellCostDepth2(0.3)
        tg_parameters.setDhReturnTemp(40)
        tg_parameters.setUseKestinViscosity(True)

        return tg_parameters
+169 −1
Original line number Diff line number Diff line
import timeit
from pathlib import Path

import pytest
import numpy as np
import matplotlib.pyplot as plt

import xarray as xr

from pythermogis.dask import auto_chunk_dataset
from pythermogis.doublet import calculate_doublet_performance


@pytest.fixture
@@ -98,3 +104,165 @@ def test_1d_dataarray():
    da = xr.DataArray(np.arange(1000), dims=["x"])
    result = auto_chunk_dataset(da, target_chunk_size=100)
    assert _max_chunk_size(result) <= 100



def test_dask_parralelization():
    output_data_path = (
        Path(__file__).parent.parent / "resources" / "test_output" / "parallelization"
    )
    output_data_path.mkdir(parents=True, exist_ok=True)

    assess_optimal_chunk_size(
        n_simulations=100,
        chunk_step_size=50,
        plot_outfile=output_data_path / "parallelization.png",
    )

    assert output_data_path.exists()


def test_auto_chunking():
    # Arrange
    Ndim1, Ndim2, Ndim3 = 50, 4, 7
    reservoir_properties = xr.Dataset(
        {
            "thickness": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "porosity": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "ntg": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "depth": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "permeability": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
        },
        coords={
            "dim1": np.arange(Ndim1),
            "dim2": np.arange(Ndim2),
            "dim3": np.arange(Ndim3),
        },
    )

    # Act
    reservoir_properties = auto_chunk_dataset(reservoir_properties, 42)

    # Assert
    assert reservoir_properties.chunksizes["dim1"] == (
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        2,
    )
    assert reservoir_properties.chunksizes["dim2"] == (4,)
    assert reservoir_properties.chunksizes["dim3"] == (3, 3, 1)


def assess_optimal_chunk_size(
    n_simulations: int = 1000,
    chunk_step_size: int = 100,
    plot_outfile: str | Path = None,
):
    """
    This runs the same set of doublet simulations using different chunk sizes and
    prints the results to the terminal to show find which chunk size is optimal.
    It runs each chunk size 3 times and takes the average of their times to find the
     time taken for that chunk size.
    The chunks which are tested go from 1 -> n_simulations
     with steps of chunk_step_size.

    Parameters
    ----------
    n_simulations : int
        The number of simulations which are run in parralel
    chunk_step_size : int
        The step size of chunks to test; going from 1 to n_simulations
    plot_outfile : str | Path
        If provided a plot is made showing the time taken per-chunk size and saved
        to this path

    """
    reservoir_properties = xr.Dataset(
        {
            "thickness": (["sample"], np.ones(n_simulations) * 200),
            "porosity": (["sample"], np.ones(n_simulations)),
            "ntg": (["sample"], np.ones(n_simulations)),
            "depth": (["sample"], np.ones(n_simulations) * 1000),
            "permeability": (["sample"], np.ones(n_simulations) * 500),
        },
        coords={"sample": np.arange(n_simulations)},
    )

    n_attempts = (
        3  # do the same operation n_attempts and take an average of their times.
    )
    sample_chunks = np.arange(1, n_simulations + 2, chunk_step_size)

    # run in series
    time_attempt = []
    for _attempt in range(n_attempts):
        start = timeit.default_timer()
        simulation_benchmark = calculate_doublet_performance(reservoir_properties)
        time_attempt.append(timeit.default_timer() - start)
    normal_time = np.mean(time_attempt)
    print(
        f"non-parralel simulation took {normal_time:.1f} seconds,"
        f" {n_simulations / normal_time:.1f} samples per second"
    )

    # run in parallel:
    mean_time = []
    std_time = []
    for sample_chunk in sample_chunks:
        time_attempt = []
        for _attempt in range(n_attempts):
            start = timeit.default_timer()
            simulations_parallel = calculate_doublet_performance(
                reservoir_properties,
                chunk_size=sample_chunk,
                print_execution_duration=False,
            )
            time_attempt.append(timeit.default_timer() - start)

            # additional check that the results are identical
            xr.testing.assert_allclose(simulation_benchmark, simulations_parallel)
            xr.testing.assert_equal(simulation_benchmark, simulations_parallel)

        mean_time.append(np.mean(time_attempt))
        std_time.append(np.std(time_attempt))
        print(
            f"parralel simulation, chunk size: {sample_chunk}, "
            f"took {np.mean(time_attempt):.1f} "
            f"seconds to run {n_simulations} simulations, "
            f"{n_simulations / mean_time[-1]:.1f} samples per second"
        )

    if plot_outfile is None:
        return

    fig, ax = plt.subplots(1, 1, figsize=(8, 5))
    ax.errorbar(
        sample_chunks,
        mean_time,
        yerr=std_time,
        fmt="o",
        capsize=5,
        label="parralel simulation",
    )
    ax.axhline(
        normal_time, label="non-parralel simulation", color="tab:orange", linestyle="--"
    )
    ax.set_xlabel("chunk size")
    ax.set_ylabel("time (s)")
    ax.set_title("Chunk size assessment")
    ax.legend()
    plt.savefig(plot_outfile)
+0 −170
Original line number Diff line number Diff line
import timeit
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import xarray as xr

from pythermogis.dask import auto_chunk_dataset
from pythermogis.doublet import calculate_doublet_performance


def test_dask_parralelization():
    output_data_path = (
        Path(__file__).parent.parent / "resources" / "test_output" / "parallelization"
    )
    output_data_path.mkdir(parents=True, exist_ok=True)

    assess_optimal_chunk_size(
        n_simulations=100,
        chunk_step_size=50,
        plot_outfile=output_data_path / "parallelization.png",
    )

    assert output_data_path.exists()


def test_auto_chunking():
    # Arrange
    Ndim1, Ndim2, Ndim3 = 50, 4, 7
    reservoir_properties = xr.Dataset(
        {
            "thickness": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "porosity": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "ntg": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "depth": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
            "permeability": (["dim1", "dim2", "dim3"], np.ones((Ndim1, Ndim2, Ndim3))),
        },
        coords={
            "dim1": np.arange(Ndim1),
            "dim2": np.arange(Ndim2),
            "dim3": np.arange(Ndim3),
        },
    )

    # Act
    reservoir_properties = auto_chunk_dataset(reservoir_properties, 42)

    # Assert
    assert reservoir_properties.chunksizes["dim1"] == (
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        3,
        2,
    )
    assert reservoir_properties.chunksizes["dim2"] == (4,)
    assert reservoir_properties.chunksizes["dim3"] == (3, 3, 1)


def assess_optimal_chunk_size(
    n_simulations: int = 1000,
    chunk_step_size: int = 100,
    plot_outfile: str | Path = None,
):
    """
    This runs the same set of doublet simulations using different chunk sizes and
    prints the results to the terminal to show find which chunk size is optimal.
    It runs each chunk size 3 times and takes the average of their times to find the
     time taken for that chunk size.
    The chunks which are tested go from 1 -> n_simulations
     with steps of chunk_step_size.

    Parameters
    ----------
    n_simulations : int
        The number of simulations which are run in parralel
    chunk_step_size : int
        The step size of chunks to test; going from 1 to n_simulations
    plot_outfile : str | Path
        If provided a plot is made showing the time taken per-chunk size and saved
        to this path

    """
    reservoir_properties = xr.Dataset(
        {
            "thickness": (["sample"], np.ones(n_simulations) * 200),
            "porosity": (["sample"], np.ones(n_simulations)),
            "ntg": (["sample"], np.ones(n_simulations)),
            "depth": (["sample"], np.ones(n_simulations) * 1000),
            "permeability": (["sample"], np.ones(n_simulations) * 500),
        },
        coords={"sample": np.arange(n_simulations)},
    )

    n_attempts = (
        3  # do the same operation n_attempts and take an average of their times.
    )
    sample_chunks = np.arange(1, n_simulations + 2, chunk_step_size)

    # run in series
    time_attempt = []
    for _attempt in range(n_attempts):
        start = timeit.default_timer()
        simulation_benchmark = calculate_doublet_performance(reservoir_properties)
        time_attempt.append(timeit.default_timer() - start)
    normal_time = np.mean(time_attempt)
    print(
        f"non-parralel simulation took {normal_time:.1f} seconds,"
        f" {n_simulations / normal_time:.1f} samples per second"
    )

    # run in parallel:
    mean_time = []
    std_time = []
    for sample_chunk in sample_chunks:
        time_attempt = []
        for _attempt in range(n_attempts):
            start = timeit.default_timer()
            simulations_parallel = calculate_doublet_performance(
                reservoir_properties,
                chunk_size=sample_chunk,
                print_execution_duration=False,
            )
            time_attempt.append(timeit.default_timer() - start)

            # additional check that the results are identical
            xr.testing.assert_allclose(simulation_benchmark, simulations_parallel)
            xr.testing.assert_equal(simulation_benchmark, simulations_parallel)

        mean_time.append(np.mean(time_attempt))
        std_time.append(np.std(time_attempt))
        print(
            f"parralel simulation, chunk size: {sample_chunk}, "
            f"took {np.mean(time_attempt):.1f} "
            f"seconds to run {n_simulations} simulations, "
            f"{n_simulations / mean_time[-1]:.1f} samples per second"
        )

    if plot_outfile is None:
        return

    fig, ax = plt.subplots(1, 1, figsize=(8, 5))
    ax.errorbar(
        sample_chunks,
        mean_time,
        yerr=std_time,
        fmt="o",
        capsize=5,
        label="parralel simulation",
    )
    ax.axhline(
        normal_time, label="non-parralel simulation", color="tab:orange", linestyle="--"
    )
    ax.set_xlabel("chunk size")
    ax.set_ylabel("time (s)")
    ax.set_title("Chunk size assessment")
    ax.legend()
    plt.savefig(plot_outfile)
+0 −53
Original line number Diff line number Diff line
from pathlib import Path

import numpy as np
import pytest
import xarray as xr

from pythermogis.doublet import calculate_doublet_performance

test_files_path = Path(__file__).parent.parent / "resources"


def print_min_max(variable: xr.DataArray):
    print(f"{variable.name}, {np.min(variable):.2f}, {np.max(variable):.2f}")


@pytest.mark.skip(
    "Useful for understanding the range of possible input values, "
    "not for pipeline testing"
)
def test_define_calculation_limits():
    recalculate_results = False
    if recalculate_results:
        # generate simulation samples across desired reservoir properties
        Nsamples = 1000
        thickness_samples = np.random.uniform(low=1, high=10e3, size=Nsamples)
        porosity_samples = np.random.uniform(low=0.01, high=0.99, size=Nsamples)
        ntg_samples = np.random.uniform(low=0.01, high=0.99, size=Nsamples)
        depth_samples = np.random.uniform(low=1, high=10e3, size=Nsamples)
        permeability_samples = np.random.uniform(low=1, high=10e3, size=Nsamples)
        reservoir_properties = xr.Dataset(
            {
                "thickness": (["sample"], thickness_samples),
                "porosity": (["sample"], porosity_samples),
                "ntg": (["sample"], ntg_samples),
                "depth": (["sample"], depth_samples),
                "permeability": (["sample"], permeability_samples),
            },
            coords={"sample": np.arange(Nsamples)},
        )

        results = calculate_doublet_performance(
            reservoir_properties, chunk_size=100, print_execution_duration=True
        )
        results.to_netcdf(test_files_path / "calculation_limits_result.nc")
    else:
        results = xr.load_dataset(test_files_path / "calculation_limits_result.nc")

    # drop the values which returned nan:
    results = results.dropna(dim="sample", subset=["power"])

    # print the min and max values of all the inputs
    for var in ["thickness", "porosity", "ntg", "depth", "permeability", "temperature"]:
        print_min_max(results[var])
+0 −72

File deleted.

Preview size limit exceeded, changes collapsed.

Loading