Files
esphome/tests/unit_tests/test_substitutions.py
2025-12-10 00:59:58 +01:00

387 lines
14 KiB
Python

import glob
import logging
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from esphome import config as config_module, yaml_util
from esphome.components import substitutions
from esphome.components.packages import do_packages_pass, merge_packages
from esphome.config import resolve_extend_remove
from esphome.config_helpers import merge_config
from esphome.const import CONF_SUBSTITUTIONS
from esphome.core import CORE
from esphome.util import OrderedDict
_LOGGER = logging.getLogger(__name__)
# Set to True for dev mode behavior
# This will generate the expected version of the test files.
DEV_MODE = False
def sort_dicts(obj):
"""Recursively sort dictionaries for order-insensitive comparison."""
if isinstance(obj, dict):
return {k: sort_dicts(obj[k]) for k in sorted(obj)}
if isinstance(obj, list):
# Lists are not sorted; we preserve order
return [sort_dicts(i) for i in obj]
return obj
def dict_diff(a, b, path=""):
"""Recursively find differences between two dict/list structures."""
diffs = []
if isinstance(a, dict) and isinstance(b, dict):
a_keys = set(a)
b_keys = set(b)
diffs.extend(f"{path}/{key} only in actual" for key in a_keys - b_keys)
diffs.extend(f"{path}/{key} only in expected" for key in b_keys - a_keys)
for key in a_keys & b_keys:
diffs.extend(dict_diff(a[key], b[key], f"{path}/{key}"))
elif isinstance(a, list) and isinstance(b, list):
min_len = min(len(a), len(b))
for i in range(min_len):
diffs.extend(dict_diff(a[i], b[i], f"{path}[{i}]"))
if len(a) > len(b):
diffs.extend(
f"{path}[{i}] only in actual: {a[i]!r}" for i in range(min_len, len(a))
)
elif len(b) > len(a):
diffs.extend(
f"{path}[{i}] only in expected: {b[i]!r}"
for i in range(min_len, len(b))
)
elif a != b:
diffs.append(f"\t{path}: actual={a!r} expected={b!r}")
return diffs
def write_yaml(path: Path, data: dict) -> None:
path.write_text(yaml_util.dump(data), encoding="utf-8")
def verify_database(value: Any, path: str = "") -> str | None:
if isinstance(value, list):
for i, v in enumerate(value):
result = verify_database(v, f"{path}[{i}]")
if result is not None:
return result
return None
if isinstance(value, dict):
for k, v in value.items():
if path == "" and k == CONF_SUBSTITUTIONS:
return None # ignore substitutions key at top level since it is merged.
key_result = verify_database(k, f"{path}/{k}")
if key_result is not None:
return key_result
value_result = verify_database(v, f"{path}/{k}")
if value_result is not None:
return value_result
return None
if isinstance(value, str):
if not isinstance(value, yaml_util.ESPHomeDataBase):
return f"{path}: {value!r} is not ESPHomeDataBase"
return None
return None
# Mapping of (url, ref) to local test repository path under fixtures/substitutions
REMOTES = {
("https://github.com/esphome/repo1", "main"): "remotes/repo1/main",
("https://github.com/esphome/repo2", "main"): "remotes/repo2/main",
}
# Collect all input YAML files for test_substitutions_fixtures parametrized tests:
HERE = Path(__file__).parent
BASE_DIR = HERE / "fixtures" / "substitutions"
SOURCES = sorted(glob.glob(str(BASE_DIR / "*.input.yaml")))
assert SOURCES, f"test_substitutions_fixtures: No input YAML files found in {BASE_DIR}"
@pytest.mark.parametrize(
"source_path",
[Path(p) for p in SOURCES],
ids=lambda p: p.name,
)
@patch("esphome.git.clone_or_update")
def test_substitutions_fixtures(
mock_clone_or_update: MagicMock, source_path: Path
) -> None:
def fake_clone_or_update(
*,
url: str,
ref: str | None = None,
refresh=None,
domain: str,
username: str | None = None,
password: str | None = None,
submodules: list[str] | None = None,
_recover_broken: bool = True,
) -> tuple[Path, None]:
path = REMOTES.get((url, ref))
if path is None:
path = REMOTES.get((url.rstrip(".git"), ref))
if path is None:
raise RuntimeError(
f"Cannot find test repository for {url} @ {ref}. Check the REMOTES mapping in test_substitutions.py"
)
return BASE_DIR / path, None
mock_clone_or_update.side_effect = fake_clone_or_update
expected_path = source_path.with_suffix("").with_suffix(".approved.yaml")
test_case = source_path.with_suffix("").stem
# Load using ESPHome's YAML loader
config = yaml_util.load_yaml(source_path)
command_line_substitutions = config.pop("command_line_substitutions", None)
config = do_packages_pass(config)
substitutions.do_substitution_pass(config, command_line_substitutions)
config = merge_packages(config)
resolve_extend_remove(config)
verify_database_result = verify_database(config)
if verify_database_result is not None:
raise AssertionError(verify_database_result)
# Also load expected using ESPHome's loader, or use {} if missing and DEV_MODE
if expected_path.is_file():
expected = yaml_util.load_yaml(expected_path)
elif DEV_MODE:
expected = {}
else:
assert expected_path.is_file(), f"Expected file missing: {expected_path}"
# Sort dicts only (not lists) for comparison
got_sorted = sort_dicts(config)
expected_sorted = sort_dicts(expected)
if got_sorted != expected_sorted:
diff = "\n".join(dict_diff(got_sorted, expected_sorted))
msg = (
f"Substitution result mismatch for {source_path.name}\n"
f"Diff:\n{diff}\n\n"
f"Got: {got_sorted}\n"
f"Expected: {expected_sorted}"
)
# Write out the received file when test fails
if DEV_MODE:
received_path = source_path.with_name(f"{test_case}.received.yaml")
write_yaml(received_path, config)
msg += f"\nWrote received file to {received_path}."
raise AssertionError(msg)
if DEV_MODE:
_LOGGER.error("Tests passed, but Dev mode is enabled.")
assert (
not DEV_MODE # make sure DEV_MODE is disabled after you are finished.
), (
"Test passed but DEV_MODE must be disabled when running tests. Please set DEV_MODE=False."
)
def test_substitutions_with_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions remain an OrderedDict when command line substitutions are provided,
and that move_to_end() can be called successfully.
This is a regression test for https://github.com/esphome/esphome/issues/11182
where the config would become a regular dict and fail when move_to_end() was called.
"""
# Create an OrderedDict config with substitutions
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1", "var2": "value2"}
config["other_key"] = "other_value"
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Call do_substitution_pass with command line substitutions
substitutions.do_substitution_pass(config, command_line_subs)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning (move_to_end with last=False)
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
# Verify substitutions were properly merged
assert config[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert config[CONF_SUBSTITUTIONS]["var2"] == "override"
assert config[CONF_SUBSTITUTIONS]["var3"] == "new_value"
# Verify config[CONF_SUBSTITUTIONS] is also an OrderedDict
assert isinstance(config[CONF_SUBSTITUTIONS], OrderedDict), (
"Substitutions should be an OrderedDict"
)
def test_substitutions_without_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions work correctly without command line substitutions."""
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
config["other_key"] = "other_value"
# Call without command line substitutions
substitutions.do_substitution_pass(config, None)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_substitutions_after_merge_config_maintains_ordered_dict() -> None:
"""Test that substitutions work after merge_config (packages scenario).
This is a regression test for https://github.com/esphome/esphome/issues/11182
where using packages would cause config to become a regular dict, breaking move_to_end().
"""
# Simulate what happens with packages - merge two OrderedDict configs
base_config = OrderedDict()
base_config["esphome"] = {"name": "base"}
base_config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
package_config = OrderedDict()
package_config["sensor"] = [{"platform": "template"}]
package_config[CONF_SUBSTITUTIONS] = {"var2": "value2"}
# Merge configs (simulating package merge)
merged_config = merge_config(base_config, package_config)
# Verify merged config is still an OrderedDict
assert isinstance(merged_config, OrderedDict), (
"Merged config should be an OrderedDict"
)
# Now try to run substitution pass on the merged config
substitutions.do_substitution_pass(merged_config, None)
# Should not raise AttributeError
assert isinstance(merged_config, OrderedDict), (
"Config should still be OrderedDict after substitution pass"
)
keys = list(merged_config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_validate_config_with_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict when merging command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() with command-line substitutions provided.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config with command line substitutions
result = config_module.validate_config(test_config, command_line_subs)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions were properly merged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "override"
assert result[CONF_SUBSTITUTIONS]["var3"] == "new_value"
def test_validate_config_without_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict without command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() when command_line_substitutions is None.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config without command line substitutions
result = config_module.validate_config(test_config, None)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions are unchanged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "value2"
def test_merge_config_preserves_ordered_dict() -> None:
"""Test that merge_config preserves OrderedDict type.
This is a regression test to ensure merge_config doesn't lose OrderedDict type
when merging configs, which causes AttributeError on move_to_end().
"""
# Test OrderedDict + dict = OrderedDict
od = OrderedDict([("a", 1), ("b", 2)])
d = {"b": 20, "c": 3}
result = merge_config(od, d)
assert isinstance(result, OrderedDict), (
"OrderedDict + dict should return OrderedDict"
)
# Test dict + OrderedDict = OrderedDict
d = {"a": 1, "b": 2}
od = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(d, od)
assert isinstance(result, OrderedDict), (
"dict + OrderedDict should return OrderedDict"
)
# Test OrderedDict + OrderedDict = OrderedDict
od1 = OrderedDict([("a", 1), ("b", 2)])
od2 = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(od1, od2)
assert isinstance(result, OrderedDict), (
"OrderedDict + OrderedDict should return OrderedDict"
)
# Test that dict + dict still returns regular dict (no unnecessary conversion)
d1 = {"a": 1, "b": 2}
d2 = {"b": 20, "c": 3}
result = merge_config(d1, d2)
assert isinstance(result, dict), "dict + dict should return dict"
assert not isinstance(result, OrderedDict), (
"dict + dict should not return OrderedDict"
)