TNO Intern

Commit 58a8abb5 authored by Hen Brett's avatar Hen Brett 🐔
Browse files

Merge branch '6-get-the-heatpump-mode-to-work-and-add-a-test' into 'main'

Resolve "Get the heatpump mode to work and add a test"

Closes #6

See merge request AGS/pythermogis!10
parents 3d4be995 6ca6c141
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -6,9 +6,8 @@ import xarray as xr

def calculate_doublet_performance(input_data: xr.Dataset,
                                  input_params: dict = None,
                                  rng_seed: int = None,
                                  p_values: List[float] = None) -> xr.Dataset:
    """
    An access point to the calculate_doublet_performance_across_dimensions function in pythermogis.thermogis_classes.doublet, see that method for more detailed instruction
    """
    return calculate_doublet_performance_across_dimensions(input_data, input_params=input_params, rng_seed=rng_seed, p_values=p_values)
    return calculate_doublet_performance_across_dimensions(input_data, input_params=input_params, p_values=p_values)
+142 −118
Original line number Diff line number Diff line
@@ -9,83 +9,8 @@ from pythermogis.statistics.calculate_thick_perm_trans import generate_thickness
from pythermogis.thermogis_classes.java_start import start_jvm


def calculate_performance_of_single_location(mask: float, depth: float, thickness: float, porosity: float, ntg: float, temperature: float, transmissivity: float, transmissivity_with_ntg: float, doublet=None,
                                             input_params: dict = None) -> float:
    """
        Calculate the performance of a doublet at a single location
    :param mask:
        mask value, if not nan set all output to 0.0
    :param depth:
        depth of the aquifer
    :param thickness:
        thickness of the aquifer
    :param porosity:
        porosity of the aquifer
    :param ntg:
        net-to-gross of the aquifer
    :param temperature:
        temperature of the aquifer
    :param transmissivity:
        transmissivity of the aquifer
    :param transmissivity_with_ntg:
        transmissivity * ntg of the aquifer
    :param doublet:
        a ThermoGIS doublet class
    :param input_params:
        a dictionary containing extra parameters for running the doublet simulation
    :return:
        the values "power", "heat_pump_power", "capex", "opex", "utc", "npv", "hprod", "cop", "cophp", "pres", "flow_rate", "welld" from the doublet calculation

    """

    if np.isnan(thickness):
        return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan

    if not np.isnan(mask):
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, input_params["use_stimulation"], input_params["stimKhMax"], input_params["surface_temperature"],
                           input_params["return_temperature"], input_params["use_heat_pump"], input_params["max_cooling_temperature_range"], input_params["hp_minimum_injection_temperature"])

    # The Java routine which calculates DoubletPerformance, for more detail on the simulation inspect the Java source code
    doublet.calculateDoubletPerformance(-9999.0, thickness, transmissivity)

    if doublet.getUtcPeurctkWh() == -9999.0:  # If calculation was not successful, return all 0.0
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    # calculate net-present-value using the utc-cutoffs
    if depth > input_params["utc_cutoff_depth"]:
        utc_cut = input_params["utc_cutoff_deep"]
    else:
        utc_cut = input_params["utc_cutoff_shallow"]

    hprod = doublet.economicalData.getDiscountedHeatProducedP()
    npv = 1e-6 * (utc_cut - doublet.getUtcPeurctkWh()) * 3.6 * hprod * (1 - doublet.economicalData.getTaxRate())

    # get values from doublet
    output_values = {"power": doublet.doubletCalc1DData.getHpP(),
                     "heat_pump_power": doublet.doubletCalc1DData.getHpPHeatPump(),
                     "capex": doublet.economicalData.getSumcapex(),
                     "opex": doublet.economicalData.getOpexFirstProdYear(),
                     "utc": doublet.getUtcPeurctkWh(),
                     "npv": npv,
                     "hprod": hprod,
                     "cop": doublet.doubletCalc1DData.getCop(),
                     "cophp": doublet.doubletCalc1DData.getCopHpP(),
                     "pres": doublet.doubletCalc1DData.getPresP() / 1e5,
                     "flow_rate": doublet.doubletCalc1DData.getFlowrate(),
                     "welld": doublet.doubletCalc1DData.getWellDistP(),
                     }

    # Reset doublet variables for next calculation
    doublet.setProjectVariables(False, 0.0)
    return output_values["power"], output_values["heat_pump_power"], output_values["capex"], output_values["opex"], output_values["utc"], output_values["npv"], output_values["hprod"], output_values["cop"], output_values[
        "cophp"], output_values["pres"], output_values["flow_rate"], output_values["welld"]


def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
                                                    input_params: dict = None,
                                  rng_seed: int =None,
                                                    p_values: List[float] = None) -> xr.Dataset:
    """
    Perform a ThermoGIS Doublet performance assessment. This will occur across all dimensions of the input_data (ie. input data can have a single value for each required variable, or it can be 1Dimensional or a 2Dimensional grid)
@@ -97,8 +22,6 @@ def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
        If mask values are provided, then any non-nan values in the mask variable will be set to zero across all variables in the returned output_data object.
    :param input_params:
        A dictionary containing parameters for use by the doublet calculation, If no input_params are provided then the values for the default thermogis scenario are used (the "BaseCase" scenario).
    :param rng_seed:
        A Integer that sets the random seed within the doublet calculation, if not provided then the random number generator picks a seed at random.
    :param p_values:
        A list of p_values for the doublet calculation to perform over; if no p_values are provided then the default value of P50 is used.
    :return output_data:
@@ -111,34 +34,14 @@ def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
    """

    # Check that all essential variables are provided
    missing_variables = []
    for variable in ["thickness_mean", "thickness_sd", "porosity", "ntg", "depth", "ln_permeability_mean", "ln_permeability_sd"]:
        if variable not in input_data:
            missing_variables.append(variable)
    if len(missing_variables) > 0:
        raise ValueError(f"provided input Dataset does not contain the following required variables: {missing_variables}")
    validate_input_data(input_data)

    # If no p_values provided, calculate only the P50
    if p_values is None:
        p_values = [50.0]

    # Input parameters
    input_params_basecase = {"hp_minimum_injection_temperature": 15,
                    "return_temperature": 30,
                    "surface_temperature": 10,
                    "degrees_per_km": 31,
                    "max_cooling_temperature_range": 100,
                    "stimKhMax": 20,
                    "use_stimulation": False,
                    "use_heat_pump": False,
                    "utc_cutoff_shallow": 5.1,
                    "utc_cutoff_deep": 6.5,
                    "utc_cutoff_depth": 4000.0}

    if input_params is None:    # If no input_params provided, use the basecase
        input_params = input_params_basecase
    else:
        input_params = {key: input_params.get(key, input_params_basecase[key]) for key in input_params_basecase}
    # Apply provided input_params; or return the base case
    input_params = apply_input_params(input_params)

    # Generate temperature values from gradient if no temperature provided
    if "temperature" not in input_data:
@@ -149,13 +52,14 @@ def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
        input_data["mask"] = np.nan

    # Instantiate ThermoGIS doublet
    doublet = instantiate_thermogis_doublet(rng_seed=rng_seed)
    doublet = instantiate_thermogis_doublet(input_params)

    # Setup output_data dataset
    output_data = input_data.thickness_mean.copy().to_dataset(name="thickness")
    output_data = output_data.expand_dims({"p_value": p_values})

    # Calculate Thickness, Permeability and Transmissivity for each P-value
    # TODO: Find a less ugly way to handle all these outputs than instantiating different arrays; probably by setting the output_core_dims in the apply_ufunc
    thickness_data = []
    permeability_data = []
    transmissivity_data = []
@@ -192,6 +96,7 @@ def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
    pres_data = []
    flow_rate_data = []
    welld_data = []
    # TODO: Find a less ugly way to handle all these outputs than instantiating different arrays; probably by setting the output_core_dims in the apply_ufunc
    for i, p in enumerate(p_values):
        output_data_arrays = xr.apply_ufunc(calculate_performance_of_single_location,
                                            input_data.mask,
@@ -239,7 +144,49 @@ def calculate_doublet_performance_across_dimensions(input_data: xr.Dataset,
    return output_data


def instantiate_thermogis_doublet(rng_seed=None):
def validate_input_data(input_data: xr.Dataset):
    """
    Ensure that the input_data Dataset contains the minimum required variables
    :param input_data:
    :return:
    """
    missing_variables = []
    for variable in ["thickness_mean", "thickness_sd", "porosity", "ntg", "depth", "ln_permeability_mean", "ln_permeability_sd"]:
        if variable not in input_data:
            missing_variables.append(variable)
    if len(missing_variables) > 0:
        raise ValueError(f"provided input Dataset does not contain the following required variables: {missing_variables}")


def apply_input_params(input_params: dict) -> dict:
    """
    if input_params is None, return the basecase input_params, if input_params contains keys, then change these keys in the basecase input params to the values provided by the user
    :param input_params:
    :return:
    """
    input_params_basecase = {"hp_minimum_injection_temperature": 15,
                             "return_temperature": 30,
                             "surface_temperature": 10,
                             "degrees_per_km": 31,
                             "max_cooling_temperature_range": 100,
                             "stimKhMax": 20,
                             "use_stimulation": False,
                             "use_heat_pump": False,
                             "calculate_cop": True,
                             "hp_application_mode": False,
                             "hp_direct_heat_input_temp": 70.0,
                             "utc_cutoff_shallow": 5.1,
                             "utc_cutoff_deep": 6.5,
                             "utc_cutoff_depth": 4000.0,
                             "rng_seed": np.random.randint(low=0, high=10000)}

    if input_params is None:  # If no input_params provided, return the basecase
        return input_params_basecase
    else:  # If input_params provided, then go through the keys in the basecase dictionary, and any keys in common can be changed to the user provided input_params
        return {key: input_params.get(key, input_params_basecase[key]) for key in input_params_basecase}


def instantiate_thermogis_doublet(input_params):
    """
    Instantiate a ThermoGIS Doublet class, with a set random seed if provided
    :param rng_seed:
@@ -257,21 +204,96 @@ def instantiate_thermogis_doublet(rng_seed=None):

    # Instantiate the UTC properties class
    propsBuilder = UTCPropertiesBuilder()
    utc_properties = propsBuilder.build()
    utc_properties = (propsBuilder
                      .setUseStimulation(input_params["use_stimulation"])
                      .setUseHeatPump(input_params["use_heat_pump"])
                      .setCalculateCop(input_params["calculate_cop"])
                      .setHpApplicationMode(input_params["hp_application_mode"])
                      .setHpDirectHeatInputTemp(input_params["hp_direct_heat_input_temp"])
                      .build())

    # Instantiate random number generator:
    if rng_seed is None:
        rng = RNG()
    else:
        rng = RNG(rng_seed)
    rng = RNG(input_params["rng_seed"])

    # Create an instance of a ThermoGISDoublet
    doublet = ThermoGISDoublet(Mockito.mock(Logger), rng, utc_properties)
    return doublet


def set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, useStimulation, stimKhMax, surface_temperature, return_temperature, use_heat_pump, max_cooling_temperature_range,
                           hp_minimum_injection_temperature):
def calculate_performance_of_single_location(mask: float, depth: float, thickness: float, porosity: float, ntg: float, temperature: float, transmissivity: float, transmissivity_with_ntg: float, doublet=None,
                                             input_params: dict = None) -> float:
    """
        Calculate the performance of a doublet at a single location
    :param mask:
        mask value, if not nan set all output to 0.0
    :param depth:
        depth of the aquifer
    :param thickness:
        thickness of the aquifer
    :param porosity:
        porosity of the aquifer
    :param ntg:
        net-to-gross of the aquifer
    :param temperature:
        temperature of the aquifer
    :param transmissivity:
        transmissivity of the aquifer
    :param transmissivity_with_ntg:
        transmissivity * ntg of the aquifer
    :param doublet:
        a ThermoGIS doublet class
    :param input_params:
        a dictionary containing extra parameters for running the doublet simulation
    :return:
        the values "power", "heat_pump_power", "capex", "opex", "utc", "npv", "hprod", "cop", "cophp", "pres", "flow_rate", "welld" from the doublet calculation

    """

    if np.isnan(thickness):
        return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan

    if not np.isnan(mask):
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, input_params)

    # The Java routine which calculates DoubletPerformance, for more detail on the simulation inspect the Java source code
    doublet.calculateDoubletPerformance(-9999.0, thickness, transmissivity)

    if doublet.getUtcPeurctkWh() == -9999.0:  # If calculation was not successful, return all 0.0
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0

    # calculate net-present-value using the utc-cutoffs
    if depth > input_params["utc_cutoff_depth"]:
        utc_cut = input_params["utc_cutoff_deep"]
    else:
        utc_cut = input_params["utc_cutoff_shallow"]

    hprod = doublet.economicalData.getDiscountedHeatProducedP()
    npv = 1e-6 * (utc_cut - doublet.getUtcPeurctkWh()) * 3.6 * hprod * (1 - doublet.economicalData.getTaxRate())

    # get values from doublet
    output_values = {"power": doublet.doubletCalc1DData.getHpP(),
                     "heat_pump_power": doublet.doubletCalc1DData.getHpPHeatPump(),
                     "capex": doublet.economicalData.getSumcapex(),
                     "opex": doublet.economicalData.getOpexFirstProdYear(),
                     "utc": doublet.getUtcPeurctkWh(),
                     "npv": npv,
                     "hprod": hprod,
                     "cop": doublet.doubletCalc1DData.getCop(),
                     "cophp": doublet.doubletCalc1DData.getCopHpP(),
                     "pres": doublet.doubletCalc1DData.getPresP() / 1e5,
                     "flow_rate": doublet.doubletCalc1DData.getFlowrate(),
                     "welld": doublet.doubletCalc1DData.getWellDistP(),
                     }

    # Reset doublet variables for next calculation
    doublet.setProjectVariables(False, 0.0)
    return output_values["power"], output_values["heat_pump_power"], output_values["capex"], output_values["opex"], output_values["utc"], output_values["npv"], output_values["hprod"], output_values["cop"], output_values[
        "cophp"], output_values["pres"], output_values["flow_rate"], output_values["welld"]


def set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, ntg, temperature, input_params):
    """
    For a single location sets the necessary data on the doublet class, to then run a doublet simulation
    :param doublet:
@@ -289,20 +311,22 @@ def set_doublet_parameters(doublet, transmissivity_with_ntg, depth, porosity, nt
    :param hp_minimum_injection_temperature:
    :return:
    """
    if not useStimulation or transmissivity_with_ntg > stimKhMax:
    if not input_params["use_stimulation"] or transmissivity_with_ntg > input_params["stimKhMax"]:
        doublet.setNoStimulation()

    doublet.doubletCalc1DData.setDepth(depth)
    doublet.doubletCalc1DData.setPorosity(porosity)
    doublet.doubletCalc1DData.setNtg(ntg)
    doublet.doubletCalc1DData.setSurfaceTemperature(surface_temperature)
    doublet.doubletCalc1DData.setSurfaceTemperature(input_params["surface_temperature"])
    doublet.doubletCalc1DData.setReservoirTemp(temperature)
    doublet.doubletCalc1DData.setUseHeatPump(use_heat_pump)
    doublet.doubletCalc1DData.setUseHeatPump(input_params["use_heat_pump"])

    if use_heat_pump:
        injectionTemp = np.max([temperature - max_cooling_temperature_range, hp_minimum_injection_temperature])
    if not input_params["use_heat_pump"]:
        injectionTemp = np.max([temperature - input_params["max_cooling_temperature_range"], input_params["return_temperature"]])
    elif input_params["use_heat_pump"] and input_params["calculate_cop"] and not input_params["hp_application_mode"]:
        injectionTemp = doublet.calculateInjectionTempWithHeatPump(temperature, input_params["hp_direct_heat_input_temp"])
    else:
        injectionTemp = np.max([temperature - max_cooling_temperature_range, return_temperature])
        injectionTemp = np.max([temperature - input_params["max_cooling_temperature_range"], input_params["hp_minimum_injection_temperature"]])

    doublet.doubletCalc1DData.setInjectionTemp(injectionTemp)
    doublet.doubletCalc1DData.setDhReturnTemp(return_temperature)
    doublet.doubletCalc1DData.setDhReturnTemp(input_params["return_temperature"])
+2.31 KiB

File added.

No diff preview for this file type.

+2.31 KiB

File added.

No diff preview for this file type.

+2.31 KiB

File added.

No diff preview for this file type.

Loading