TNO Intern

Commit 83f3d4d9 authored by Hen Brett's avatar Hen Brett 🐔
Browse files

Checking that the tests run

parent e9dc223c
Loading
Loading
Loading
Loading
Loading
+14 −2
Original line number Diff line number Diff line
@@ -5,11 +5,12 @@ import numpy as np
import xarray as xr

from pythermogis import simulate_doublet
from pythermogis.dask_utils.chunk_utils import auto_chunk_dataset
from pythermogis.physics.temperature_grid_calculation import calculate_temperature_from_gradient
from pythermogis.thermogis_classes.utc_properties import instantiate_utc_properties_builder


def calculate_doublet_performance(reservoir_properties: xr.Dataset, utc_properties = None, rng_seed = None, print_execution_duration = False) -> xr.Dataset:
def calculate_doublet_performance(reservoir_properties: xr.Dataset, utc_properties = None, rng_seed: int = None, chunk_size: int = None, print_execution_duration: bool = False) -> xr.Dataset:
    """
    Perform a deterministic Doublet performance simulation.

@@ -40,6 +41,12 @@ def calculate_doublet_performance(reservoir_properties: xr.Dataset, utc_properti
    rng_seed : int
        Random seed used for stochastic components of the simulation.

    chunk_size : int
        None by default, if set to an integer then chunking of the reservoir properties occurs.
        The chunk size is used to split up the number of simulations into "chunks" which can be processed in parallel using the dask framework.
        Chunk size involves trade-offs: smaller chunks = more parallelism, but more overhead, while larger chunks = less overhead, but can lead to memory pressure.
        The optimal chunk size is dependent on the hardware being used to run the simulation. The user should test to find the optimal chunk size.

    print_execution_duration : bool
    print_execution_duration : bool
        False by default. If set to True, print the time in seconds it took to simulate across all reservoir properties.

@@ -89,9 +96,14 @@ def calculate_doublet_performance(reservoir_properties: xr.Dataset, utc_properti
        reservoir_properties["transmissivity"] = reservoir_properties["permeability"] * reservoir_properties["thickness"]

    # Setup output_data dataset as a copy of reservoir properties
    if chunk_size is not None:
        reservoir_properties = auto_chunk_dataset(reservoir_properties, target_chunk_size=chunk_size)

    output_data = reservoir_properties.copy()
    output_data = simulate_doublet(output_data, reservoir_properties, rng_seed, utc_properties).load()
    output_data = simulate_doublet(output_data, reservoir_properties, rng_seed, utc_properties)
    if chunk_size is not None: output_data = output_data.load()
    if print_execution_duration: print(f"Doublet simulation took {timeit.default_timer() - start:.1f} seconds")

    return output_data

def validate_input(reservoir_properties: xr.Dataset):
+3 −3
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ def calculate_doublet_performance_stochastic(reservoir_properties: xr.Dataset,
                                             utc_properties = None,
                                             rng_seed = None,
                                             p_values: List[float] = [50.0],
                                             chunk_size: int | bool = False,
                                             chunk_size: int = None,
                                             print_execution_duration = False
                                             ) -> xr.Dataset:
    """
@@ -108,7 +108,7 @@ def calculate_doublet_performance_stochastic(reservoir_properties: xr.Dataset,
        reservoir_properties["mask"] = np.nan

    # if chunk_size is specified, then chunk reservoir_properties (see description of chunk_size)
    if chunk_size:
    if chunk_size is not None:
        reservoir_properties = auto_chunk_dataset(reservoir_properties, chunk_size)

    # Setup output_data dataset
@@ -129,7 +129,7 @@ def calculate_doublet_performance_stochastic(reservoir_properties: xr.Dataset,
                                                                                                          )

    output_data = simulate_doublet(output_data, reservoir_properties, rng_seed, utc_properties)
    if chunk_size: output_data.load() # If chunking has occured then the data must be de-chunked
    if chunk_size is not None: output_data = output_data.load() # If chunking has occurred then the data must be de-chunked
    if print_execution_duration: print(f"Doublet simulation took {timeit.default_timer() - start:.1f} seconds")
    return output_data

+1 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ def simulate_doublet(output_data: xr.Dataset, reservoir_properties: xr.Dataset,
                                        output_data.transmissivity_with_ntg,
                                        rng_seed,
                                        kwargs={"utc_properties": utc_properties},
                                        dask="parallelized",
                                        input_core_dims=[[], [], [], [], [], [], [], [],[]],
                                        output_core_dims=[[], [], [], [], [], [], [], [], [], [], [], [], [], []],
                                        vectorize=True,
+8 −9
Original line number Diff line number Diff line
@@ -6,10 +6,9 @@ import pytest

from pythermogis import calculate_doublet_performance, auto_chunk_dataset

@pytest.mark.skip()
def test_dask_parralelization_deterministic():
    # generate simulation samples across desired reservoir properties
    Nsamples = 5000
    Nsamples = 1000
    thickness_samples = np.random.uniform(low=150, high=300, size=Nsamples)
    porosity_samples = np.random.uniform(low=0.5, high=0.8, size=Nsamples)
    ntg_samples = np.random.uniform(low=0.25, high=0.5, size=Nsamples)
@@ -35,19 +34,19 @@ def test_dask_parralelization_deterministic():
        time_attempt.append(timeit.default_timer() - start)
    print(f"non-parralel simulation took {np.mean(time_attempt):.1f} seconds, {Nsamples/np.mean(time_attempt):.1f} samples per second")

    sample_chunks = [10, 50, 100, 150, 200, 500, 1000]

    sample_chunks = [10,50,100,150,200,500,1000,2000,5000]

    # first run parralelized:
    # first run parallelized:
    for sample_chunk in sample_chunks:
        time_attempt=[]
        for attempt in range(n_attempts):

            start = timeit.default_timer()
            reservoir_properties_chunk = reservoir_properties.chunk({'sample': sample_chunk})
            simulations_parrallel = calculate_doublet_performance(reservoir_properties_chunk, print_execution_duration=False)
            simulations_parallel = calculate_doublet_performance(reservoir_properties, chunk_size=sample_chunk, print_execution_duration=False)
            time_attempt.append(timeit.default_timer() - start)
            xr.testing.assert_allclose(simulation_benchmark, simulations_parrallel)
            xr.testing.assert_equal(simulation_benchmark, simulations_parrallel)

            xr.testing.assert_allclose(simulation_benchmark, simulations_parallel)
            xr.testing.assert_equal(simulation_benchmark, simulations_parallel)
        print(f"parralel simulation, chunk size: {sample_chunk}, took {np.mean(time_attempt):.1f} seconds to run {Nsamples} simulations, {Nsamples/np.mean(time_attempt):.1f} samples per second")

def test_auto_chunking():