TNO Intern

Commit 2f3fe597 authored by Florian Knappers's avatar Florian Knappers
Browse files

add stochastic option to TGDoublet

parent 97a25617
Loading
Loading
Loading
Loading
+63 −25
Original line number Diff line number Diff line
@@ -12,18 +12,48 @@ from pythermogis.pytg3.typing import FloatOrArray
logger = logging.getLogger(__name__)


class Aquifer(BaseModel):
class BaseAquifer(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    thickness: FloatOrArray
    porosity: FloatOrArray
    ntg: FloatOrArray
    depth: FloatOrArray
    permeability: FloatOrArray | None = None
    transmissivity: FloatOrArray | None = None
    temperature: FloatOrArray | None = None
    mask: FloatOrArray | None = None

    @field_validator("porosity", "ntg", mode="after")
    @classmethod
    def check_between_0_and_1(cls, value: FloatOrArray, info) -> FloatOrArray:
        if isinstance(value, xr.DataArray):
            if (value < 0.0).any() or (value > 1.0).any():
                raise ValueError(
                    f"{info.field_name} must be between 0.0 and 1.0 "
                    f"(100% {info.field_name} = 1.0)"
                )
        elif not (0.0 <= value <= 1.0):
            raise ValueError(
                f"{info.field_name} must be between 0.0 and 1.0 "
                f"(100% {info.field_name} = 1.0)"
            )
        return value

    @model_validator(mode="after")
    def set_default_mask(self) -> BaseAquifer:
        if self.mask is None:
            self.mask = np.nan
        return self

    def calculate_temperature(
        self, temperature_gradient: float, surface_temperature: float
    ) -> None:
        raise NotImplementedError


class Aquifer(BaseAquifer):
    thickness: FloatOrArray
    permeability: FloatOrArray | None = None
    transmissivity: FloatOrArray | None = None

    @model_validator(mode="after")
    def check_permeability_or_transmissivity(self) -> Aquifer:
        has_perm = self.permeability is not None
@@ -62,37 +92,45 @@ class Aquifer(BaseModel):
            )
        return value

    @field_validator("porosity", "ntg", mode="after")
    @model_validator(mode="after")
    def calculate_transmissivity(self) -> Aquifer:
        if self.transmissivity is None:
            self.transmissivity = self.permeability * self.thickness
        return self

    def calculate_temperature(
        self, temperature_gradient: float, surface_temperature: float
    ) -> None:
        self.temperature = calculate_temperature_from_gradient(
            self.depth, self.thickness, temperature_gradient, surface_temperature
        )


class StochasticAquifer(BaseAquifer):
    thickness_mean: FloatOrArray
    thickness_sd: FloatOrArray
    ln_permeability_mean: FloatOrArray
    ln_permeability_sd: FloatOrArray

    @field_validator("thickness_mean", "thickness_sd", mode="after")
    @classmethod
    def check_between_0_and_1(cls, value: FloatOrArray, info) -> FloatOrArray:
    def check_non_negative(cls, value: FloatOrArray, info) -> FloatOrArray:
        if isinstance(value, xr.DataArray):
            if (value < 0.0).any() or (value > 1.0).any():
            if (value < 0.0).any():
                raise ValueError(
                    f"{info.field_name} must be between 0.0 and 1.0 "
                    f"(100% {info.field_name} = 1.0)"
                    f"{info.field_name} must always be >= 0.0; "
                    f"negative values are not allowed"
                )
        elif not (0.0 <= value <= 1.0):
        elif value < 0.0:
            raise ValueError(
                f"{info.field_name} must be between 0.0 and 1.0 "
                f"(100% {info.field_name} = 1.0)"
                f"{info.field_name} must always be >= 0.0; "
                f"negative values are not allowed"
            )
        return value

    @model_validator(mode="after")
    def set_default_mask(self) -> Aquifer:
        if self.mask is None:
            self.mask = np.nan
        return self

    @model_validator(mode="after")
    def calculate_transmissivity(self) -> Aquifer:
        if self.transmissivity is None:
            self.transmissivity = self.permeability * self.thickness
        return self

    def calculate_temperature(
        self, temperature_gradient: float, surface_temperature: float
    ) -> None:
        self.temperature = calculate_temperature_from_gradient(
            self.depth, self.thickness, temperature_gradient, surface_temperature
            self.depth, self.thickness_mean, temperature_gradient, surface_temperature
        )
+49 −11
Original line number Diff line number Diff line
from __future__ import annotations

import logging
import timeit
from typing import TYPE_CHECKING

import numpy as np
import xarray as xr
@@ -11,14 +8,13 @@ from pydantic import BaseModel, ConfigDict

from pythermogis.dask import auto_chunk_dataset
from pythermogis.mock import create_logger_mock
from pythermogis.pytg3.aquifer import Aquifer, StochasticAquifer
from pythermogis.pytg3.settings import Settings
from pythermogis.pytg3.typing import FloatOrArray
from pythermogis.transmissivity import generate_thickness_permeability_transmissivity_for_pvalues

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from pythermogis.pytg3.aquifer import Aquifer


class ThermoGISDoubletResults(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
@@ -45,7 +41,7 @@ class ThermoGISDoubletResults(BaseModel):

class ThermoGISDoublet:

    def __init__(self, aquifer: Aquifer, settings: Settings | None = None):
    def __init__(self, aquifer: Aquifer | StochasticAquifer, settings: Settings | None = None):
        self.aquifer = aquifer
        self.settings = settings

@@ -59,18 +55,38 @@ class ThermoGISDoublet:
            )


    def simulate(self, mask_value: float = np.nan, chunk_size: int | None = None) -> ThermoGISDoubletResults:
    def simulate(
        self,
        p_values: list[float] | None = None,
        mask_value: float | None = None,
        chunk_size: int | None = None,
    ) -> ThermoGISDoubletResults:
        start = timeit.default_timer()
        transmissivity_with_ntg = (self.aquifer.transmissivity * self.aquifer.ntg) / 1e3

        if p_values is not None and not isinstance(self.aquifer, StochasticAquifer):
            raise TypeError("p_values requires a StochasticAquifer")
        if p_values is None and isinstance(self.aquifer, StochasticAquifer):
            raise TypeError("StochasticAquifer requires p_values")

        if mask_value is None:
            mask_value = 0.0 if p_values is not None else np.nan

        if p_values is not None:
            thickness, transmissivity = self._expand_p_values(p_values)
        else:
            thickness = self.aquifer.thickness
            transmissivity = self.aquifer.transmissivity

        transmissivity_with_ntg = (transmissivity * self.aquifer.ntg) / 1e3

        fields = [
            self.aquifer.mask,
            self.aquifer.depth,
            self.aquifer.thickness,
            thickness,
            self.aquifer.porosity,
            self.aquifer.ntg,
            self.aquifer.temperature,
            self.aquifer.transmissivity,
            transmissivity,
            transmissivity_with_ntg,
        ]

@@ -119,6 +135,28 @@ class ThermoGISDoublet:
            prd_temp=output_arrays[13],
            transmissivity_with_ntg=transmissivity_with_ntg,
        )
    def _expand_p_values(self, p_values: list[float]) -> tuple[xr.DataArray, xr.DataArray]:
        if not isinstance(self.aquifer, StochasticAquifer):
            raise TypeError("p_values requires a StochasticAquifer")

        p_values_da = xr.DataArray(
            data=p_values, dims=["p_value"], coords={"p_value": p_values}
        )

        thickness, _, transmissivity = xr.apply_ufunc(
            generate_thickness_permeability_transmissivity_for_pvalues,
            self.aquifer.thickness_mean,
            self.aquifer.thickness_sd,
            self.aquifer.ln_permeability_mean,
            self.aquifer.ln_permeability_sd,
            p_values_da,
            input_core_dims=[[], [], [], [], ["p_value"]],
            output_core_dims=[["p_value"], ["p_value"], ["p_value"]],
            vectorize=True,
            dask="parallelized",
        )

        return thickness, transmissivity


def _run_doublet_at_location(