"""Source code for :mod:`message_ix_models.model.material.util`."""

from pathlib import Path
from typing import Any, Iterable, Literal

import message_ix
import openpyxl as pxl
import pandas as pd
from scipy.optimize import curve_fit

from message_ix_models import Context, ScenarioInfo
from message_ix_models.util import load_package_data, nodes_ex_world, package_data_path

# Configuration files: tuples of package-data path parts. read_config() appends
# ".yaml" to the last element and loads the file via load_package_data().
METADATA = [
    # ("material", "config"),
    ("material", "set"),
    # ("material", "technology"),
]


def read_config() -> Context:
    """Read configuration from set.yaml.

    Returns
    -------
    message_ix_models.Context
        Context object holding information about MESSAGEix-Materials structure
    """
    # TODO this is similar to transport.utils.read_config; make a common
    # function so it doesn't need to be in this file.
    context = Context.get_instance(-1)

    # Configuration already loaded on a previous call; nothing to do
    if "material set" in context:
        return context

    # Load each material configuration file listed in METADATA
    for parts in METADATA:
        # Key for storing in the context, e.g. "material set"
        key = " ".join(parts)

        # Actual filename parts; the last part gains a ".yaml" suffix
        _parts = list(parts)
        _parts[-1] += ".yaml"

        context[key] = load_package_data(*_parts)

    # Alias under a shorter name
    context["material"] = context["material set"]

    return context
def prepare_xlsx_for_explorer(filepath: str) -> None:
    """Post-processing to make timeseries comply with IIASA Scenario Explorer standard.

    Parameters
    ----------
    filepath
        Path to xlsx files generated with message_ix_models.report.legacy
    """
    df = pd.read_excel(filepath)

    # Renamed parameter from "str" to "name": the original shadowed the builtin
    # `str` type inside the helper.
    def add_R12(name: str) -> str:
        # Labels shorter than 5 characters lack the "R12_" region prefix
        return "R12_" + name if len(name) < 5 else name

    # Drop rows without a region, then normalize the region labels in place
    df = df[~df["Region"].isna()]
    df["Region"] = df["Region"].map(add_R12)
    df.to_excel(filepath, index=False)
def invert_dictionary(original_dict: dict[str, list[str]]) -> dict[str, list[str]]:
    """Create inverted dictionary from existing dictionary, where values turn into
    keys and vice versa.

    Parameters
    ----------
    original_dict: dict
        dictionary with values of list type

    Returns
    -------
    dict
        mapping from each list element back to the list of keys that contained it
    """
    inverted: dict[str, list[str]] = {}
    for key, elements in original_dict.items():
        for element in elements:
            # setdefault creates the bucket on first sight of this element
            inverted.setdefault(element, []).append(key)
    return inverted
def remove_from_list_if_exists(element: Any, _list: list) -> None:
    """Utility function removing element from list if it is part of the list.

    Parameters
    ----------
    element
        element to remove
    _list
        list that potentially contains element
    """
    # EAFP: attempt the removal and ignore the error raised when absent
    try:
        _list.remove(element)
    except ValueError:
        pass
[docs] def exponential(x: float | list[float], b: float, m: float) -> float: """Emulates Excels GROWTH function Parameters ---------- x: domain of function b function parameter b m function parameter m Returns ------- float | list[float] function value for given b, m and x """ return b * m**x
def price_fit(df: pd.DataFrame) -> float:
    """``price_ref`` parameter estimation emulation in MACRO calibration files.

    Parameters
    ----------
    df
        DataFrame with required columns: "year" and "lvl"

    Returns
    -------
    float
        estimated value for price_ref in 2020
    """
    # If a sufficiently large 2025 price exists, use it directly (rounded)
    if 2025 in df["year"].values:
        lvl_2025 = df[df["year"] == 2025]["lvl"]
        if lvl_2025.gt(0.5).all():
            return lvl_2025.values[0].round(3)

    # Otherwise fit an exponential growth curve and extrapolate back to 2020
    pars = curve_fit(exponential, df.year, df.lvl, maxfev=10000)[0]
    return exponential([2020], *pars)[0]
def cost_fit(df: pd.DataFrame) -> float:
    """Python implementation of cost_ref parameter estimation.

    Originally implemented in Excel files for MACRO calibration.

    Parameters
    ----------
    df: pd.DataFrame
        DataFrame with required columns: "year" and "lvl"

    Returns
    -------
    float
        estimated value for cost_ref in 2020
    """
    try:
        # Fit an exponential growth curve and evaluate it at 2020
        params, _ = curve_fit(exponential, df.year, df.lvl, maxfev=5000)
        estimate = exponential([2020], *params)[0]
    except RuntimeError:
        # curve_fit did not converge; fall back to the first observed level
        estimate = df.lvl.values[0]
    # print(df.node.unique(), estimate)
    return estimate / 1000
def update_macro_calib_file(
    scenario: message_ix.Scenario, fname: str, extrapolate: bool = True
) -> None:
    """Function to automate manual steps in MACRO calibration.

    Tries to open a xlsx file with the given "fname" and writes ``cost_ref`` and
    ``price_ref`` values derived from scenario variables ``COST_NODAL_NET`` and
    ``PRICE_COMMODITY`` to the respective xlsx sheets.

    Parameters
    ----------
    scenario
        Scenario instance to be calibrated
    fname
        file name of MACRO file used for calibration
    extrapolate
        if True, estimate 2020 cost_ref values by exponential fit over the first
        model periods; otherwise use the second model year values directly
    """
    # Change this according to the relevant data path
    path = package_data_path("material", "macro", fname)
    wb = pxl.load_workbook(path)
    fmy = scenario.firstmodelyear
    nodes = [
        "R12_AFR",
        "R12_CHN",
        "R12_EEU",
        "R12_FSU",
        "R12_LAM",
        "R12_MEA",
        "R12_NAM",
        "R12_PAO",
        "R12_PAS",
        "R12_RCPA",
        "R12_SAS",
        "R12_WEU",
    ]

    # cost_ref: first three model periods
    years_cost = [i for i in range(fmy, fmy + 15, 5)]
    df = scenario.var(
        "COST_NODAL_NET",
        filters={"year": years_cost, "node": nodes_ex_world(ScenarioInfo(scenario).N)},
    )
    if extrapolate:
        # Categorical keeps the fixed node ordering used by the worksheet rows
        df["node"] = pd.Categorical(df["node"], nodes)
        df = df[df["year"].isin(years_cost)].groupby(["node"]).apply(cost_fit)
        ws = wb["cost_ref"]
        for i in range(2, 14):
            ws[f"A{i}"].value = nodes[i - 2]
            ws[f"B{i}"].value = df.values[i - 2]
    else:
        vals = df[df["year"] == fmy + 5]["lvl"].values
        nodes = df["node"].values
        ws = wb["cost_ref"]
        for i in range(2, 14):
            # Bug fix: write the single node label for this row; previously the
            # whole `nodes` array was assigned to every cell in column A.
            ws[f"A{i}"].value = nodes[i - 2]
            ws[f"B{i}"].value = (vals[i - 2] / 1000).round(3)

    # price_ref: fit prices per (node, commodity) through 2050
    comms = ["i_feed", "i_spec", "i_therm", "rc_spec", "rc_therm", "transport"]
    years_price = [i for i in range(fmy, 2055, 5)]
    df = scenario.var(
        "PRICE_COMMODITY", filters={"commodity": comms, "year": years_price}
    )
    df["node"] = pd.Categorical(df["node"], nodes)
    df["commodity"] = pd.Categorical(df["commodity"], comms)
    df = df.groupby(["node", "commodity"]).apply(price_fit)
    ws = wb["price_ref"]
    for i in range(2, 62):
        ws[f"A{i}"].value = df.index.get_level_values(0).values[i - 2]
        ws[f"B{i}"].value = df.index.get_level_values(1).values[i - 2]
        ws[f"C{i}"].value = df.values[i - 2]

    # demand_ref: first-model-year demand per (node, commodity)
    df = scenario.par("demand", filters={"commodity": comms, "year": fmy})
    ws = wb["demand_ref"]
    for i in range(2, 62):
        ws[f"A{i}"].value = df.node.values[i - 2]
        ws[f"B{i}"].value = df.commodity.values[i - 2]
        ws[f"C{i}"].value = df.value.values[i - 2]

    # gdp_calibrate: GDP trajectory stored as bound_activity_up on "GDP" tec
    ws = wb["gdp_calibrate"]
    gdp = scenario.par("bound_activity_up", filters={"technology": "GDP"})
    gdp = gdp[gdp["year_act"] >= 2015].sort_values(["node_loc", "year_act"])
    for i in range(2, len(gdp.index) + 2):
        ws[f"A{i}"].value = gdp.year_act.values[i - 2]
        ws[f"B{i}"].value = gdp.node_loc.values[i - 2]
        ws[f"C{i}"].value = gdp.value.values[i - 2]

    wb.save(path)
def get_ssp_from_context(
    context: Context,
) -> Literal["SSP1", "SSP2", "SSP3", "SSP4", "SSP5", "LED"]:
    """Get selected SSP from context.

    Parameters
    ----------
    context

    Returns
    -------
    SSP label
    """
    # Default to SSP2 when the context carries no explicit "ssp" entry
    if "ssp" in context:
        return context["ssp"]
    return "SSP2"
def maybe_remove_water_tec(scenario: message_ix.Scenario, results: dict) -> None:
    """Helper to rename deprecated water supply technology name.

    Parameters
    ----------
    scenario
    results
    """
    surfacewater = scenario.par(
        "output", filters={"technology": "extract_surfacewater"}
    )
    # Non-empty output means the scenario uses the newer water structure, so the
    # old commodity label must be renamed in the "input" data.
    if len(surfacewater):
        results["input"] = results["input"].replace(
            {"freshwater_supply": "freshwater"}
        )
def path_fallback(context_or_regions: Context | str, *parts) -> Path:
    """Return a :class:`.Path` constructed from `parts`.

    If ``context.model.regions`` (or a string value as the first argument) is
    defined and the file exists in a subdirectory of
    :file:`data/transport/{regions}/`, return its path; otherwise, return the
    path in :file:`data/transport/`.
    """
    # Accept either a bare regions string or a Context carrying one
    regions = (
        context_or_regions
        if isinstance(context_or_regions, str)
        else context_or_regions.model.regions
    )

    # Prefer the region-specific location, then fall back to the shared one
    candidates = (
        package_data_path("material", regions, *parts),
        package_data_path("material", *parts),
    )
    for candidate in candidates:
        if candidate.exists():
            return candidate

    raise FileNotFoundError(candidates)
[docs] def add_region_column( df: pd.DataFrame, parts: Iterable[str], iso_column: str = "COUNTRY" ) -> pd.Series: """Convenience function to add R12 region column to dataframe.""" yaml_data = load_package_data(*parts) if "World" in yaml_data: yaml_data.pop("World") r12_map = {k: v["child"] for k, v in yaml_data.items()} r12_map_inv = {k: v[0] for k, v in invert_dictionary(r12_map).items()} return df[iso_column].map(r12_map_inv)