from pathlib import Path
from typing import Any, Iterable, Literal
import message_ix
import openpyxl as pxl
import pandas as pd
from scipy.optimize import curve_fit
from message_ix_models import Context, ScenarioInfo
from message_ix_models.util import load_package_data, nodes_ex_world, package_data_path
# Configuration files.
# Each tuple lists path parts (under the package data directory) of a YAML
# configuration file, without the ".yaml" suffix. read_config() loads each
# entry into the Context under the space-joined key, e.g. "material set".
METADATA = [
    # ("material", "config"),
    ("material", "set"),
    # ("material", "technology"),
]
def read_config() -> Context:
    """Read configuration from set.yaml.

    Returns
    -------
    message_ix_models.Context
        Context object holding information about MESSAGEix-Materials structure
    """
    # TODO this is similar to transport.utils.read_config; make a common
    # function so it doesn't need to be in this file.
    ctx = Context.get_instance(-1)

    # Configuration already loaded on a previous call — reuse it
    if "material set" in ctx:
        return ctx

    for parts in METADATA:
        # The context key is the space-joined tuple; the file name is the
        # last element of the tuple plus a ".yaml" suffix
        filename_parts = [*parts[:-1], parts[-1] + ".yaml"]
        ctx[" ".join(parts)] = load_package_data(*filename_parts)

    # Alias the main entry under a shorter key
    ctx["material"] = ctx["material set"]
    return ctx
def prepare_xlsx_for_explorer(filepath: str) -> None:
    """Post-processing to make timeseries comply with IIASA Scenario Explorer standard.

    Reads the file, drops rows with a missing "Region" value, prefixes short
    region codes with "R12_" and overwrites the file in place.

    Parameters
    ----------
    filepath
        Path to xlsx files generated with message_ix_models.report.legacy
    """
    df = pd.read_excel(filepath)

    # Renamed parameter: the original shadowed the builtin ``str``
    def add_r12(region: str) -> str:
        # Bare codes like "AFR" get the prefix; names of 5+ characters
        # (e.g. "R12_AFR", "World") are assumed already well-formed
        return "R12_" + region if len(region) < 5 else region

    df = df[~df["Region"].isna()]
    df["Region"] = df["Region"].map(add_r12)
    df.to_excel(filepath, index=False)
def invert_dictionary(original_dict: dict[str, list[str]]) -> dict[str, list[str]]:
    """Create inverted dictionary from existing dictionary, where values turn
    into keys and vice versa.

    Every element of every value list becomes a key of the result; its value
    lists all keys of *original_dict* under which the element appeared.

    Parameters
    ----------
    original_dict: dict
        dictionary with values of list type

    Returns
    -------
    dict
    """
    inverted: dict[str, list[str]] = {}
    for key, elements in original_dict.items():
        for element in elements:
            # setdefault creates the bucket on first sight of the element
            inverted.setdefault(element, []).append(key)
    return inverted
def remove_from_list_if_exists(element: Any, _list: list) -> None:
    """Utility function removing element from list if it is part of the list.

    The first occurrence is removed in place; a missing element is a no-op.

    Parameters
    ----------
    element
        element to remove
    _list
        list that potentially contains element
    """
    # EAFP: attempt the removal and ignore the "not found" case
    try:
        _list.remove(element)
    except ValueError:
        pass
[docs]
def exponential(x: float | list[float], b: float, m: float) -> float:
"""Emulates Excels GROWTH function
Parameters
----------
x:
domain of function
b
function parameter b
m
function parameter m
Returns
-------
float | list[float]
function value for given b, m and x
"""
return b * m**x
def price_fit(df: pd.DataFrame) -> float:
    """``price_ref`` parameter estimation emulation in MACRO calibration files.

    If all 2025 solution values exceed 0.5, the first one is used directly;
    otherwise an exponential curve is fitted and evaluated at 2020.

    Parameters
    ----------
    df
        DataFrame with required columns: "year" and "lvl"

    Returns
    -------
    float
        estimated value for price_ref in 2020
    """
    lvl_2025 = df.loc[df["year"] == 2025, "lvl"]
    if not lvl_2025.empty and lvl_2025.gt(0.5).all():
        # Use the 2025 solution value directly, rounded to 3 decimals
        return lvl_2025.values[0].round(3)
    popt = curve_fit(exponential, df.year, df.lvl, maxfev=10000)[0]
    return exponential([2020], *popt)[0]
def cost_fit(df: pd.DataFrame) -> float:
    """Python implementation of cost_ref parameter estimation.

    Originally implemented in Excel files for MACRO calibration. An
    exponential curve is fitted through (year, lvl) and evaluated at 2020;
    if the fit does not converge, the first observed value is used instead.

    Parameters
    ----------
    df: pd.DataFrame
        DataFrame with required columns: "year" and "lvl"

    Returns
    -------
    float
        estimated value for cost_ref in 2020
    """
    try:
        popt = curve_fit(exponential, df.year, df.lvl, maxfev=5000)[0]
        value = exponential([2020], *popt)[0]
    except RuntimeError:
        # Fit failed to converge — fall back to the first solution value
        value = df.lvl.values[0]
    # Scaled by 1/1000 — presumably a unit conversion expected by the
    # calibration workbook; TODO confirm against the MACRO files
    return value / 1000
def update_macro_calib_file(
    scenario: message_ix.Scenario, fname: str, extrapolate: bool = True
) -> None:
    """Function to automate manual steps in MACRO calibration.

    Tries to open a xlsx file with the given "fname" and writes ``cost_ref`` and
    ``price_ref`` values derived from scenario variables ``COST_NODAL_NET`` and
    ``PRICE_COMMODITY`` to the respective xlsx sheets.

    Parameters
    ----------
    scenario
        Scenario instance to be calibrated
    fname
        file name of MACRO file used for calibration
    extrapolate
        if :obj:`True` (default), estimate ``cost_ref`` with :func:`cost_fit`;
        otherwise copy the (first model year + 5) solution values directly
    """
    # Change this according to the relevant data path
    path = package_data_path("material", "macro", fname)
    wb = pxl.load_workbook(path)
    fmy = scenario.firstmodelyear
    nodes = [
        "R12_AFR",
        "R12_CHN",
        "R12_EEU",
        "R12_FSU",
        "R12_LAM",
        "R12_MEA",
        "R12_NAM",
        "R12_PAO",
        "R12_PAS",
        "R12_RCPA",
        "R12_SAS",
        "R12_WEU",
    ]

    # cost_ref: first three model periods
    years_cost = list(range(fmy, fmy + 15, 5))
    df = scenario.var(
        "COST_NODAL_NET",
        filters={"year": years_cost, "node": nodes_ex_world(ScenarioInfo(scenario).N)},
    )
    ws = wb["cost_ref"]
    if extrapolate:
        # Categorical fixes the row order to the ``nodes`` list above
        df["node"] = pd.Categorical(df["node"], nodes)
        fitted = df[df["year"].isin(years_cost)].groupby(["node"]).apply(cost_fit)
        for i in range(2, 14):
            ws[f"A{i}"].value = nodes[i - 2]
            ws[f"B{i}"].value = fitted.values[i - 2]
    else:
        sel = df[df["year"] == fmy + 5]
        vals = sel["lvl"].values
        # BUG FIX: the original rebound ``nodes`` here (breaking the
        # pd.Categorical call in the price_ref section below) and wrote the
        # entire node array into every cell of column A instead of one label
        # per row. Use a local name and index it per row.
        node_labels = sel["node"].values
        for i in range(2, 14):
            ws[f"A{i}"].value = node_labels[i - 2]
            ws[f"B{i}"].value = (vals[i - 2] / 1000).round(3)

    # price_ref: one fitted value per (node, commodity) pair
    comms = ["i_feed", "i_spec", "i_therm", "rc_spec", "rc_therm", "transport"]
    years_price = list(range(fmy, 2055, 5))
    df = scenario.var(
        "PRICE_COMMODITY", filters={"commodity": comms, "year": years_price}
    )
    # Categoricals fix the row order: nodes as listed above, commodities
    # in ``comms`` order — matching the workbook layout
    df["node"] = pd.Categorical(df["node"], nodes)
    df["commodity"] = pd.Categorical(df["commodity"], comms)
    df = df.groupby(["node", "commodity"]).apply(price_fit)
    ws = wb["price_ref"]
    for i in range(2, 62):
        ws[f"A{i}"].value = df.index.get_level_values(0).values[i - 2]
        ws[f"B{i}"].value = df.index.get_level_values(1).values[i - 2]
        ws[f"C{i}"].value = df.values[i - 2]

    # demand_ref: copy first-model-year demand values
    df = scenario.par("demand", filters={"commodity": comms, "year": fmy})
    ws = wb["demand_ref"]
    for i in range(2, 62):
        ws[f"A{i}"].value = df.node.values[i - 2]
        ws[f"B{i}"].value = df.commodity.values[i - 2]
        ws[f"C{i}"].value = df.value.values[i - 2]

    # gdp_calibrate: GDP trajectory stored as a bound on the "GDP" technology
    ws = wb["gdp_calibrate"]
    gdp = scenario.par("bound_activity_up", filters={"technology": "GDP"})
    gdp = gdp[gdp["year_act"] >= 2015].sort_values(["node_loc", "year_act"])
    for i in range(2, len(gdp.index) + 2):
        # NOTE(review): column A receives the year and column B the node —
        # the reverse of the other sheets; confirm against the workbook layout
        ws[f"A{i}"].value = gdp.year_act.values[i - 2]
        ws[f"B{i}"].value = gdp.node_loc.values[i - 2]
        ws[f"C{i}"].value = gdp.value.values[i - 2]
    wb.save(path)
def get_ssp_from_context(
    context: Context,
) -> Literal["SSP1", "SSP2", "SSP3", "SSP4", "SSP5", "LED"]:
    """Get selected SSP from context.

    Falls back to "SSP2" when no "ssp" key has been set.

    Parameters
    ----------
    context

    Returns
    -------
    SSP label
    """
    if "ssp" in context:
        return context["ssp"]
    return "SSP2"
def maybe_remove_water_tec(scenario: message_ix.Scenario, results: dict) -> None:
    """Helper to rename deprecated water supply technology name.

    Parameters
    ----------
    scenario
        scenario checked for "output" entries of the legacy
        "extract_surfacewater" technology
    results
        parameter data dict; its "input" frame is updated when the legacy
        technology is present
    """
    legacy_rows = scenario.par(
        "output", filters={"technology": "extract_surfacewater"}
    )
    # Any rows present means the scenario still uses the old naming
    if len(legacy_rows):
        results["input"] = results["input"].replace(
            {"freshwater_supply": "freshwater"}
        )
def path_fallback(context_or_regions: Context | str, *parts) -> Path:
    """Return a :class:`.Path` constructed from `parts`.

    If ``context.model.regions`` (or a string value as the first argument) is defined
    and the file exists in a subdirectory of :file:`data/transport/{regions}/`, return
    its path; otherwise, return the path in :file:`data/transport/`.
    """
    if isinstance(context_or_regions, str):
        regions = context_or_regions
    else:
        # Use a value from a Context object, or a default
        regions = context_or_regions.model.regions

    # Prefer the region-specific location; fall back to the generic one
    candidates = (
        package_data_path("material", regions, *parts),
        package_data_path("material", *parts),
    )
    found = next((c for c in candidates if c.exists()), None)
    if found is None:
        raise FileNotFoundError(candidates)
    return found
def add_region_column(
    df: pd.DataFrame, parts: Iterable[str], iso_column: str = "COUNTRY"
) -> pd.Series:
    """Convenience function to add R12 region column to dataframe.

    Loads the node configuration named by *parts*, inverts its
    region-to-children mapping into an ISO-to-region lookup, and maps the
    *iso_column* values of *df* through it.
    """
    node_config = load_package_data(*parts)
    # Drop the aggregate "World" entry, if present, before inverting
    node_config.pop("World", None)
    region_to_children = {region: info["child"] for region, info in node_config.items()}
    iso_to_region = {
        iso: regions[0]
        for iso, regions in invert_dictionary(region_to_children).items()
    }
    return df[iso_column].map(iso_to_region)