TNO Intern

Commit d328828b authored by Florian Knappers's avatar Florian Knappers
Browse files

progress. omg this is chaos

parent ffb756cd
Loading
Loading
Loading
Loading
+56 −2
Original line number Diff line number Diff line
from dataclasses import dataclass

from pythermogis.workflow.utc.utc_properties import UTCConfiguration
from pythermogis.workflow.utc.doublet_utils import calculate_injection_temp_with_heat_pump
from pythermogis.workflow.utc.pressure import calculate_max_pressure
from pythermogis.workflow.utc.doublet_utils import optimize_well_distance, calculate_injection_temp_with_heat_pump, calculate_cooling_temperature
from pythermogis.workflow.utc.pressure import calculate_max_pressure, optimize_pressure



@dataclass
@@ -78,6 +79,59 @@ def calculate_doublet_performance(props: UTCConfiguration, input: DoubletInput)
    if drawdown_pressure == 0:
        return None

    if props.optim_well_dist:
        end_temperature_p = (
                props.optim_dist_cooling_fraction *
                (input.temperature - injection_temperature)
        )
    else:
        end_temperature_p = calculate_cooling_temperature(
            props,
            input,
            drawdown_pressure,
            well_distance,
            injection_temperature,
        )

    if props.optim_well_dist:
        well_distance = optimize_well_distance(
            props,
            input,
            drawdown_pressure,
            injection_temperature,
        )

    stimulation_capex = (
        0.0
        if (not props.use_stimulation or input.transmissivity_with_ntg > props.stim_kh_max)
        else props.stimulation_capex
    )

    pressure_results = optimize_pressure(
        props=props,
        input=input,
        drawdown_pressure=drawdown_pressure,
        well_distance=well_distance,
        injection_temp=injection_temperature,
        stimulation_capex=stimulation_capex,
    )

    if pressure_results is None:
        return None

    heat_power_per_doublet = pressure_results.heat_power_per_doublet
    flowrate = pressure_results.flowrate
    discounted_heat_produced_p = pressure_results.discounted_heat_produced_p
    variable_opex = pressure_results.variable_opex
    fixed_opex = pressure_results.fixed_opex
    total_opex_ts = pressure_results.total_opex_ts
    cop = pressure_results.cop
    drawdown_pressure = pressure_results.drawdown_pressure  # overwrite like Java
    sum_capex = pressure_results.sum_capex
    utc = pressure_results.utc
    hp_added_power = pressure_results.hp_added_power
    production_temp = pressure_results.production_temp

    return DoubletOutput(
        power=0.0,
        hppower=0.0,
+199 −1
Original line number Diff line number Diff line
import math
from numba import njit
from pythermogis.workflow.utc.flow import calculate_volumetric_flow
from pythermogis.workflow.utc.water import density, heat_capacity, get_salinity, get_hydrostatic_pressure

@njit
def calculate_injection_temp_with_heat_pump(
@@ -56,3 +59,198 @@ def get_cop_carnot(eta: float, Tout: float, Tin: float) -> float:
    Tevap = Tin - DHP

    return eta * (Tcond + TKELVIN) / (Tcond - Tevap)


def calculate_cooling_temperature(
    props,
    input,
    drawdown_pressure: float,
    well_distance: float,
    injection_temp: float
) -> float:

    results = calculate_volumetric_flow(
        props=props,
        input_data=input,
        original_pressure=drawdown_pressure,
        well_distance=well_distance,
        injection_temp=injection_temp,
    )

    flowrate = min(results.flowrate, props.max_flow)

    cooling_frac_min = 0.0
    cooling_frac_max = 0.5
    cooling_frac = 0.5 * (cooling_frac_min + cooling_frac_max)

    for _ in range(1000):
        if abs(cooling_frac_max - cooling_frac_min) <= 0.001:
            break

        cooling_frac = 0.5 * (cooling_frac_min + cooling_frac_max)

        lifetime = calc_lifetime(
            well_distance=well_distance,
            thickness=input.thickness * input.ntg,
            delta_temp_fraction=cooling_frac,
            porosity=input.porosity,
            flowrate=flowrate,
            depth=input.depth,
            reservoir_temp=input.temperature,
            salinity_surface=props.salinity_surface,
            salinity_gradient=props.salinity_gradient,
            cp_rock=props.optim_dist_cp_rock,
            rho_rock=props.optim_dist_rho_rock,
        )

        if lifetime < props.optim_dist_lifetime:
            cooling_frac_min = cooling_frac
        else:
            cooling_frac_max = cooling_frac

    if (not props.is_ates) and cooling_frac > 0.3:
        raise RuntimeError(
            f"Large cooling factor ({cooling_frac}) could result in less reliable calculation"
        )

    return cooling_frac * (input.temperature - injection_temp)


def calc_lifetime(
    well_distance: float,
    thickness: float,
    delta_temp_fraction: float,
    porosity: float,
    flowrate: float,
    depth: float,
    reservoir_temp: float,
    salinity_surface: float,
    salinity_gradient: float,
    cp_rock: float,
    rho_rock: float,
) -> float:
    """
    Python equivalent of DoubletUtils.calcLifetime(...)
    """

    # --- Water properties at depth ---
    salinity = get_salinity(salinity_surface, salinity_gradient, depth)
    pressure = get_hydrostatic_pressure(depth)
    rho_water = density(pressure, reservoir_temp, salinity)
    cp_water = heat_capacity(pressure, reservoir_temp, salinity)

    dangle = 0.05 * math.pi / 180.0  # radians
    halfdist = well_distance * 0.5

    angleseg = 0.5 * (2 * math.pi * delta_temp_fraction) + 0.5 * dangle
    radius = halfdist / math.sin(angleseg)

    Aseg1 = 4 * (
        0.5 * angleseg * radius**2
        - 0.5 * radius**2 * math.sin(angleseg) * math.cos(angleseg)
    )

    angleseg = 0.5 * (2 * math.pi * delta_temp_fraction) - 0.5 * dangle
    radius = halfdist / math.sin(angleseg)

    Aseg2 = 4 * (
        0.5 * angleseg * radius**2
        - 0.5 * radius**2 * math.sin(angleseg) * math.cos(angleseg)
    )

    Aseg = Aseg1 - Aseg2
    Vseg = Aseg * thickness

    Eseg = 1e-9 * Vseg * (
        (1 - porosity) * cp_rock * rho_rock +
        porosity * cp_water * rho_water
    )

    flowseg = flowrate * 365 * 24 * dangle / math.pi
    Eflowseg = 1e-9 * flowseg * cp_water * rho_water

    return Eseg / Eflowseg


def optimize_well_distance(
    props,
    input,
    drawdown_pressure: float,
    injection_temp: float,
) -> float:
    """
    Python equivalent of WellDistanceOptimizer.optimize(...)
    """

    dist_min = props.optim_dist_well_dist_min
    dist_max = props.optim_dist_well_dist_max
    dist = 0.5 * (dist_min + dist_max)

    for iter_count in range(1000):
        if abs(dist_max - dist_min) <= 10.0:
            break

        dist = 0.5 * (dist_min + dist_max)

        # --- Compute flow for this distance ---
        results = calculate_volumetric_flow(
            props=props,
            input_data=input,
            original_pressure=drawdown_pressure,
            well_distance=dist,
            injection_temp=injection_temp,
        )

        flowrate = min(results.flowrate, props.max_flow)

        # --- Compute lifetime for this distance ---
        lifetime = calc_lifetime(
            well_distance=dist,
            thickness=input.thickness * input.ntg,
            delta_temp_fraction=props.optim_dist_cooling_fraction,
            porosity=input.porosity,
            flowrate=flowrate,
            depth=input.depth,
            reservoir_temp=input.temperature,
            salinity_surface=props.salinity_surface,
            salinity_gradient=props.salinity_gradient,
            cp_rock=props.optim_dist_cp_rock,
            rho_rock=props.optim_dist_rho_rock,
        )

        # --- Bisection rule ---
        if lifetime < props.optim_dist_lifetime:
            dist_min = dist
        else:
            dist_max = dist

    # If no convergence in 1000 iterations
    else:
        print(f"WARNING: Well distance optimization failed to converge. Final dist={dist}")

    return dist


def get_along_hole_length(
    true_vertical_depth: float,
    well_distance: float,
    curve_scaling_factor: float,
    max_true_vertical_depth_stepout_factor: float,
) -> float:

    if curve_scaling_factor == 0:
        return true_vertical_depth

    stepout = 0.5 * well_distance

    max_stepout = true_vertical_depth * max_true_vertical_depth_stepout_factor

    horizontal_distance = stepout - max_stepout
    if horizontal_distance < 0:
        horizontal_distance = 0.0

    stepout -= horizontal_distance

    oblique_distance = math.sqrt(stepout**2 + true_vertical_depth**2) * curve_scaling_factor

    return oblique_distance + horizontal_distance
 No newline at end of file
+405 −0
Original line number Diff line number Diff line
import numpy as np

from dataclasses import dataclass
from pythermogis.workflow.utc.doublet_utils import get_along_hole_length

SECONDS_IN_YEAR = 365 * 24 * 3600
MJ_TO_GJ = 1e-3
KWH_TO_MJ = 0.36 * 1e9
HOURS_IN_YEAR = 8760

@dataclass
class CapexCalculatorResults:
    sum_capex: float
    total_capex: float
    variable_opex: list[float]
    fixed_opex: list[float]
    total_opex_ts: list[float]
    heat_power_per_year: list[float]

@dataclass
class UTCCalculatorResults:
    discounted_heat_produced: float
    utc: float

@dataclass
class EconomicsResults:
    capex: CapexCalculatorResults
    utc: UTCCalculatorResults

def calculate_economics(
    props,
    input,
    well_distance: float,
    heat_power_produced: list[float],
    stimulation_capex: float,
    hp_cop: float,
    cop_hp_years: list[float] | None,
    season_factor_years: list[float] | None,
    hp_added_power_years: list[float] | None,
    hp_added_power: float,
    pump_power_required: float,
) -> EconomicsResults:


    ah_length_array = [
        get_along_hole_length(
            true_vertical_depth=input.depth,
            well_distance=well_distance,
            curve_scaling_factor=props.well_curv_scaling,
            max_true_vertical_depth_stepout_factor=props.max_tvd_stepout_factor,
        )
    ]

    capex_results = calculate_capex(
        props=props,
        heat_power_produced=heat_power_produced,
        stimulation_capex=stimulation_capex,
        hp_cop=hp_cop,
        hp_cop_years=cop_hp_years,
        season_factor_years=season_factor_years,
        hp_added_power_years=hp_added_power_years,
        initial_hp_added_power=hp_added_power,
        ah_length_array=ah_length_array,
        pump_power_required=pump_power_required,
    )

    utc_results = calculate_utc(
        props=props,
        heat_power_per_year=capex_results.heat_power_per_year,
        total_opex_ts=capex_results.total_opex_ts,
        total_capex=capex_results.total_capex,
    )

    return EconomicsResults(
        capex=capex_results,
        utc=utc_results,
    )

def calculate_capex(
    props,
    heat_power_produced: list[float],
    stimulation_capex: float,
    hp_cop: float,
    hp_cop_years: list[float] | None,
    season_factor_years: list[float] | None,
    hp_added_power_years: list[float] | None,
    initial_hp_added_power: float,
    ah_length_array: list[float],
    pump_power_required: float,
) -> CapexCalculatorResults:

    n_years = props.economic_lifetime

    heat_power_per_year = np.zeros(n_years)
    variable_opex = np.zeros(n_years)
    fixed_opex = np.zeros(n_years)
    total_opex_ts = np.zeros(n_years)
    total_capex_ts = np.zeros(n_years)

    shifted_heat_power = heat_power_produced.copy()
    shift_time_series(shifted_heat_power, props.drilling_time)

    capex_well = calculate_capex_for_wells(props, ah_length_array, stimulation_capex)
    allocate_capex_for_wells(props, total_capex_ts, capex_well)

    hp_added_power = get_max_hp_added_power(initial_hp_added_power, hp_added_power_years)
    hp_after_hp = calculate_hp_added_power_after_heat_pump(props, hp_added_power, hp_cop)
    installation_mw = calculate_installation_mw(shifted_heat_power, hp_added_power)

    total_capex_1year = calculate_total_capex_1year(
        props, total_capex_ts, hp_after_hp, installation_mw
    )

    sum_capex = process_annual_data(
        props,
        shifted_heat_power,
        total_capex_ts,
        heat_power_per_year,
        variable_opex,
        fixed_opex,
        total_opex_ts,
        total_capex_1year,
        installation_mw,
        hp_after_hp,
        hp_cop,
        hp_cop_years,
        season_factor_years,
        pump_power_required,
    )

    total_capex_sum = float(np.sum(total_capex_ts))

    return CapexCalculatorResults(
        sum_capex=float(sum_capex),
        total_capex=float(total_capex_sum),
        variable_opex=list(variable_opex),
        fixed_opex=list(fixed_opex),
        total_opex_ts=list(total_opex_ts),
        heat_power_per_year=list(heat_power_per_year),
    )


def shift_time_series(series: list[float], shift: int):
    n = len(series)
    if shift <= 0 or shift >= n:
        for i in range(n):
            series[i] = 0
    else:
        series[shift:] = series[: n - shift]
        for i in range(shift):
            series[i] = 0

def calculate_capex_for_wells(props, ah_length_array, stimulation_capex):
    inj_depth = ah_length_array[0]
    prod_depth = ah_length_array[0]

    capex = (
        calculate_well_cost(props, inj_depth)
        + calculate_well_cost(props, prod_depth)
        + stimulation_capex
    )
    return capex * props.well_cost_scaling

def calculate_well_cost(props, depth):
    return (
        props.well_cost_z2 * depth**2
        + props.well_cost_z * depth
    ) * 1e-6 + props.well_cost_const

def allocate_capex_for_wells(props, total_capex_ts, capex_well):
    drilling_years = props.drilling_time
    yearly_cost = capex_well / max(drilling_years, 1)

    for y in range(drilling_years):
        total_capex_ts[y] += yearly_cost

def get_max_hp_added_power(initial, hp_power_years):
    if hp_power_years is not None:
        return max(hp_power_years)
    return initial

def calculate_hp_added_power_after_heat_pump(props, hp_added_power, hp_cop):
    alt_price = props.hp_alternative_heating_price
    return hp_added_power if alt_price < 0 else hp_added_power * (1 + 1 / (hp_cop - 1))

def calculate_installation_mw(heat_power_produced, hp_added_power):
    last_val = heat_power_produced[-1]
    return max(last_val - hp_added_power, 0)

def calculate_total_capex_1year(props, total_capex_ts, hp_after_hp, installation_mw):
    last_year = max(props.drilling_time - 1, 0)

    total_capex_ts[last_year] += (
        props.capex_const
        + (props.hp_capex * hp_after_hp + props.capex_variable * installation_mw) * 1e-3
    )

    total_capex_ts[last_year] *= (1 + props.capex_contingency / 100)

    return total_capex_ts[last_year]

def process_annual_data(
    props,
    heat_power_produced,
    total_capex_ts,
    heat_power_per_year,
    variable_opex,
    fixed_opex,
    total_opex_ts,
    total_capex_1year,
    installation_mw,
    hp_after_hp,
    hp_cop,
    hp_cop_years,
    season_factor_years,
    pump_power_required,
):

    sum_capex = 0.0

    for year in range(props.economic_lifetime):

        # Season factor
        season_factor = (
            season_factor_years[year]
            if season_factor_years is not None
            else get_heat_exchanger_season_factor(props)
        )

        # COP for that year
        year_hp_cop = hp_cop_years[year] if hp_cop_years is not None else hp_cop

        # Annual heat (GJ/yr)
        heat_power_per_year[year] = (
            heat_power_produced[year]
            * season_factor
            * SECONDS_IN_YEAR
            * MJ_TO_GJ
        )

        sum_capex += total_capex_ts[year]
        total_opex_ts[year] = 0

        # Skip drilling + negative production years
        if year < props.drilling_time or heat_power_produced[year] < 0:
            continue

        # Variable OPEX
        variable_opex[year] = calculate_variable_opex(
            props,
            season_factor,
            props.elec_purchase_price,
            pump_power_required,
            heat_power_produced[year],
            props.heat_exchanger_parasitic,
            heat_power_per_year[year],
            year_hp_cop,
            hp_after_hp,
        )

        # Fixed OPEX
        fixed_opex[year] = calculate_fixed_opex(
            props,
            hp_after_hp,
            installation_mw,
            total_capex_1year,
        )

        total_opex_ts[year] = variable_opex[year] + fixed_opex[year]

    return sum_capex

def calculate_variable_opex(
    props,
    season_factor,
    elec_price,
    pump_power,
    heat_power_produced,
    heat_exchanger_parasitic,
    heat_power_gj_per_year,
    hp_cop,
    hp_after_hp,
):
    pump_cost = (
        -(pump_power * 1e3)
        * SECONDS_IN_YEAR * season_factor * elec_price
        / KWH_TO_MJ * 1e-6
    )

    parasitic_cost = (
        -(heat_power_produced * heat_exchanger_parasitic * 1e6)
        * SECONDS_IN_YEAR * season_factor * elec_price
        / KWH_TO_MJ * 1e-6
    )

    opex_per_energy = (
        -(1 / 0.36)
        * props.opex_per_energy * 1e-2
        * heat_power_gj_per_year * 1e4 / 36 * 1e-6
    )

    heat_pump_cost = 0.0
    if hp_cop != 0:
        heat_pump_cost = (
            -(hp_after_hp * 1e6 / hp_cop)
            * SECONDS_IN_YEAR * season_factor * elec_price
            / KWH_TO_MJ * 1e-6
        )

    return pump_cost + parasitic_cost + heat_pump_cost + opex_per_energy

def calculate_fixed_opex(props, hp_after_hp, installation_mw, total_capex):
    return (
        -props.opex_base / 1e6
        - (props.hp_opex * hp_after_hp + props.opex_per_power * installation_mw) * 1e-3
        - total_capex * props.opex_per_capex / 100
    )

def get_heat_exchanger_season_factor(props):
    return props.load_hours / HOURS_IN_YEAR


def calculate_utc(
    props,
    heat_power_per_year,
    total_opex_ts,
    total_capex,
) -> UTCCalculatorResults:

    net_cash_worth_eia = calculate_net_cash_worth_eia(props, total_capex)
    present_value = total_capex * props.debt_equity - net_cash_worth_eia
    yearly_payment = calculate_payment_value(props, present_value)
    depreciation_cost = calculate_depreciation_cost(props, total_capex)

    discounted_income = 0.0
    discounted_heat_produced = 0.0

    for year in range(1, props.economic_lifetime):

        inflated_opex = total_opex_ts[year] * ((1 + props.inflation) ** (year - 1))

        future_value = calculate_future_value(
            present_value, yearly_payment, props.interest_loan, year
        )
        interest_payment = -(future_value * props.interest_loan)

        taxable_expenses = inflated_opex + depreciation_cost + interest_payment
        tax_deduction = -(taxable_expenses * props.tax_rate)

        net_income = inflated_opex + yearly_payment + tax_deduction

        yearly_energy = heat_power_per_year[year] * (1 - props.tax_rate)

        discounted_income += net_income / ((1 + props.equity_return) ** year)
        discounted_heat_produced += yearly_energy / ((1 + props.equity_return) ** year)

    utc = calculate_unit_technical_cost(
        props, total_capex, discounted_income, discounted_heat_produced
    )

    if discounted_heat_produced < 0 or utc < 0:
        raise ValueError(
            f"discountedHeatProduced and unitTechnicalCost must be positive. "
            f"Values were: {discounted_heat_produced}, {utc}"
        )

    return UTCCalculatorResults(
        discounted_heat_produced=discounted_heat_produced,
        utc=utc,
    )

def calculate_net_cash_worth_eia(props, total_capex):
    tax_rate = props.tax_rate
    project_interest_rate = calculate_project_interest_rate(props)
    return (tax_rate * (props.ecn_eia * total_capex)) / (1 + project_interest_rate)

def calculate_payment_value(props, present_value):
    if props.interest_loan == 0:
        return 0.0

    factor = (1 + props.interest_loan) ** props.economic_lifetime
    return (props.interest_loan / (factor - 1)) * (-(present_value * factor))

def calculate_depreciation_cost(props, total_capex):
    return -(total_capex / props.economic_lifetime)

def calculate_future_value(present_value, payment, interest, year):
    future_value = present_value * ((1 + interest) ** (year - 1))
    if payment != 0:
        future_value += payment * (((1 + interest) ** (year - 1)) - 1) / interest
    return future_value

def calculate_unit_technical_cost(props, total_capex, discounted_income, discounted_heat_produced):
    if discounted_heat_produced <= 0:
        return 0.0
    return (
        (total_capex * (1 - props.debt_equity) - discounted_income)
        / discounted_heat_produced
    ) * 1e6

def calculate_project_interest_rate(props):
    return (
        (1 - props.tax_rate) * props.interest_loan * props.debt_equity +
        (1 - props.debt_equity) * props.equity_return
    )
+219 −0

File changed.

Preview size limit exceeded, changes collapsed.