mirror of
https://github.com/esphome/esphome.git
synced 2026-02-18 15:35:59 -07:00
Merge branch 'buffering' into integration
This commit is contained in:
@@ -431,25 +431,33 @@ def run_miniterm(config: ConfigType, port: str, args) -> int:
|
||||
while tries < 5:
|
||||
try:
|
||||
with ser:
|
||||
buffer = b""
|
||||
ser.timeout = 0.1 # 100ms timeout for non-blocking reads
|
||||
while True:
|
||||
try:
|
||||
raw = ser.readline()
|
||||
# Read all available data and timestamp it
|
||||
chunk = ser.read(ser.in_waiting or 1)
|
||||
if not chunk:
|
||||
continue
|
||||
time_ = datetime.now()
|
||||
nanoseconds = time_.microsecond // 1000
|
||||
time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
|
||||
|
||||
# Add to buffer and process complete lines
|
||||
buffer += chunk
|
||||
while b"\n" in buffer:
|
||||
raw_line, buffer = buffer.split(b"\n", 1)
|
||||
line = raw_line.replace(b"\r", b"").decode(
|
||||
"utf8", "backslashreplace"
|
||||
)
|
||||
safe_print(parser.parse_line(line, time_str))
|
||||
|
||||
backtrace_state = platformio_api.process_stacktrace(
|
||||
config, line, backtrace_state=backtrace_state
|
||||
)
|
||||
except serial.SerialException:
|
||||
_LOGGER.error("Serial port closed!")
|
||||
return 0
|
||||
line = (
|
||||
raw.replace(b"\r", b"")
|
||||
.replace(b"\n", b"")
|
||||
.decode("utf8", "backslashreplace")
|
||||
)
|
||||
time_ = datetime.now()
|
||||
nanoseconds = time_.microsecond // 1000
|
||||
time_str = f"[{time_.hour:02}:{time_.minute:02}:{time_.second:02}.{nanoseconds:03}]"
|
||||
safe_print(parser.parse_line(line, time_str))
|
||||
|
||||
backtrace_state = platformio_api.process_stacktrace(
|
||||
config, line, backtrace_state=backtrace_state
|
||||
)
|
||||
except serial.SerialException:
|
||||
tries += 1
|
||||
time.sleep(1)
|
||||
|
||||
@@ -34,6 +34,7 @@ from esphome.__main__ import (
|
||||
has_non_ip_address,
|
||||
has_resolvable_address,
|
||||
mqtt_get_ip,
|
||||
run_miniterm,
|
||||
show_logs,
|
||||
upload_program,
|
||||
upload_using_esptool,
|
||||
@@ -41,11 +42,13 @@ from esphome.__main__ import (
|
||||
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
|
||||
from esphome.const import (
|
||||
CONF_API,
|
||||
CONF_BAUD_RATE,
|
||||
CONF_BROKER,
|
||||
CONF_DISABLED,
|
||||
CONF_ESPHOME,
|
||||
CONF_LEVEL,
|
||||
CONF_LOG_TOPIC,
|
||||
CONF_LOGGER,
|
||||
CONF_MDNS,
|
||||
CONF_MQTT,
|
||||
CONF_NAME,
|
||||
@@ -838,6 +841,7 @@ class MockArgs:
|
||||
configuration: str | None = None
|
||||
name: str | None = None
|
||||
dashboard: bool = False
|
||||
reset: bool = False
|
||||
|
||||
|
||||
def test_upload_program_serial_esp32(
|
||||
@@ -2802,3 +2806,301 @@ def test_compile_program_no_build_info_when_json_missing_keys(
|
||||
|
||||
assert result == 0
|
||||
assert "Build Info:" not in caplog.text
|
||||
|
||||
|
||||
# Tests for run_miniterm serial log batching
|
||||
|
||||
|
||||
# Sentinel to signal end of mock serial data (raises SerialException)
|
||||
MOCK_SERIAL_END = object()
|
||||
|
||||
|
||||
class MockSerial:
|
||||
"""Mock serial port for testing run_miniterm."""
|
||||
|
||||
def __init__(self, chunks: list[bytes | object]) -> None:
|
||||
"""Initialize with a list of chunks to return from read().
|
||||
|
||||
Args:
|
||||
chunks: List of byte chunks to return sequentially.
|
||||
Use MOCK_SERIAL_END sentinel to signal end of data.
|
||||
Empty bytes b"" simulate timeout (no data available).
|
||||
"""
|
||||
self.chunks = list(chunks)
|
||||
self.chunk_index = 0
|
||||
self.baudrate = 0
|
||||
self.port = ""
|
||||
self.dtr = True
|
||||
self.rts = True
|
||||
self.timeout = 0.1
|
||||
self._is_open = False
|
||||
|
||||
def __enter__(self) -> MockSerial:
|
||||
self._is_open = True
|
||||
return self
|
||||
|
||||
def __exit__(self, *args: Any) -> None:
|
||||
self._is_open = False
|
||||
|
||||
@property
|
||||
def in_waiting(self) -> int:
|
||||
"""Return number of bytes available."""
|
||||
if self.chunk_index < len(self.chunks):
|
||||
chunk = self.chunks[self.chunk_index]
|
||||
if chunk is MOCK_SERIAL_END:
|
||||
return 0
|
||||
return len(chunk) # type: ignore[arg-type]
|
||||
return 0
|
||||
|
||||
def read(self, size: int = 1) -> bytes:
|
||||
"""Read next chunk of data."""
|
||||
if self.chunk_index < len(self.chunks):
|
||||
chunk = self.chunks[self.chunk_index]
|
||||
self.chunk_index += 1
|
||||
if chunk is MOCK_SERIAL_END:
|
||||
# Sentinel means we're done - simulate port closed
|
||||
import serial
|
||||
|
||||
raise serial.SerialException("Port closed")
|
||||
return chunk # type: ignore[return-value]
|
||||
import serial
|
||||
|
||||
raise serial.SerialException("Port closed")
|
||||
|
||||
|
||||
def test_run_miniterm_batches_lines_with_same_timestamp(
|
||||
capfd: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test that lines from the same chunk get the same timestamp."""
|
||||
# Simulate receiving multiple log lines in a single chunk
|
||||
# This is how data arrives over USB - many lines at once
|
||||
chunk = b"[I][app:100]: Line 1\r\n[I][app:100]: Line 2\r\n[I][app:100]: Line 3\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk, MOCK_SERIAL_END])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 0
|
||||
|
||||
captured = capfd.readouterr()
|
||||
lines = [line for line in captured.out.strip().split("\n") if line]
|
||||
|
||||
# All 3 lines should have the same timestamp (first 13 chars like "[HH:MM:SS.mmm]")
|
||||
assert len(lines) == 3
|
||||
timestamps = [line[:13] for line in lines]
|
||||
assert timestamps[0] == timestamps[1] == timestamps[2], (
|
||||
f"Lines from same chunk should have same timestamp: {timestamps}"
|
||||
)
|
||||
|
||||
|
||||
def test_run_miniterm_different_chunks_different_timestamps(
|
||||
capfd: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test that lines from different chunks can have different timestamps."""
|
||||
# Two separate chunks - could have different timestamps
|
||||
chunk1 = b"[I][app:100]: Chunk 1 Line\r\n"
|
||||
chunk2 = b"[I][app:100]: Chunk 2 Line\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 0
|
||||
|
||||
captured = capfd.readouterr()
|
||||
lines = [line for line in captured.out.strip().split("\n") if line]
|
||||
assert len(lines) == 2
|
||||
|
||||
|
||||
def test_run_miniterm_handles_split_lines() -> None:
|
||||
"""Test that partial lines are buffered until complete."""
|
||||
# Line split across two chunks
|
||||
chunk1 = b"[I][app:100]: Start of "
|
||||
chunk2 = b"line\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk1, chunk2, MOCK_SERIAL_END])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
patch("esphome.__main__.safe_print") as mock_print,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
# Should have printed exactly one complete line
|
||||
assert mock_print.call_count == 1
|
||||
printed_line = mock_print.call_args[0][0]
|
||||
assert "Start of line" in printed_line
|
||||
|
||||
|
||||
def test_run_miniterm_backtrace_state_maintained() -> None:
|
||||
"""Test that backtrace_state is properly maintained across lines.
|
||||
|
||||
ESP8266 backtraces span multiple lines between >>>stack>>> and <<<stack<<<.
|
||||
The backtrace_state must persist correctly when lines arrive in the same chunk.
|
||||
"""
|
||||
# Simulate ESP8266 multi-line backtrace arriving in a single chunk
|
||||
backtrace_chunk = (
|
||||
b">>>stack>>>\r\n"
|
||||
b"3ffffe90: 40220ef8 b66aa8c0 3fff0a4c 40204c84\r\n"
|
||||
b"3ffffea0: 00000005 0000a635 3fff191c 4020413c\r\n"
|
||||
b"<<<stack<<<\r\n"
|
||||
)
|
||||
|
||||
mock_serial = MockSerial([backtrace_chunk, MOCK_SERIAL_END])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
backtrace_states: list[tuple[str, bool]] = []
|
||||
|
||||
def track_backtrace_state(
|
||||
config: dict[str, Any], line: str, backtrace_state: bool
|
||||
) -> bool:
|
||||
"""Track the backtrace_state progression."""
|
||||
backtrace_states.append((line, backtrace_state))
|
||||
# Simulate actual behavior
|
||||
if ">>>stack>>>" in line:
|
||||
return True
|
||||
if "<<<stack<<<" in line:
|
||||
return False
|
||||
return backtrace_state
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(
|
||||
platformio_api,
|
||||
"process_stacktrace",
|
||||
side_effect=track_backtrace_state,
|
||||
),
|
||||
):
|
||||
run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
# Verify the state progression
|
||||
assert len(backtrace_states) == 4
|
||||
|
||||
# Line 1: >>>stack>>> - state should be False (before processing)
|
||||
assert ">>>stack>>>" in backtrace_states[0][0]
|
||||
assert backtrace_states[0][1] is False
|
||||
|
||||
# Line 2: stack data - state should be True (after >>>stack>>>)
|
||||
assert "40220ef8" in backtrace_states[1][0]
|
||||
assert backtrace_states[1][1] is True
|
||||
|
||||
# Line 3: more stack data - state should be True
|
||||
assert "4020413c" in backtrace_states[2][0]
|
||||
assert backtrace_states[2][1] is True
|
||||
|
||||
# Line 4: <<<stack<<< - state should be True (before processing end marker)
|
||||
assert "<<<stack<<<" in backtrace_states[3][0]
|
||||
assert backtrace_states[3][1] is True
|
||||
|
||||
|
||||
def test_run_miniterm_handles_empty_reads(
|
||||
capfd: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test that empty reads (timeouts) are handled correctly.
|
||||
|
||||
When read() returns empty bytes, the code should continue waiting
|
||||
for more data without processing anything.
|
||||
"""
|
||||
# Simulate: empty read (timeout), then data, then empty read, then end
|
||||
chunk = b"[I][app:100]: Test line\r\n"
|
||||
|
||||
mock_serial = MockSerial([b"", chunk, b"", MOCK_SERIAL_END])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 0
|
||||
|
||||
captured = capfd.readouterr()
|
||||
lines = [line for line in captured.out.strip().split("\n") if line]
|
||||
# Should have exactly one line despite empty reads
|
||||
assert len(lines) == 1
|
||||
assert "Test line" in lines[0]
|
||||
|
||||
|
||||
def test_run_miniterm_no_logger_returns_early(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that run_miniterm returns early if logger is not configured."""
|
||||
config: dict[str, Any] = {} # No logger config
|
||||
args = MockArgs()
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 1
|
||||
assert "Logger is not enabled" in caplog.text
|
||||
|
||||
|
||||
def test_run_miniterm_baud_rate_zero_returns_early(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that run_miniterm returns early if baud_rate is 0."""
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 0,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 1
|
||||
assert "UART logging is disabled" in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user