Compare commits

...

4 Commits

Author SHA1 Message Date
Paulus Schoutsen
550aa7300c Address comments 2026-01-12 15:57:07 -05:00
Paulus Schoutsen
b48a185612 add test 2026-01-10 23:43:22 -05:00
pre-commit-ci-lite[bot]
16fad85e60 [pre-commit.ci lite] apply automatic fixes 2026-01-11 04:07:49 +00:00
Paulus Schoutsen
b32c785d7d Allow finding all devices as target that match mac suffix 2026-01-10 23:02:53 -05:00
3 changed files with 190 additions and 8 deletions

View File

@@ -34,6 +34,7 @@ from esphome.const import (
CONF_MDNS,
CONF_MQTT,
CONF_NAME,
CONF_NAME_ADD_MAC_SUFFIX,
CONF_OTA,
CONF_PASSWORD,
CONF_PLATFORM,
@@ -59,6 +60,7 @@ from esphome.util import (
run_external_process,
safe_print,
)
from esphome.zeroconf import discover_mdns_devices
_LOGGER = logging.getLogger(__name__)
@@ -232,22 +234,34 @@ def choose_upload_log_host(
(f"{port.path} ({port.description})", port.path) for port in get_serial_ports()
]
def add_ota_options() -> None:
"""Add OTA options, using mDNS discovery if name_add_mac_suffix is enabled."""
if has_name_add_mac_suffix() and has_mdns() and has_non_ip_address():
# Discover devices via mDNS when name_add_mac_suffix is enabled
safe_print("Discovering devices...")
discovered = discover_mdns_devices(CORE.name)
for device_addr in discovered:
options.append((f"Over The Air ({device_addr})", device_addr))
if not discovered and has_resolvable_address():
# No devices found, show base address as fallback
options.append(
(f"Over The Air ({CORE.address}) (no devices found)", CORE.address)
)
elif has_resolvable_address():
options.append((f"Over The Air ({CORE.address})", CORE.address))
if has_mqtt_ip_lookup():
options.append(("Over The Air (MQTT IP lookup)", "MQTTIP"))
if purpose == Purpose.LOGGING:
if has_mqtt_logging():
mqtt_config = CORE.config[CONF_MQTT]
options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT"))
if has_api():
if has_resolvable_address():
options.append((f"Over The Air ({CORE.address})", CORE.address))
if has_mqtt_ip_lookup():
options.append(("Over The Air (MQTT IP lookup)", "MQTTIP"))
add_ota_options()
elif purpose == Purpose.UPLOADING and has_ota():
if has_resolvable_address():
options.append((f"Over The Air ({CORE.address})", CORE.address))
if has_mqtt_ip_lookup():
options.append(("Over The Air (MQTT IP lookup)", "MQTTIP"))
add_ota_options()
if check_default is not None and check_default in [opt[1] for opt in options]:
return [check_default]
@@ -334,6 +348,14 @@ def has_resolvable_address() -> bool:
return not CORE.address.endswith(".local")
def has_name_add_mac_suffix() -> bool:
"""Check if name_add_mac_suffix is enabled in the config."""
if CORE.config is None:
return False
esphome_config = CORE.config.get(CONF_ESPHOME, {})
return esphome_config.get(CONF_NAME_ADD_MAC_SUFFIX, False)
def mqtt_get_ip(config: ConfigType, username: str, password: str, client_id: str):
from esphome import mqtt

View File

@@ -4,10 +4,12 @@ import asyncio
from collections.abc import Callable
from dataclasses import dataclass
import logging
import time
from zeroconf import (
AddressResolver,
IPVersion,
ServiceBrowser,
ServiceInfo,
ServiceStateChange,
Zeroconf,
@@ -200,3 +202,53 @@ class AsyncEsphomeZeroconf(AsyncZeroconf):
) and (addresses := info.parsed_scoped_addresses(IPVersion.All)):
return addresses
return None
def discover_mdns_devices(base_name: str, timeout: float = 5.0) -> list[str]:
"""Discover ESPHome devices via mDNS that match the base name pattern.
When name_add_mac_suffix is enabled, devices advertise as <base_name>-<mac>.local.
This function discovers all such devices on the network.
Args:
base_name: The base device name (without MAC suffix)
timeout: How long to wait for mDNS responses (default 5 seconds)
Returns:
List of discovered device addresses (e.g., ['device-abc123.local'])
"""
discovered: list[str] = []
prefix = f"{base_name}-"
def on_service_state_change(
zeroconf: Zeroconf,
service_type: str,
name: str,
state_change: ServiceStateChange,
) -> None:
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
# Extract device name from service name (removes service type suffix)
device_name = name.partition(".")[0]
# Check if this device matches our base name pattern
if device_name.startswith(prefix) and device_name not in discovered:
discovered.append(device_name)
try:
zc = Zeroconf()
except Exception as err:
_LOGGER.warning("mDNS discovery failed to initialize: %s", err)
return []
browser: ServiceBrowser | None = None
try:
browser = ServiceBrowser(
zc, ESPHOME_SERVICE_TYPE, handlers=[on_service_state_change]
)
# Wait for discovery
time.sleep(timeout)
finally:
if browser is not None:
browser.cancel()
zc.close()
return [f"{name}.local" for name in sorted(discovered)]

View File

@@ -14,6 +14,7 @@ from unittest.mock import MagicMock, Mock, patch
import pytest
from pytest import CaptureFixture
from zeroconf import ServiceStateChange
from esphome import platformio_api
from esphome.__main__ import (
@@ -31,6 +32,7 @@ from esphome.__main__ import (
has_mqtt,
has_mqtt_ip_lookup,
has_mqtt_logging,
has_name_add_mac_suffix,
has_non_ip_address,
has_resolvable_address,
mqtt_get_ip,
@@ -52,6 +54,7 @@ from esphome.const import (
CONF_MDNS,
CONF_MQTT,
CONF_NAME,
CONF_NAME_ADD_MAC_SUFFIX,
CONF_OTA,
CONF_PASSWORD,
CONF_PLATFORM,
@@ -68,6 +71,7 @@ from esphome.const import (
PLATFORM_RP2040,
)
from esphome.core import CORE, EsphomeError
from esphome.zeroconf import discover_mdns_devices
def strip_ansi_codes(text: str) -> str:
@@ -1654,6 +1658,110 @@ def test_has_resolvable_address() -> None:
assert has_resolvable_address() is False
def test_has_name_add_mac_suffix() -> None:
"""Test has_name_add_mac_suffix function."""
# Test with name_add_mac_suffix enabled
setup_core(config={CONF_ESPHOME: {CONF_NAME_ADD_MAC_SUFFIX: True}})
assert has_name_add_mac_suffix() is True
# Test with name_add_mac_suffix disabled
setup_core(config={CONF_ESPHOME: {CONF_NAME_ADD_MAC_SUFFIX: False}})
assert has_name_add_mac_suffix() is False
# Test with name_add_mac_suffix not set (defaults to False)
setup_core(config={CONF_ESPHOME: {}})
assert has_name_add_mac_suffix() is False
# Test with no esphome config
setup_core(config={})
assert has_name_add_mac_suffix() is False
# Test with no config at all
CORE.config = None
assert has_name_add_mac_suffix() is False
@pytest.fixture
def mock_mdns_discovery() -> Generator[MagicMock]:
"""Fixture to mock mDNS discovery infrastructure."""
with (
patch("esphome.zeroconf.Zeroconf") as mock_zeroconf_class,
patch("esphome.zeroconf.ServiceBrowser") as mock_browser_class,
patch("esphome.zeroconf.time.sleep"),
):
mock_zc = MagicMock()
mock_zeroconf_class.return_value = mock_zc
mock_browser = MagicMock()
# Store references for test access
mock_zc._mock_browser_class = mock_browser_class
mock_zc._mock_browser = mock_browser
yield mock_zc
@pytest.mark.parametrize(
("discovered_services", "base_name", "expected"),
[
# Test matching devices with filtering
(
[
("mydevice-abc123._esphomelib._tcp.local.", ServiceStateChange.Added),
("mydevice-def456._esphomelib._tcp.local.", ServiceStateChange.Added),
(
"otherdevice-xyz789._esphomelib._tcp.local.",
ServiceStateChange.Added,
),
],
"mydevice",
["mydevice-abc123.local", "mydevice-def456.local"],
),
# Test no matches
(
[
(
"otherdevice-abc123._esphomelib._tcp.local.",
ServiceStateChange.Added,
),
],
"mydevice",
[],
),
# Test deduplication (same device Added then Updated)
(
[
("mydevice-abc123._esphomelib._tcp.local.", ServiceStateChange.Added),
("mydevice-abc123._esphomelib._tcp.local.", ServiceStateChange.Updated),
],
"mydevice",
["mydevice-abc123.local"],
),
],
ids=["matching_with_filter", "no_matches", "deduplication"],
)
def test_discover_mdns_devices(
mock_mdns_discovery: MagicMock,
discovered_services: list[tuple[str, ServiceStateChange]],
base_name: str,
expected: list[str],
) -> None:
"""Test discover_mdns_devices function with various scenarios."""
mock_browser = mock_mdns_discovery._mock_browser
def capture_callback(zc, service_type, handlers):
callback = handlers[0]
for service_name, state_change in discovered_services:
callback(mock_mdns_discovery, service_type, service_name, state_change)
return mock_browser
mock_mdns_discovery._mock_browser_class.side_effect = capture_callback
result = discover_mdns_devices(base_name, timeout=0.1)
assert result == expected
mock_browser.cancel.assert_called_once()
mock_mdns_discovery.close.assert_called_once()
def test_command_wizard(tmp_path: Path) -> None:
"""Test command_wizard function."""
config_file = tmp_path / "test.yaml"