TNO Intern

Commit 6c386e69 authored by Hen Brett's avatar Hen Brett 🐔
Browse files

Merge branch '18-use-utcproperties-instead-of-python-dict' into 'main'

Resolve "Use UTCProperties instead of Python dict"

Closes #18

See merge request AGS/pythermogis!21
parents 6d269e4f 4ae9c072
Loading
Loading
Loading
Loading
+35 −0
Original line number Diff line number Diff line
@@ -113,6 +113,41 @@ results = calculate_doublet_performance(input_grids)
print(results)
```

### Running Doublet simulations with custom simulation parameters
To adjust properties of the simulation, you can pass a `utc_properties` instance to the calculate_doublet_performance function.
A `utc_properties` instance is a JClass implementation of the Java UTCProperties class. It is generated by using the `utc_properties_builder`, upon which custom properties can be set, and used to build an instance of the `utc_properties`.

Be aware, that you will need access to the ThermoGIS java source code to fully understand what these properties do and not all properties are actually utilised in this python api (especially those which refer to paths to files).

Common properties to change include:
- `setUseHeatPump(Boolean)`: if true, include a heat-pump when modelling
- `setUseStimulation(Boolean)`: if true, apply reservoir stimulation when simulating a doublet
- `setSurfaceTemperature(float)`: The temperature of the surface
- `setTempGradient(float)`: The gradient at which temperature increases with depth, in C/km
- `setDhReturnTemp(float)`: The goal of the direct heat return temperature
- `setStimKhMax(float)`: The maximum transmissivity above which no stimulation will occur (if UseStimulation = True)

Here is an example, where the default utc_properties is used, but the UseHeatPump option is set to True. this is achievied by instantiating a `utc_properties_builder` class, running `.useHeatPump(True)` on that class, and then building the 
`utc_properties` themselves, with the `.build()` method of the `utc_properties_builder`.
```python
from pythermogis import calculate_doublet_performance, instantiate_utc_properties_builder
import xarray as xr

input_data = xr.Dataset({
    "thickness_mean": ((), 300),
    "thickness_sd": ((), 50),
    "ntg": ((), 0.5),
    "porosity": ((), 0.5),
    "depth": ((), 5000),
    "ln_permeability_mean": ((), 5),
    "ln_permeability_sd": ((), 0.5),
})

utc_properties = instantiate_utc_properties_builder().setUseHeatPump(True).build()
results = calculate_doublet_performance(input_data, utc_properties=utc_properties)
print(results)
```

---

## Contributing
+2 −1
Original line number Diff line number Diff line
from .thermogis_classes.doublet import calculate_doublet_performance
from .thermogis_classes.utc_properties import *
 No newline at end of file
+3 −42
Original line number Diff line number Diff line
@@ -2,44 +2,6 @@ import numpy as np
from scipy import stats
import xarray as xr

def generate_thickness_permeability_transmissivity_for_pvalue(thickness_mean: float, thickness_sd: float, ln_permeability_mean: float, ln_permeability_sd: float, p_values: float, nSamples: int = 10000) -> float:
    """
    Given thickness provided as a normal distribution and ln(permeability) provided as a normal distribution, and a specific p-value, then generate the values for that p-value of the
    -thickness
    -permeability
    -transmissivity

    Note: Transmissivity is a function of ln(permeability) * thickness, and so no analytical solution exists to combine these two probability distributions and so the transmissivity distribution has to be sampled.

    :param thickness_mean:
    :param thickness_sd:
    :param ln_permeability_mean:
    :param ln_permeability_sd:
    :param Pvalue:
    :param nSamples:
    :return:
    thickness, permeability, transmissivity
    """
    if np.isnan(thickness_mean) | np.isnan(thickness_sd) | np.isnan(ln_permeability_mean) | np.isnan(ln_permeability_sd):
        return np.nan, np.nan, np.nan

    p_value_fractions = p_values / 100

    thickness_dist = stats.norm(loc=thickness_mean, scale=thickness_sd)
    thickness_pvalues = thickness_dist.ppf(1 - p_value_fractions)

    ln_permeability_dist = stats.norm(loc=ln_permeability_mean, scale=ln_permeability_sd)
    permeability_pvalues = np.exp(ln_permeability_dist.ppf(1 - p_value_fractions))

    # Sampling method for transmissivity
    thickness_samples = thickness_dist.rvs(nSamples)
    thickness_samples = np.where(thickness_samples < 0.0, 0.1, thickness_samples)   # Ensure no negative thicknesses
    transmissivity_samples = np.sort(np.exp(ln_permeability_dist.rvs(nSamples) + np.log(thickness_samples)))

    transmissivity_pvalues_sampled = transmissivity_samples[int((1 - p_value_fractions) * nSamples)]

    return thickness_pvalues, permeability_pvalues, transmissivity_pvalues_sampled

def generate_thickness_permeability_transmissivity_for_pvalues(thickness_mean: float, thickness_sd: float, ln_permeability_mean: float, ln_permeability_sd: float, p_values: xr.DataArray, nSamples: int = 10000) -> float:
    """
    Given thickness provided as a normal distribution and ln(permeability) provided as a normal distribution, and a specific p-value, then generate the values for that p-value of the
@@ -71,11 +33,10 @@ def generate_thickness_permeability_transmissivity_for_pvalues(thickness_mean: f

    # Sampling method for transmissivity
    thickness_samples = thickness_dist.rvs(nSamples)
    thickness_samples = np.where(thickness_samples < 0.0, 0.1, thickness_samples)   # Ensure no negative thicknesses
    thickness_samples = np.clip(thickness_samples, a_min=0.01, a_max=None)
    transmissivity_samples = np.sort(np.exp(ln_permeability_dist.rvs(nSamples) + np.log(thickness_samples)))

    sample_indexes = p_value_fractions * nSamples
    sample_indexes = sample_indexes.astype(int)
    transmissivity_pvalues_sampled = transmissivity_samples[sample_indexes]
    sample_indexes = np.array(p_value_fractions * nSamples)
    transmissivity_pvalues_sampled = transmissivity_samples[sample_indexes.astype(int)]

    return thickness_pvalues, permeability_pvalues, transmissivity_pvalues_sampled
 No newline at end of file
+39 −109
Original line number Diff line number Diff line
@@ -6,42 +6,23 @@ from jpype import JClass

from pythermogis.physics.temperature_grid_calculation import calculate_temperature_from_gradient
from pythermogis.statistics.calculate_thick_perm_trans import generate_thickness_permeability_transmissivity_for_pvalues
from pythermogis.thermogis_classes.jvm_start import start_jvm
from pythermogis.thermogis_classes.utc_properties import instantiate_utc_properties_builder


def calculate_doublet_performance(input_data: xr.Dataset,
                                  input_params: dict = None,
                                  utc_properties = None,
                                  rng_seed = None,
                                  p_values: List[float] = [50.0]) -> xr.Dataset:
    """
    Perform a ThermoGIS Doublet performance simulation. This will occur across all dimensions of the input_data (ie. input data can have a single value for each required variable, or it can be 1Dimensional or a 2Dimensional grid)

    The default doublet simulation settings are for the ThermoGIS BaseCase; but major input parameters can be set using the input_params variable. This is a dictonary which by default contain the following keys:
    input_params = {"hp_minimum_injection_temperature": 15,
         "return_temperature": 30,
         "surface_temperature": 10,
         "degrees_per_km": 31,
         "max_cooling_temperature_range": 100,
         "stimKhMax": 20,
         "use_stimulation": False,
         "use_heat_pump": False,
         "calculate_cop": True,
         "hp_application_mode": False,
         "hp_direct_heat_input_temp": 70.0,
         "utc_cutoff_shallow": 5.1,
         "utc_cutoff_deep": 6.5,
         "utc_cutoff_depth": 4000.0,
         "rng_seed": np.random.randint(low=0, high=10000)}
    To change only one of these input parameters you can provide a dictionary with a single key to this method, and it will use the default values for the rest of the model parameters.
    e.g. input_params = {"use_stimulation":True}


    :param input_data:
        A xr.Dataset (input_data) with the required variables: "thickness_mean", "thickness_sd", "porosity", "ntg", "depth", "ln_permeability_mean", "ln_permeability_sd", Performance will be calculated across all dimensions.
        Optional Extra parameters: "temperature", "mask".
        If no temperature values are provided, temperature will be calculated using a gradient specified by the input_params dictionary and the depth variable.
        If mask values are provided, then any non-nan values in the mask variable will be set to zero across all variables in the returned output_data object.
    :param input_params:
        A dictionary containing parameters for use by the doublet calculation, If no input_params are provided then the values for the default thermogis scenario are used (the "BaseCase" scenario).
    :param utc_properties:
    :param rng_seed:
    :param p_values:
        A list of p_values for the doublet calculation to perform over; if no p_values are provided then the default value of P50 is used.
    :return output_data:
@@ -54,6 +35,9 @@ def calculate_doublet_performance(input_data: xr.Dataset,
    # Check that all essential variables are provided
    validate_input_data(input_data)

    if utc_properties is None: # Instantiate utc_properties if none is provided
        utc_properties = instantiate_utc_properties_builder().build()

    # convert p_values list to a xarray DataArray; needed to ensure the dimensionality of the calculations
    p_values = xr.DataArray(
        data=p_values,
@@ -62,21 +46,17 @@ def calculate_doublet_performance(input_data: xr.Dataset,
            p_value=(["p_value"], p_values),
        ))

    # Apply provided input_params; or return the base case
    input_params = apply_input_params(input_params)

    # Generate temperature values from gradient if no temperature provided
    if "temperature" not in input_data:
        input_data["temperature"] = calculate_temperature_from_gradient(input_data.depth, input_data.thickness_mean, input_params["degrees_per_km"], input_params["surface_temperature"])
        input_data["temperature"] = calculate_temperature_from_gradient(input_data.depth, input_data.thickness_mean, utc_properties.tempGradient(), utc_properties.surfaceTemperature())

    # If no mask grid is provided, then provide a dummy mask grid with only nan
    if "mask" not in input_data:
        input_data["mask"] = np.nan

    # Setup output_data dataset
    output_data = input_data.thickness_mean.copy().to_dataset(name="thickness")
    output_data = input_data["temperature"].copy().to_dataset(name="temperature")
    output_data = output_data.expand_dims({"p_value": p_values})
    output_data["temperature"] = input_data["temperature"].copy()

    # Calculate Thickness, Permeability and Transmissivity for each P-value
    output_data["thickness"], output_data["permeability"], output_data["transmissivity"] = xr.apply_ufunc(generate_thickness_permeability_transmissivity_for_pvalues,
@@ -93,9 +73,8 @@ def calculate_doublet_performance(input_data: xr.Dataset,
    # Calculate transmissivity scaled by ntg and converted to Dm
    output_data[f"transmissivity_with_ntg"] = (output_data[f"transmissivity"] * input_data.ntg) / 1e3


    # Instantiate ThermoGIS doublet
    doublet = instantiate_thermogis_doublet(input_params)
    doublet = instantiate_thermogis_doublet(utc_properties, rng_seed)

    # Calculate the doublet performance across all dimensions
    output_data_arrays = xr.apply_ufunc(calculate_performance_of_single_location,
@@ -107,7 +86,7 @@ def calculate_doublet_performance(input_data: xr.Dataset,
                                        input_data.temperature,
                                        output_data.transmissivity,
                                        output_data.transmissivity_with_ntg,
                                        kwargs={"doublet": doublet, "input_params": input_params},
                                        kwargs={"doublet": doublet, "utc_properties": utc_properties},
                                        input_core_dims=[[], [], [], [], [], [], [], []],
                                        output_core_dims=[[], [], [], [], [], [], [], [], [], [], [], []],
                                        vectorize=True
@@ -143,41 +122,11 @@ def validate_input_data(input_data: xr.Dataset):
    if len(missing_variables) > 0:
        raise ValueError(f"provided input Dataset does not contain the following required variables: {missing_variables}")


def apply_input_params(input_params: dict) -> dict:
    """
    if input_params is None, return the basecase input_params, if input_params contains keys, then change these keys in the basecase input params to the values provided by the user
    :param input_params:
    :return:
    """
    input_params_basecase = {"hp_minimum_injection_temperature": 15,
                             "return_temperature": 30,
                             "surface_temperature": 10,
                             "degrees_per_km": 31,
                             "max_cooling_temperature_range": 100,
                             "stimKhMax": 20,
                             "use_stimulation": False,
                             "use_heat_pump": False,
                             "calculate_cop": True,
                             "hp_application_mode": False,
                             "hp_direct_heat_input_temp": 70.0,
                             "utc_cutoff_shallow": 5.1,
                             "utc_cutoff_deep": 6.5,
                             "utc_cutoff_depth": 4000.0,
                             "rng_seed": np.random.randint(low=0, high=10000)}

    if input_params is None:  # If no input_params provided, return the basecase
        return input_params_basecase
    else:  # If input_params provided, then go through the keys in the basecase dictionary, and any keys in common can be changed to the user provided input_params
        return {key: input_params.get(key, input_params_basecase[key]) for key in input_params_basecase}




def calculate_performance_of_single_location(mask: float, depth: float, thickness: float, porosity: float, ntg: float, temperature: float, transmissivity: float, transmissivity_with_ntg: float, doublet=None,
                                             input_params: dict = None) -> float:
def calculate_performance_of_single_location(mask: float, depth: float, thickness: float, porosity: float, ntg: float, temperature: float, transmissivity: float, transmissivity_with_ntg: float, doublet: JClass = None ,
                                             utc_properties: JClass = None) -> float:
    """
        Calculate the performance of a doublet at a single location
    :param utc_properties:
    :param mask:
        mask value, if not nan set all output to 0.0
    :param depth:
@@ -196,8 +145,6 @@ def calculate_performance_of_single_location(mask: float, depth: float, thicknes
        transmissivity * ntg of the aquifer
    :param doublet:
        a ThermoGIS doublet class
    :param input_params:
        a dictionary containing extra parameters for running the doublet simulation
    :return:
        the values "power", "heat_pump_power", "capex", "opex", "utc", "npv", "hprod", "cop", "cophp", "pres", "flow_rate", "welld" from the doublet calculation

@@ -209,7 +156,7 @@ def calculate_performance_of_single_location(mask: float, depth: float, thicknes
    if not np.isnan(mask):
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, input_params)
    set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, utc_properties)

    # The Java routine which calculates DoubletPerformance, for more detail on the simulation inspect the Java source code
    doublet.calculateDoubletPerformance(-9999.0, thickness, transmissivity)
@@ -218,10 +165,10 @@ def calculate_performance_of_single_location(mask: float, depth: float, thicknes
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    # calculate net-present-value using the utc-cutoffs
    if depth > input_params["utc_cutoff_depth"]:
        utc_cut = input_params["utc_cutoff_deep"]
    if depth > utc_properties.utcDeepDepth():
        utc_cut = utc_properties.utcCutoffDeep()
    else:
        utc_cut = input_params["utc_cutoff_shallow"]
        utc_cut = utc_properties.utcCutoff()

    hprod = doublet.economicalData.getDiscountedHeatProducedP()
    npv = 1e-6 * (utc_cut - doublet.getUtcPeurctkWh()) * 3.6 * hprod * (1 - doublet.economicalData.getTaxRate())
@@ -247,65 +194,48 @@ def calculate_performance_of_single_location(mask: float, depth: float, thicknes
        "cophp"], output_values["pres"], output_values["flow_rate"], output_values["welld"]


def instantiate_thermogis_doublet(input_params: dict):
def instantiate_thermogis_doublet(utc_properties, rng_seed: int = None) -> JClass:
    """
    Instantiate a ThermoGIS Doublet class, with a set random seed if provided
    :param rng_seed:
    :param utc_properties: A UTCProperties instance, with different input parameters
    :param rng_seed: An integer to set the random number generator
    :return:
        doublet, a JPype instantiated object of the ThermoGISDoublet class
    """
    # Start the jvm
    start_jvm()
    # Instantiate doublet class
    Logger = JClass("logging.Logger")
    Mockito = JClass("org.mockito.Mockito")
    RNG = JClass("tno.geoenergy.stochastic.RandomNumberGenerator")
    ThermoGISDoublet = JClass("thermogis.calc.doublet.ThermoGisDoublet")
    UTCPropertiesBuilder = JClass("thermogis.properties.builders.UTCPropertiesBuilder")

    # Instantiate the UTC properties class
    propsBuilder = UTCPropertiesBuilder()
    utc_properties = (propsBuilder
                      .setUseStimulation(input_params["use_stimulation"])
                      .setUseHeatPump(input_params["use_heat_pump"])
                      .setCalculateCop(input_params["calculate_cop"])
                      .setHpApplicationMode(input_params["hp_application_mode"])
                      .setHpDirectHeatInputTemp(input_params["hp_direct_heat_input_temp"])
                      .build())

    # Instantiate random number generator:
    rng = RNG(input_params["rng_seed"])

    # Create an instance of a ThermoGISDoublet
    doublet = ThermoGISDoublet(Mockito.mock(Logger), rng, utc_properties)
    if rng_seed is not None:
        rng = RNG(rng_seed)
    else:
        rng = RNG()

    # Set parameters that do not change across cells
    doublet.doubletCalc1DData.setSurfaceTemperature(input_params["surface_temperature"])
    doublet.doubletCalc1DData.setUseHeatPump(input_params["use_heat_pump"])
    doublet.doubletCalc1DData.setDhReturnTemp(input_params["return_temperature"])
    doublet = ThermoGISDoublet(Mockito.mock(Logger), rng, utc_properties)

    # Set parameters that do not change across simulations
    doublet.doubletCalc1DData.setSurfaceTemperature(utc_properties.surfaceTemperature())
    doublet.doubletCalc1DData.setUseHeatPump(utc_properties.useHeatPump())
    doublet.doubletCalc1DData.setDhReturnTemp(utc_properties.dhReturnTemp())

    return doublet

def set_doublet_parameters(doublet, transmissivity_with_ntg: float, depth: float, porosity: float, ntg: float, temperature: float, input_params: dict):
def set_doublet_parameters(doublet, transmissivity_with_ntg: float, depth: float, porosity: float, ntg: float, temperature: float, utc_properties: JClass):
    """
    For a single location sets the necessary data on the doublet class, to then run a doublet simulation
    :param utc_properties:
    :param doublet:
    :param transmissivity_with_ntg:
    :param depth:
    :param porosity:
    :param ntg:
    :param temperature:
    :param useStimulation:
    :param stimKhMax:
    :param surface_temperature:
    :param return_temperature:
    :param use_heat_pump:
    :param max_cooling_temperature_range:
    :param hp_minimum_injection_temperature:
    :return:
    """
    if not input_params["use_stimulation"] or transmissivity_with_ntg > input_params["stimKhMax"]:
    if not utc_properties.useStimulation() or transmissivity_with_ntg > utc_properties.stimKhMax():
        doublet.setNoStimulation()

    doublet.doubletCalc1DData.setDepth(depth)
@@ -313,10 +243,10 @@ def set_doublet_parameters(doublet, transmissivity_with_ntg: float, depth: float
    doublet.doubletCalc1DData.setNtg(ntg)
    doublet.doubletCalc1DData.setReservoirTemp(temperature)

    if not input_params["use_heat_pump"]:
        doublet.doubletCalc1DData.setInjectionTemp(np.max([temperature - input_params["max_cooling_temperature_range"], input_params["return_temperature"]]))
    elif input_params["use_heat_pump"] and input_params["calculate_cop"] and not input_params["hp_application_mode"]:
        doublet.doubletCalc1DData.setInjectionTemp(doublet.calculateInjectionTempWithHeatPump(temperature, input_params["hp_direct_heat_input_temp"]))
    if not utc_properties.useHeatPump():
        doublet.doubletCalc1DData.setInjectionTemp(np.max([temperature - utc_properties.maxCoolingTempRange(), utc_properties.dhReturnTemp()]))
    elif utc_properties.useHeatPump() and utc_properties.calculateCop() and not utc_properties.hpApplicationMode():
        doublet.doubletCalc1DData.setInjectionTemp(doublet.calculateInjectionTempWithHeatPump(temperature, utc_properties.hpDirectHeatInputTemp()))
    else:
        doublet.doubletCalc1DData.setInjectionTemp(np.max([temperature - input_params["max_cooling_temperature_range"], input_params["hp_minimum_injection_temperature"]]))
        doublet.doubletCalc1DData.setInjectionTemp(np.max([temperature - utc_properties.maxCoolingTempRange(), utc_properties.hpMinimumInjectionTemperature()]))
+2 −3
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@ def get_jvm_path() -> str:
    """
    jvm_path = Path(os.getenv("JAVA_HOME")) / "bin" / "server" / "jvm.dll"
    if not jvm_path.exists():
        raise FileNotFoundError(f"Java executable not found at {jvm_path}")
        raise FileNotFoundError(f"Java executable not found at {jvm_path}, check that you have installed a JVM correctly and set the JAVA_HOME environment variable.")
    return str(jvm_path)


@@ -20,7 +20,7 @@ def get_thermogis_jar_path() -> str:
    """
    thermogis_jar_path = Path(os.getenv("THERMOGIS_JAR"))
    if not thermogis_jar_path.exists():
        raise FileNotFoundError(f"Java executable not found at {thermogis_jar_path}")
        raise FileNotFoundError(f"Jar file not found at {thermogis_jar_path}, check that you have downloaded the ThermoGIS Jar and set the THERMOGIS_JAR environment variable.")
    return str(thermogis_jar_path)


@@ -33,7 +33,6 @@ def start_jvm():
    if not jpype.isJVMStarted():
        jpype.startJVM(get_jvm_path(), classpath=[get_thermogis_jar_path()])


def close_jvm():
    """
    To use the JPype package a java .jar file has to be instantiated with a java virtual machine (jvm)
Loading