Merge branch 'host_logger_thread_safe' into integration

This commit is contained in:
J. Nick Koston
2026-01-05 15:26:26 -10:00
36 changed files with 1856 additions and 176 deletions

View File

@@ -1 +1 @@
94557f94be073390342833aff12ef8676a8b597db5fa770a5a1232e9425cb48f
97fb425f1d681a5994ed1cc6187910f5d2c37ee577b6dc07eb3f4d8862a011de

View File

@@ -135,7 +135,7 @@ esphome/components/display_menu_base/* @numo68
esphome/components/dps310/* @kbx81
esphome/components/ds1307/* @badbadc0ffee
esphome/components/ds2484/* @mrk-its
esphome/components/dsmr/* @glmnet @zuidwijk
esphome/components/dsmr/* @glmnet @PolarGoose @zuidwijk
esphome/components/duty_time/* @dudanov
esphome/components/ee895/* @Stock-M
esphome/components/ektf2232/touchscreen/* @jesserockz

View File

@@ -1,37 +1,50 @@
# Dummy integration to allow relying on AsyncTCP
# Async TCP client support for all platforms
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import (
PLATFORM_BK72XX,
PLATFORM_ESP32,
PLATFORM_ESP8266,
PLATFORM_LN882X,
PLATFORM_RTL87XX,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
CONFIG_SCHEMA = cv.All(
cv.Schema({}),
cv.only_with_arduino,
cv.only_on(
[
PLATFORM_ESP32,
PLATFORM_ESP8266,
PLATFORM_BK72XX,
PLATFORM_LN882X,
PLATFORM_RTL87XX,
]
),
)
def AUTO_LOAD() -> list[str]:
# Socket component needed for platforms using socket-based implementation
# ESP32, ESP8266, RP2040, and LibreTiny use AsyncTCP libraries, others use sockets
if (
not CORE.is_esp32
and not CORE.is_esp8266
and not CORE.is_rp2040
and not CORE.is_libretiny
):
return ["socket"]
return []
# Support all platforms - Arduino/ESP-IDF get libraries, other platforms use socket implementation
CONFIG_SCHEMA = cv.Schema({})
@coroutine_with_priority(CoroPriority.NETWORK_TRANSPORT)
async def to_code(config):
if CORE.is_esp32 or CORE.is_libretiny:
if CORE.using_esp_idf:
# ESP-IDF needs the IDF component
from esphome.components.esp32 import add_idf_component
add_idf_component(name="esp32async/asynctcp", ref="3.4.91")
elif CORE.is_esp32 or CORE.is_libretiny:
# https://github.com/ESP32Async/AsyncTCP
cg.add_library("ESP32Async/AsyncTCP", "3.4.5")
elif CORE.is_esp8266:
# https://github.com/ESP32Async/ESPAsyncTCP
cg.add_library("ESP32Async/ESPAsyncTCP", "2.0.0")
elif CORE.is_rp2040:
# https://github.com/khoih-prog/AsyncTCP_RP2040W
cg.add_library("khoih-prog/AsyncTCP_RP2040W", "1.2.0")
# Other platforms (host, etc) use socket-based implementation
def FILTER_SOURCE_FILES() -> list[str]:
# Exclude socket implementation for platforms that use AsyncTCP libraries
if CORE.is_esp32 or CORE.is_esp8266 or CORE.is_rp2040 or CORE.is_libretiny:
return ["async_tcp_socket.cpp"]
return []

View File

@@ -0,0 +1,17 @@
#pragma once
#include "esphome/core/defines.h"
#if (defined(USE_ESP32) || defined(USE_LIBRETINY)) && !defined(CLANG_TIDY)
// Use AsyncTCP library for ESP32 (Arduino or ESP-IDF) and LibreTiny
// But not for clang-tidy as the header file isn't present in that case
#include <AsyncTCP.h>
#elif defined(USE_ESP8266)
// Use ESPAsyncTCP library for ESP8266 (always Arduino)
#include <ESPAsyncTCP.h>
#elif defined(USE_RP2040)
// Use AsyncTCP_RP2040W library for RP2040
#include <AsyncTCP_RP2040W.h>
#else
// Use socket-based implementation for other platforms and clang-tidy
#include "async_tcp_socket.h"
#endif

View File

@@ -0,0 +1,161 @@
#include "async_tcp_socket.h"
#if defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || defined(USE_SOCKET_IMPL_BSD_SOCKETS)
#include "esphome/components/network/util.h"
#include "esphome/core/log.h"
#include <cerrno>
#include <sys/select.h>
namespace esphome::async_tcp {
static const char *const TAG = "async_tcp";
// Read buffer size matches TCP MSS (1500 MTU - 40 bytes IP/TCP headers).
// This implementation only runs on ESP-IDF and host which have ample stack.
static constexpr size_t READ_BUFFER_SIZE = 1460;
bool AsyncClient::connect(const char *host, uint16_t port) {
if (connected_ || connecting_) {
ESP_LOGW(TAG, "Already connected/connecting");
return false;
}
// Resolve address
struct sockaddr_storage addr;
socklen_t addrlen = esphome::socket::set_sockaddr((struct sockaddr *) &addr, sizeof(addr), host, port);
if (addrlen == 0) {
ESP_LOGE(TAG, "Invalid address: %s", host);
if (error_cb_)
error_cb_(error_arg_, this, -1);
return false;
}
// Create socket with loop monitoring
int family = ((struct sockaddr *) &addr)->sa_family;
socket_ = esphome::socket::socket_loop_monitored(family, SOCK_STREAM, IPPROTO_TCP);
if (!socket_) {
ESP_LOGE(TAG, "Failed to create socket");
if (error_cb_)
error_cb_(error_arg_, this, -1);
return false;
}
socket_->setblocking(false);
int err = socket_->connect((struct sockaddr *) &addr, addrlen);
if (err == 0) {
// Connection succeeded immediately (rare, but possible for localhost)
connected_ = true;
if (connect_cb_)
connect_cb_(connect_arg_, this);
return true;
}
if (errno != EINPROGRESS) {
ESP_LOGE(TAG, "Connect failed: %d", errno);
close();
if (error_cb_)
error_cb_(error_arg_, this, errno);
return false;
}
connecting_ = true;
return true;
}
void AsyncClient::close() {
socket_.reset();
bool was_connected = connected_;
connected_ = false;
connecting_ = false;
if (was_connected && disconnect_cb_)
disconnect_cb_(disconnect_arg_, this);
}
size_t AsyncClient::write(const char *data, size_t len) {
if (!socket_ || !connected_)
return 0;
ssize_t sent = socket_->write(data, len);
if (sent < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
ESP_LOGE(TAG, "Write error: %d", errno);
close();
if (error_cb_)
error_cb_(error_arg_, this, errno);
}
return 0;
}
return sent;
}
void AsyncClient::loop() {
if (!socket_)
return;
if (connecting_) {
// For connecting, we need to check writability, not readability
// The Application's select() only monitors read FDs, so we do our own check here
// For ESP platforms lwip_select() might be faster, but this code isn't used
// on those platforms anyway. If it was, we'd fix the Application select()
// to report writability instead of doing it this way.
int fd = socket_->get_fd();
if (fd < 0) {
ESP_LOGW(TAG, "Invalid socket fd");
close();
return;
}
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(fd, &writefds);
struct timeval tv = {0, 0};
int ret = select(fd + 1, nullptr, &writefds, nullptr, &tv);
if (ret > 0 && FD_ISSET(fd, &writefds)) {
int error = 0;
socklen_t len = sizeof(error);
if (socket_->getsockopt(SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error == 0) {
connecting_ = false;
connected_ = true;
if (connect_cb_)
connect_cb_(connect_arg_, this);
} else {
ESP_LOGW(TAG, "Connection failed: %d", error);
close();
if (error_cb_)
error_cb_(error_arg_, this, error);
}
} else if (ret < 0) {
ESP_LOGE(TAG, "Select error: %d", errno);
close();
if (error_cb_)
error_cb_(error_arg_, this, errno);
}
} else if (connected_) {
// For connected sockets, use the Application's select() results
if (!socket_->ready())
return;
uint8_t buf[READ_BUFFER_SIZE];
ssize_t len = socket_->read(buf, READ_BUFFER_SIZE);
if (len == 0) {
ESP_LOGI(TAG, "Connection closed by peer");
close();
} else if (len > 0) {
if (data_cb_)
data_cb_(data_arg_, this, buf, len);
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
ESP_LOGW(TAG, "Read error: %d", errno);
close();
if (error_cb_)
error_cb_(error_arg_, this, errno);
}
}
}
} // namespace esphome::async_tcp
#endif // defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || defined(USE_SOCKET_IMPL_BSD_SOCKETS)

View File

@@ -0,0 +1,73 @@
#pragma once
#include "esphome/core/defines.h"
#if defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || defined(USE_SOCKET_IMPL_BSD_SOCKETS)
#include "esphome/components/socket/socket.h"
#include <functional>
#include <memory>
#include <string>
#include <utility>
namespace esphome::async_tcp {
/// AsyncClient API for platforms using sockets (ESP-IDF, host, etc.)
/// NOTE: This class is NOT thread-safe. All methods must be called from the main loop.
class AsyncClient {
public:
using AcConnectHandler = std::function<void(void *, AsyncClient *)>;
using AcDataHandler = std::function<void(void *, AsyncClient *, void *data, size_t len)>;
using AcErrorHandler = std::function<void(void *, AsyncClient *, int8_t error)>;
AsyncClient() = default;
~AsyncClient() = default;
[[nodiscard]] bool connect(const char *host, uint16_t port);
void close();
[[nodiscard]] bool connected() const { return connected_; }
size_t write(const char *data, size_t len);
void onConnect(AcConnectHandler cb, void *arg = nullptr) { // NOLINT(readability-identifier-naming)
connect_cb_ = std::move(cb);
connect_arg_ = arg;
}
void onDisconnect(AcConnectHandler cb, void *arg = nullptr) { // NOLINT(readability-identifier-naming)
disconnect_cb_ = std::move(cb);
disconnect_arg_ = arg;
}
/// Set data callback. NOTE: data pointer is only valid during callback execution.
void onData(AcDataHandler cb, void *arg = nullptr) { // NOLINT(readability-identifier-naming)
data_cb_ = std::move(cb);
data_arg_ = arg;
}
void onError(AcErrorHandler cb, void *arg = nullptr) { // NOLINT(readability-identifier-naming)
error_cb_ = std::move(cb);
error_arg_ = arg;
}
// Must be called from loop()
void loop();
private:
std::unique_ptr<esphome::socket::Socket> socket_;
AcConnectHandler connect_cb_{nullptr};
void *connect_arg_{nullptr};
AcConnectHandler disconnect_cb_{nullptr};
void *disconnect_arg_{nullptr};
AcDataHandler data_cb_{nullptr};
void *data_arg_{nullptr};
AcErrorHandler error_cb_{nullptr};
void *error_arg_{nullptr};
bool connected_{false};
bool connecting_{false};
};
} // namespace esphome::async_tcp
// Expose AsyncClient in global namespace to match library behavior
using esphome::async_tcp::AsyncClient; // NOLINT(google-global-names-in-headers)
#define ESPHOME_ASYNC_TCP_SOCKET_IMPL
#endif // defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || defined(USE_SOCKET_IMPL_BSD_SOCKETS)

View File

@@ -4,7 +4,7 @@ from esphome.components import uart
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_RECEIVE_TIMEOUT, CONF_UART_ID
CODEOWNERS = ["@glmnet", "@zuidwijk"]
CODEOWNERS = ["@glmnet", "@zuidwijk", "@PolarGoose"]
MULTI_CONF = True
@@ -61,7 +61,6 @@ CONFIG_SCHEMA = cv.All(
): cv.positive_time_period_milliseconds,
}
).extend(uart.UART_DEVICE_SCHEMA),
cv.only_with_arduino,
)
@@ -83,7 +82,7 @@ async def to_code(config):
cg.add_build_flag("-DDSMR_WATER_MBUS_ID=" + str(config[CONF_WATER_MBUS_ID]))
# DSMR Parser
cg.add_library("glmnet/Dsmr", "0.8")
cg.add_library("esphome/dsmr_parser", "1.0.0")
# Crypto
cg.add_library("rweather/Crypto", "0.4.0")
cg.add_library("polargoose/Crypto-no-arduino", "0.4.0")

View File

@@ -1,5 +1,3 @@
#ifdef USE_ARDUINO
#include "dsmr.h"
#include "esphome/core/log.h"
@@ -7,8 +5,7 @@
#include <Crypto.h>
#include <GCM.h>
namespace esphome {
namespace dsmr {
namespace esphome::dsmr {
static const char *const TAG = "dsmr";
@@ -257,9 +254,9 @@ bool Dsmr::parse_telegram() {
ESP_LOGV(TAG, "Trying to parse telegram");
this->stop_requesting_data_();
::dsmr::ParseResult<void> res =
::dsmr::P1Parser::parse(&data, this->telegram_, this->bytes_read_, false,
this->crc_check_); // Parse telegram according to data definition. Ignore unknown values.
const auto &res = dsmr_parser::P1Parser::parse(
data, this->telegram_, this->bytes_read_, false,
this->crc_check_); // Parse telegram according to data definition. Ignore unknown values.
if (res.err) {
// Parsing error, show it
auto err_str = res.fullError(this->telegram_, this->telegram_ + this->bytes_read_);
@@ -329,7 +326,4 @@ void Dsmr::set_decryption_key(const std::string &decryption_key) {
}
}
} // namespace dsmr
} // namespace esphome
#endif // USE_ARDUINO
} // namespace esphome::dsmr

View File

@@ -1,24 +1,17 @@
#pragma once
#ifdef USE_ARDUINO
#include "esphome/core/component.h"
#include "esphome/components/sensor/sensor.h"
#include "esphome/components/text_sensor/text_sensor.h"
#include "esphome/components/uart/uart.h"
#include "esphome/core/log.h"
#include "esphome/core/defines.h"
// don't include <dsmr.h> because it puts everything in global namespace
#include <dsmr/parser.h>
#include <dsmr/fields.h>
#include <dsmr_parser/fields.h>
#include <dsmr_parser/parser.h>
#include <vector>
namespace esphome {
namespace dsmr {
namespace esphome::dsmr {
using namespace ::dsmr::fields;
using namespace dsmr_parser::fields;
// DSMR_**_LIST generated by ESPHome and written in esphome/core/defines
@@ -44,8 +37,8 @@ using namespace ::dsmr::fields;
#define DSMR_DATA_SENSOR(s) s
#define DSMR_COMMA ,
using MyData = ::dsmr::ParsedData<DSMR_TEXT_SENSOR_LIST(DSMR_DATA_SENSOR, DSMR_COMMA)
DSMR_BOTH DSMR_SENSOR_LIST(DSMR_DATA_SENSOR, DSMR_COMMA)>;
using MyData = dsmr_parser::ParsedData<DSMR_TEXT_SENSOR_LIST(DSMR_DATA_SENSOR, DSMR_COMMA)
DSMR_BOTH DSMR_SENSOR_LIST(DSMR_DATA_SENSOR, DSMR_COMMA)>;
class Dsmr : public Component, public uart::UARTDevice {
public:
@@ -140,7 +133,4 @@ class Dsmr : public Component, public uart::UARTDevice {
std::vector<uint8_t> decryption_key_{};
bool crc_check_;
};
} // namespace dsmr
} // namespace esphome
#endif // USE_ARDUINO
} // namespace esphome::dsmr

View File

@@ -3,27 +3,34 @@ from esphome.components import sensor
import esphome.config_validation as cv
from esphome.const import (
CONF_ID,
DEVICE_CLASS_APPARENT_POWER,
DEVICE_CLASS_CURRENT,
DEVICE_CLASS_DURATION,
DEVICE_CLASS_ENERGY,
DEVICE_CLASS_FREQUENCY,
DEVICE_CLASS_GAS,
DEVICE_CLASS_POWER,
DEVICE_CLASS_REACTIVE_POWER,
DEVICE_CLASS_VOLTAGE,
DEVICE_CLASS_WATER,
STATE_CLASS_MEASUREMENT,
STATE_CLASS_TOTAL_INCREASING,
UNIT_AMPERE,
UNIT_CUBIC_METER,
UNIT_HERTZ,
UNIT_KILOVOLT_AMPS,
UNIT_KILOVOLT_AMPS_REACTIVE,
UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
UNIT_KILOWATT,
UNIT_KILOWATT_HOURS,
UNIT_SECOND,
UNIT_VOLT,
)
from . import CONF_DSMR_ID, Dsmr
AUTO_LOAD = ["dsmr"]
UNIT_GIGA_JOULE = "GJ"
CONFIG_SCHEMA = cv.Schema(
{
@@ -46,6 +53,18 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_delivered_tariff3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_delivered_tariff4"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_returned_lux"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
@@ -64,14 +83,82 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_returned_tariff3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_returned_tariff4"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_delivered_tariff1_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_delivered_tariff2_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_returned_tariff1_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("energy_returned_tariff2_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT_HOURS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("total_imported_energy"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_delivered_tariff1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_delivered_tariff2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_delivered_tariff3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_delivered_tariff4"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("total_exported_energy"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_returned_tariff1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_returned_tariff2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_returned_tariff3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("reactive_energy_returned_tariff4"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE_HOURS,
accuracy_decimals=3,
),
cv.Optional("power_delivered"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
@@ -84,61 +171,195 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("power_delivered_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("power_returned_ch"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_delivered"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_returned"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_threshold"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_switch_position"): sensor.sensor_schema(
accuracy_decimals=3,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_failures"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_long_failures"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_sags_l1"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_sags_l2"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_sags_l3"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_swells_l1"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_swells_l2"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("electricity_swells_l3"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_time_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_time_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_time_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_sag_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_time_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_time_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_time_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_SECOND,
accuracy_decimals=0,
device_class=DEVICE_CLASS_DURATION,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_swell_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=0,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_n"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_sum"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_fuse_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_fuse_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("current_fuse_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_AMPERE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
),
@@ -181,51 +402,93 @@ CONFIG_SCHEMA = cv.Schema(
cv.Optional("reactive_power_delivered_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_delivered_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_delivered_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_returned_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_returned_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("reactive_power_returned_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=1,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_avg_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_avg_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage_avg_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("voltage"): sensor.sensor_schema(
unit_of_measurement=UNIT_VOLT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("frequency"): sensor.sensor_schema(
unit_of_measurement=UNIT_HERTZ,
accuracy_decimals=3,
device_class=DEVICE_CLASS_FREQUENCY,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("abs_power"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("gas_delivered"): sensor.sensor_schema(
unit_of_measurement=UNIT_CUBIC_METER,
accuracy_decimals=3,
@@ -244,6 +507,109 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_WATER,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("thermal_delivered"): sensor.sensor_schema(
unit_of_measurement=UNIT_GIGA_JOULE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("sub_delivered"): sensor.sensor_schema(
unit_of_measurement=UNIT_CUBIC_METER,
accuracy_decimals=3,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
cv.Optional("gas_device_type"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("gas_valve_position"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("thermal_device_type"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("thermal_valve_position"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("water_device_type"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("water_valve_position"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("sub_device_type"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("sub_valve_position"): sensor.sensor_schema(
accuracy_decimals=0,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_delivery_power"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_delivery_power_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_delivery_power_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_delivery_power_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_return_power"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_return_power_l1"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_return_power_l2"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("apparent_return_power_l3"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("active_demand_power"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("active_demand_abs"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"active_energy_import_current_average_demand"
): sensor.sensor_schema(
@@ -252,6 +618,90 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"active_energy_export_current_average_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"reactive_energy_import_current_average_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"reactive_energy_export_current_average_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"apparent_energy_import_current_average_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"apparent_energy_export_current_average_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("active_energy_import_last_completed_demand"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("active_energy_export_last_completed_demand"): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOWATT,
accuracy_decimals=3,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"reactive_energy_import_last_completed_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"reactive_energy_export_last_completed_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS_REACTIVE,
accuracy_decimals=3,
device_class=DEVICE_CLASS_REACTIVE_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"apparent_energy_import_last_completed_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"apparent_energy_export_last_completed_demand"
): sensor.sensor_schema(
unit_of_measurement=UNIT_KILOVOLT_AMPS,
accuracy_decimals=3,
device_class=DEVICE_CLASS_APPARENT_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional(
"active_energy_import_maximum_demand_running_month"
): sensor.sensor_schema(
@@ -268,6 +718,14 @@ CONFIG_SCHEMA = cv.Schema(
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("fw_core_version"): sensor.sensor_schema(
accuracy_decimals=3,
state_class=STATE_CLASS_MEASUREMENT,
),
cv.Optional("fw_module_version"): sensor.sensor_schema(
accuracy_decimals=3,
state_class=STATE_CLASS_MEASUREMENT,
),
}
).extend(cv.COMPONENT_SCHEMA)

View File

@@ -18,11 +18,15 @@ CONFIG_SCHEMA = cv.Schema(
cv.Optional("electricity_failure_log"): text_sensor.text_sensor_schema(),
cv.Optional("message_short"): text_sensor.text_sensor_schema(),
cv.Optional("message_long"): text_sensor.text_sensor_schema(),
cv.Optional("equipment_id"): text_sensor.text_sensor_schema(),
cv.Optional("gas_equipment_id"): text_sensor.text_sensor_schema(),
cv.Optional("gas_equipment_id_be"): text_sensor.text_sensor_schema(),
cv.Optional("thermal_equipment_id"): text_sensor.text_sensor_schema(),
cv.Optional("water_equipment_id"): text_sensor.text_sensor_schema(),
cv.Optional("sub_equipment_id"): text_sensor.text_sensor_schema(),
cv.Optional("gas_delivered_text"): text_sensor.text_sensor_schema(),
cv.Optional("fw_core_checksum"): text_sensor.text_sensor_schema(),
cv.Optional("fw_module_checksum"): text_sensor.text_sensor_schema(),
cv.Optional("telegram"): text_sensor.text_sensor_schema().extend(
{cv.Optional(CONF_INTERNAL, default=True): cv.boolean}
),

View File

@@ -388,14 +388,14 @@ bool ESPHomeOTAComponent::readall_(uint8_t *buf, size_t len) {
while (len - at > 0) {
uint32_t now = millis();
if (now - start > OTA_SOCKET_TIMEOUT_DATA) {
ESP_LOGW(TAG, "Timeout reading %d bytes", len);
ESP_LOGW(TAG, "Timeout reading %zu bytes", len);
return false;
}
ssize_t read = this->client_->read(buf + at, len - at);
if (read == -1) {
if (!this->would_block_(errno)) {
ESP_LOGW(TAG, "Read err %d bytes, errno %d", len, errno);
ESP_LOGW(TAG, "Read err %zu bytes, errno %d", len, errno);
return false;
}
} else if (read == 0) {
@@ -415,14 +415,14 @@ bool ESPHomeOTAComponent::writeall_(const uint8_t *buf, size_t len) {
while (len - at > 0) {
uint32_t now = millis();
if (now - start > OTA_SOCKET_TIMEOUT_DATA) {
ESP_LOGW(TAG, "Timeout writing %d bytes", len);
ESP_LOGW(TAG, "Timeout writing %zu bytes", len);
return false;
}
ssize_t written = this->client_->write(buf + at, len - at);
if (written == -1) {
if (!this->would_block_(errno)) {
ESP_LOGW(TAG, "Write err %d bytes, errno %d", len, errno);
ESP_LOGW(TAG, "Write err %zu bytes, errno %d", len, errno);
return false;
}
} else {

View File

@@ -374,23 +374,6 @@ def is_svg_file(file):
return "<svg" in str(f.read(1024))
def validate_cairosvg_installed():
try:
import cairosvg
except ImportError as err:
raise cv.Invalid(
"Please install the cairosvg python package to use this feature. "
"(pip install cairosvg)"
) from err
major, minor, _ = cairosvg.__version__.split(".")
if major < "2" or major == "2" and minor < "2":
raise cv.Invalid(
"Please update your cairosvg installation to at least 2.2.0. "
"(pip install -U cairosvg)"
)
def validate_file_shorthand(value):
value = cv.string_strict(value)
parts = value.strip().split(":")
@@ -490,9 +473,7 @@ def validate_settings(value, path=()):
)
if file := value.get(CONF_FILE):
file = Path(file)
if is_svg_file(file):
validate_cairosvg_installed()
else:
if not is_svg_file(file):
try:
Image.open(file)
except UnidentifiedImageError as exc:
@@ -669,44 +650,35 @@ async def write_image(config, all_frames=False):
raise core.EsphomeError(f"Could not load image file {path}")
resize = config.get(CONF_RESIZE)
if is_svg_file(path):
# Local import so use of non-SVG files needn't require cairosvg installed
from pyexpat import ExpatError
from xml.etree.ElementTree import ParseError
try:
if is_svg_file(path):
import resvg_py
from cairosvg import svg2png
from cairosvg.helpers import PointError
if not resize:
resize = (None, None)
try:
with open(path, "rb") as file:
image = svg2png(
file_obj=file,
output_width=resize[0],
output_height=resize[1],
if resize:
width, height = resize
# resvg-py allows rendering by width/height directly
image_data = resvg_py.svg_to_bytes(
svg_path=str(path), width=int(width), height=int(height)
)
image = Image.open(io.BytesIO(image))
else:
# Default size
image_data = resvg_py.svg_to_bytes(svg_path=str(path))
# Convert bytes to Pillow Image
image = Image.open(io.BytesIO(image_data))
width, height = image.size
except (
ValueError,
ParseError,
IndexError,
ExpatError,
AttributeError,
TypeError,
PointError,
) as e:
raise core.EsphomeError(f"Could not load SVG image {path}: {e}") from e
else:
image = Image.open(path)
width, height = image.size
if resize:
# Preserve aspect ratio
new_width_max = min(width, resize[0])
new_height_max = min(height, resize[1])
ratio = min(new_width_max / width, new_height_max / height)
width, height = int(width * ratio), int(height * ratio)
else:
image = Image.open(path)
width, height = image.size
if resize:
# Preserve aspect ratio
new_width_max = min(width, resize[0])
new_height_max = min(height, resize[1])
ratio = min(new_width_max / width, new_height_max / height)
width, height = int(width * ratio), int(height * ratio)
except (OSError, UnidentifiedImageError, ValueError) as exc:
raise core.EsphomeError(f"Could not read image file {path}: {exc}") from exc
if not resize and (width > 500 or height > 500):
_LOGGER.warning(

View File

@@ -310,6 +310,10 @@ async def to_code(config):
if task_log_buffer_size > 0:
cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER")
cg.add(log.init_log_buffer(task_log_buffer_size))
elif CORE.is_host:
cg.add(log.create_pthread_key())
cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER")
cg.add(log.init_log_buffer(64)) # Fixed 64 slots for host
cg.add(log.set_log_level(initial_level))
if CONF_HARDWARE_UART in config:
@@ -520,10 +524,11 @@ FILTER_SOURCE_FILES = filter_source_files_from_platform(
PlatformFramework.LN882X_ARDUINO,
},
"logger_zephyr.cpp": {PlatformFramework.NRF52_ZEPHYR},
"task_log_buffer.cpp": {
"task_log_buffer_esp32.cpp": {
PlatformFramework.ESP32_ARDUINO,
PlatformFramework.ESP32_IDF,
},
"task_log_buffer_host.cpp": {PlatformFramework.HOST_NATIVE},
}
)

View File

@@ -12,14 +12,14 @@ namespace esphome::logger {
static const char *const TAG = "logger";
#ifdef USE_ESP32
// Implementation for ESP32 (multi-task platform with task-specific tracking)
// Main task always uses direct buffer access for console output and callbacks
#if defined(USE_ESP32) || defined(USE_HOST)
// Implementation for multi-threaded platforms (ESP32 with FreeRTOS, Host with pthreads)
// Main thread/task always uses direct buffer access for console output and callbacks
//
// For non-main tasks:
// For non-main threads/tasks:
// - WITH task log buffer: Prefer sending to ring buffer for async processing
// - Avoids allocating stack memory for console output in normal operation
// - Prevents console corruption from concurrent writes by multiple tasks
// - Prevents console corruption from concurrent writes by multiple threads
// - Messages are serialized through main loop for proper console output
// - Fallback to emergency console logging only if ring buffer is full
// - WITHOUT task log buffer: Only emergency console output, no callbacks
@@ -27,15 +27,20 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
if (level > this->level_for(tag))
return;
#ifdef USE_ESP32
TaskHandle_t current_task = xTaskGetCurrentTaskHandle();
bool is_main_task = (current_task == main_task_);
#else // USE_HOST
pthread_t current_thread = pthread_self();
bool is_main_task = pthread_equal(current_thread, main_thread_);
#endif
// Check and set recursion guard - uses pthread TLS for per-task state
// Check and set recursion guard - uses pthread TLS for per-thread/task state
if (this->check_and_set_task_log_recursion_(is_main_task)) {
return; // Recursion detected
}
// Main task uses the shared buffer for efficiency
// Main thread/task uses the shared buffer for efficiency
if (is_main_task) {
this->log_message_to_buffer_and_send_(level, tag, line, format, args);
this->reset_task_log_recursion_(is_main_task);
@@ -44,9 +49,13 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
bool message_sent = false;
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// For non-main tasks, queue the message for callbacks - but only if we have any callbacks registered
// For non-main threads/tasks, queue the message for callbacks
#ifdef USE_ESP32
message_sent =
this->log_buffer_->send_message_thread_safe(level, tag, static_cast<uint16_t>(line), current_task, format, args);
#else // USE_HOST
message_sent = this->log_buffer_->send_message_thread_safe(level, tag, static_cast<uint16_t>(line), format, args);
#endif
if (message_sent) {
// Enable logger loop to process the buffered message
// This is safe to call from any context including ISRs
@@ -54,13 +63,19 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
}
#endif // USE_ESPHOME_TASK_LOG_BUFFER
// Emergency console logging for non-main tasks when ring buffer is full or disabled
// Emergency console logging for non-main threads when ring buffer is full or disabled
// This is a fallback mechanism to ensure critical log messages are visible
// Note: This may cause interleaved/corrupted console output if multiple tasks
// Note: This may cause interleaved/corrupted console output if multiple threads
// log simultaneously, but it's better than losing important messages entirely
#ifdef USE_HOST
if (!message_sent) {
// Host always has console output - no baud_rate check needed
static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 512;
#else
if (!message_sent && this->baud_rate_ > 0) { // If logging is enabled, write to console
// Maximum size for console log messages (includes null terminator)
static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 144;
#endif
char console_buffer[MAX_CONSOLE_LOG_MSG_SIZE]; // MUST be stack allocated for thread safety
uint16_t buffer_at = 0; // Initialize buffer position
this->format_log_to_buffer_with_terminator_(level, tag, line, format, args, console_buffer, &buffer_at,
@@ -70,7 +85,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
this->write_msg_(console_buffer, buffer_at);
}
// Reset the recursion guard for this task
// Reset the recursion guard for this thread/task
this->reset_task_log_recursion_(is_main_task);
}
#else
@@ -86,7 +101,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
global_recursion_guard_ = false;
}
#endif // !USE_ESP32
#endif // USE_ESP32 / USE_HOST
#ifdef USE_STORE_LOG_STR_IN_FLASH
// Implementation for ESP8266 with flash string support.
@@ -167,15 +182,24 @@ Logger::Logger(uint32_t baud_rate, size_t tx_buffer_size) : baud_rate_(baud_rate
this->main_task_ = xTaskGetCurrentTaskHandle();
#elif defined(USE_ZEPHYR)
this->main_task_ = k_current_get();
#elif defined(USE_HOST)
this->main_thread_ = pthread_self();
#endif
}
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
void Logger::init_log_buffer(size_t total_buffer_size) {
#ifdef USE_HOST
// Host uses slot count instead of byte size
this->log_buffer_ = esphome::make_unique<logger::TaskLogBufferHost>(total_buffer_size);
#else
this->log_buffer_ = esphome::make_unique<logger::TaskLogBuffer>(total_buffer_size);
#endif
#ifdef USE_ESP32
// Start with loop disabled when using task buffer (unless using USB CDC)
// The loop will be enabled automatically when messages arrive
this->disable_loop_when_buffer_empty_();
#endif
}
#endif
@@ -187,41 +211,37 @@ void Logger::process_messages_() {
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// Process any buffered messages when available
if (this->log_buffer_->has_messages()) {
#ifdef USE_HOST
logger::TaskLogBufferHost::LogMessage *message;
while (this->log_buffer_->get_message_main_loop(&message)) {
const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr;
this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, message->text,
message->text_length);
this->log_buffer_->release_message_main_loop();
this->write_tx_buffer_to_console_();
}
#else // USE_ESP32
logger::TaskLogBuffer::LogMessage *message;
const char *text;
void *received_token;
// Process messages from the buffer
while (this->log_buffer_->borrow_message_main_loop(&message, &text, &received_token)) {
this->tx_buffer_at_ = 0;
// Use the thread name that was stored when the message was created
// This avoids potential crashes if the task no longer exists
const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr;
this->write_header_to_buffer_(message->level, message->tag, message->line, thread_name, this->tx_buffer_,
&this->tx_buffer_at_, this->tx_buffer_size_);
this->write_body_to_buffer_(text, message->text_length, this->tx_buffer_, &this->tx_buffer_at_,
this->tx_buffer_size_);
this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->tx_buffer_[this->tx_buffer_at_] = '\0';
size_t msg_len = this->tx_buffer_at_; // We already know the length from tx_buffer_at_
for (auto *listener : this->log_listeners_)
listener->on_log(message->level, message->tag, this->tx_buffer_, msg_len);
// At this point all the data we need from message has been transferred to the tx_buffer
// so we can release the message to allow other tasks to use it as soon as possible.
this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, text,
message->text_length);
// Release the message to allow other tasks to use it as soon as possible
this->log_buffer_->release_message_main_loop(received_token);
// Write to console from the main loop to prevent corruption from concurrent writes
// This ensures all log messages appear on the console in a clean, serialized manner
// Note: Messages may appear slightly out of order due to async processing, but
// this is preferred over corrupted/interleaved console output
this->write_tx_buffer_to_console_();
}
} else {
#endif
}
#ifdef USE_ESP32
else {
// No messages to process, disable loop if appropriate
// This reduces overhead when there's no async logging activity
this->disable_loop_when_buffer_empty_();
}
#endif
#endif // USE_ESPHOME_TASK_LOG_BUFFER
}
void Logger::set_baud_rate(uint32_t baud_rate) { this->baud_rate_ = baud_rate; }
@@ -271,7 +291,11 @@ void Logger::dump_config() {
#endif
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
if (this->log_buffer_) {
ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u", this->log_buffer_->size());
#ifdef USE_HOST
ESP_LOGCONFIG(TAG, " Task Log Buffer Slots: %u", static_cast<unsigned int>(this->log_buffer_->size()));
#else
ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u bytes", static_cast<unsigned int>(this->log_buffer_->size()));
#endif
}
#endif

View File

@@ -2,7 +2,7 @@
#include <cstdarg>
#include <map>
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
#include <pthread.h>
#endif
#include "esphome/core/automation.h"
@@ -12,7 +12,11 @@
#include "esphome/core/log.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include "task_log_buffer.h"
#ifdef USE_HOST
#include "task_log_buffer_host.h"
#elif defined(USE_ESP32)
#include "task_log_buffer_esp32.h"
#endif
#endif
#ifdef USE_ARDUINO
@@ -181,6 +185,9 @@ class Logger : public Component {
uart_port_t get_uart_num() const { return uart_num_; }
void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); }
#endif
#ifdef USE_HOST
void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); }
#endif
#if defined(USE_ESP32) || defined(USE_ESP8266) || defined(USE_RP2040) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
void set_uart_selection(UARTSelection uart_selection) { uart_ = uart_selection; }
/// Get the UART used by the logger.
@@ -228,7 +235,7 @@ class Logger : public Component {
inline void HOT format_log_to_buffer_with_terminator_(uint8_t level, const char *tag, int line, const char *format,
va_list args, char *buffer, uint16_t *buffer_at,
uint16_t buffer_size) {
#if defined(USE_ESP32) || defined(USE_LIBRETINY)
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_HOST)
this->write_header_to_buffer_(level, tag, line, this->get_thread_name_(), buffer, buffer_at, buffer_size);
#elif defined(USE_ZEPHYR)
char buff[MAX_POINTER_REPRESENTATION];
@@ -291,6 +298,22 @@ class Logger : public Component {
this->write_tx_buffer_to_console_();
}
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// Helper to format a pre-formatted message from the task log buffer and notify listeners
// Used by process_messages_ to avoid code duplication between ESP32 and host platforms
inline void HOT format_buffered_message_and_notify_(uint8_t level, const char *tag, uint16_t line,
const char *thread_name, const char *text, size_t text_length) {
this->tx_buffer_at_ = 0;
this->write_header_to_buffer_(level, tag, line, thread_name, this->tx_buffer_, &this->tx_buffer_at_,
this->tx_buffer_size_);
this->write_body_to_buffer_(text, text_length, this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->tx_buffer_[this->tx_buffer_at_] = '\0';
for (auto *listener : this->log_listeners_)
listener->on_log(level, tag, this->tx_buffer_, this->tx_buffer_at_);
}
#endif
// Write the body of the log message to the buffer
inline void write_body_to_buffer_(const char *value, size_t length, char *buffer, uint16_t *buffer_at,
uint16_t buffer_size) {
@@ -325,6 +348,9 @@ class Logger : public Component {
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
void *main_task_ = nullptr; // Only used for thread name identification
#endif
#ifdef USE_HOST
pthread_t main_thread_{}; // Main thread for identification
#endif
#ifdef USE_ESP32
// Task-specific recursion guards:
// - Main task uses a dedicated member variable for efficiency
@@ -332,6 +358,10 @@ class Logger : public Component {
pthread_key_t log_recursion_key_; // 4 bytes
uart_port_t uart_num_; // 4 bytes (enum defaults to int size)
#endif
#ifdef USE_HOST
// Thread-specific recursion guards using pthread TLS
pthread_key_t log_recursion_key_;
#endif
// Large objects (internally aligned)
#ifdef USE_LOGGER_RUNTIME_TAG_LEVELS
@@ -342,7 +372,11 @@ class Logger : public Component {
std::vector<LoggerLevelListener *> level_listeners_; // Log level change listeners
#endif
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#ifdef USE_HOST
std::unique_ptr<logger::TaskLogBufferHost> log_buffer_; // Will be initialized with init_log_buffer
#elif defined(USE_ESP32)
std::unique_ptr<logger::TaskLogBuffer> log_buffer_; // Will be initialized with init_log_buffer
#endif
#endif
// Group smaller types together at the end
@@ -355,7 +389,7 @@ class Logger : public Component {
#ifdef USE_LIBRETINY
UARTSelection uart_{UART_SELECTION_DEFAULT};
#endif
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
bool main_task_recursion_guard_{false};
#else
bool global_recursion_guard_{false}; // Simple global recursion guard for single-task platforms
@@ -392,7 +426,7 @@ class Logger : public Component {
}
#endif
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
inline bool HOT check_and_set_task_log_recursion_(bool is_main_task) {
if (is_main_task) {
const bool was_recursive = main_task_recursion_guard_;
@@ -418,6 +452,22 @@ class Logger : public Component {
}
#endif
#ifdef USE_HOST
const char *HOT get_thread_name_() {
pthread_t current_thread = pthread_self();
if (pthread_equal(current_thread, main_thread_)) {
return nullptr; // Main thread
}
// For non-main threads, return the thread name
// We store it in thread-local storage to avoid allocation
static thread_local char thread_name_buf[32];
if (pthread_getname_np(current_thread, thread_name_buf, sizeof(thread_name_buf)) == 0) {
return thread_name_buf;
}
return nullptr;
}
#endif
static inline void copy_string(char *buffer, uint16_t &pos, const char *str) {
const size_t len = strlen(str);
// Intentionally no null terminator, building larger string
@@ -475,7 +525,7 @@ class Logger : public Component {
buffer[pos++] = '0' + (remainder - tens * 10);
buffer[pos++] = ']';
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) || defined(USE_HOST)
if (thread_name != nullptr) {
write_ansi_color_for_level(buffer, pos, 1); // Always use bold red for thread name
buffer[pos++] = '[';

View File

@@ -0,0 +1,51 @@
#ifdef USE_ESP8266
#include "logger.h"
#include "esphome/core/log.h"
namespace esphome::logger {
static const char *const TAG = "logger";
void Logger::pre_setup() {
if (this->baud_rate_ > 0) {
switch (this->uart_) {
case UART_SELECTION_UART0:
case UART_SELECTION_UART0_SWAP:
this->hw_serial_ = &Serial;
Serial.begin(this->baud_rate_);
if (this->uart_ == UART_SELECTION_UART0_SWAP) {
Serial.swap();
}
Serial.setDebugOutput(ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE);
break;
case UART_SELECTION_UART1:
this->hw_serial_ = &Serial1;
Serial1.begin(this->baud_rate_);
Serial1.setDebugOutput(ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_VERBOSE);
break;
}
} else {
uart_set_debug(UART_NO);
}
global_logger = this;
ESP_LOGI(TAG, "Log initialized");
}
void HOT Logger::write_msg_(const char *msg) { this->hw_serial_->println(msg); }
const LogString *Logger::get_uart_selection_() {
switch (this->uart_) {
case UART_SELECTION_UART0:
return LOG_STR("UART0");
case UART_SELECTION_UART1:
return LOG_STR("UART1");
case UART_SELECTION_UART0_SWAP:
default:
return LOG_STR("UART0_SWAP");
}
}
} // namespace esphome::logger
#endif

View File

@@ -10,8 +10,9 @@ void HOT Logger::write_msg_(const char *msg, size_t len) {
time_t rawtime;
time(&rawtime);
struct tm *timeinfo = localtime(&rawtime);
size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", timeinfo);
struct tm timeinfo;
localtime_r(&rawtime, &timeinfo); // Thread-safe version
size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", &timeinfo);
// Copy message (with newline already included by caller)
size_t copy_len = std::min(len, sizeof(buffer) - pos);

View File

@@ -0,0 +1,22 @@
#if defined(USE_HOST)
#include "logger.h"
namespace esphome::logger {
void HOT Logger::write_msg_(const char *msg) {
time_t rawtime;
struct tm *timeinfo;
char buffer[80];
time(&rawtime);
timeinfo = localtime(&rawtime);
strftime(buffer, sizeof buffer, "[%H:%M:%S]", timeinfo);
fputs(buffer, stdout);
puts(msg);
}
void Logger::pre_setup() { global_logger = this; }
} // namespace esphome::logger
#endif

View File

@@ -0,0 +1,70 @@
#ifdef USE_LIBRETINY
#include "logger.h"
namespace esphome::logger {
static const char *const TAG = "logger";
void Logger::pre_setup() {
if (this->baud_rate_ > 0) {
switch (this->uart_) {
#if LT_HW_UART0
case UART_SELECTION_UART0:
this->hw_serial_ = &Serial0;
Serial0.begin(this->baud_rate_);
break;
#endif
#if LT_HW_UART1
case UART_SELECTION_UART1:
this->hw_serial_ = &Serial1;
Serial1.begin(this->baud_rate_);
break;
#endif
#if LT_HW_UART2
case UART_SELECTION_UART2:
this->hw_serial_ = &Serial2;
Serial2.begin(this->baud_rate_);
break;
#endif
default:
this->hw_serial_ = &Serial;
Serial.begin(this->baud_rate_);
if (this->uart_ != UART_SELECTION_DEFAULT) {
ESP_LOGW(TAG, " The chosen logger UART port is not available on this board."
"The default port was used instead.");
}
break;
}
// change lt_log() port to match default Serial
if (this->uart_ == UART_SELECTION_DEFAULT) {
this->uart_ = (UARTSelection) (LT_UART_DEFAULT_SERIAL + 1);
lt_log_set_port(LT_UART_DEFAULT_SERIAL);
} else {
lt_log_set_port(this->uart_ - 1);
}
}
global_logger = this;
ESP_LOGI(TAG, "Log initialized");
}
void HOT Logger::write_msg_(const char *msg) { this->hw_serial_->println(msg); }
const LogString *Logger::get_uart_selection_() {
switch (this->uart_) {
case UART_SELECTION_DEFAULT:
return LOG_STR("DEFAULT");
case UART_SELECTION_UART0:
return LOG_STR("UART0");
case UART_SELECTION_UART1:
return LOG_STR("UART1");
case UART_SELECTION_UART2:
default:
return LOG_STR("UART2");
}
}
} // namespace esphome::logger
#endif // USE_LIBRETINY

View File

@@ -0,0 +1,48 @@
#ifdef USE_RP2040
#include "logger.h"
#include "esphome/core/log.h"
namespace esphome::logger {
static const char *const TAG = "logger";
void Logger::pre_setup() {
if (this->baud_rate_ > 0) {
switch (this->uart_) {
case UART_SELECTION_UART0:
this->hw_serial_ = &Serial1;
Serial1.begin(this->baud_rate_);
break;
case UART_SELECTION_UART1:
this->hw_serial_ = &Serial2;
Serial2.begin(this->baud_rate_);
break;
case UART_SELECTION_USB_CDC:
this->hw_serial_ = &Serial;
Serial.begin(this->baud_rate_);
break;
}
}
global_logger = this;
ESP_LOGI(TAG, "Log initialized");
}
void HOT Logger::write_msg_(const char *msg) { this->hw_serial_->println(msg); }
const LogString *Logger::get_uart_selection_() {
switch (this->uart_) {
case UART_SELECTION_UART0:
return LOG_STR("UART0");
case UART_SELECTION_UART1:
return LOG_STR("UART1");
#ifdef USE_LOGGER_USB_CDC
case UART_SELECTION_USB_CDC:
return LOG_STR("USB_CDC");
#endif
default:
return LOG_STR("UNKNOWN");
}
}
} // namespace esphome::logger
#endif // USE_RP2040

View File

@@ -0,0 +1,96 @@
#ifdef USE_ZEPHYR
#include "esphome/core/application.h"
#include "esphome/core/log.h"
#include "logger.h"
#include <zephyr/device.h>
#include <zephyr/drivers/uart.h>
#include <zephyr/usb/usb_device.h>
namespace esphome::logger {
static const char *const TAG = "logger";
#ifdef USE_LOGGER_USB_CDC
void Logger::loop() {
if (this->uart_ != UART_SELECTION_USB_CDC || nullptr == this->uart_dev_) {
return;
}
static bool opened = false;
uint32_t dtr = 0;
uart_line_ctrl_get(this->uart_dev_, UART_LINE_CTRL_DTR, &dtr);
/* Poll if the DTR flag was set, optional */
if (opened == dtr) {
return;
}
if (!opened) {
App.schedule_dump_config();
}
opened = !opened;
}
#endif
void Logger::pre_setup() {
if (this->baud_rate_ > 0) {
static const struct device *uart_dev = nullptr;
switch (this->uart_) {
case UART_SELECTION_UART0:
uart_dev = DEVICE_DT_GET_OR_NULL(DT_NODELABEL(uart0));
break;
case UART_SELECTION_UART1:
uart_dev = DEVICE_DT_GET_OR_NULL(DT_NODELABEL(uart1));
break;
#ifdef USE_LOGGER_USB_CDC
case UART_SELECTION_USB_CDC:
uart_dev = DEVICE_DT_GET_OR_NULL(DT_NODELABEL(cdc_acm_uart0));
if (device_is_ready(uart_dev)) {
usb_enable(nullptr);
}
break;
#endif
}
if (!device_is_ready(uart_dev)) {
ESP_LOGE(TAG, "%s is not ready.", LOG_STR_ARG(get_uart_selection_()));
} else {
this->uart_dev_ = uart_dev;
}
}
global_logger = this;
ESP_LOGI(TAG, "Log initialized");
}
void HOT Logger::write_msg_(const char *msg) {
#ifdef CONFIG_PRINTK
printk("%s\n", msg);
#endif
if (nullptr == this->uart_dev_) {
return;
}
while (*msg) {
uart_poll_out(this->uart_dev_, *msg);
++msg;
}
uart_poll_out(this->uart_dev_, '\n');
}
const LogString *Logger::get_uart_selection_() {
switch (this->uart_) {
case UART_SELECTION_UART0:
return LOG_STR("UART0");
case UART_SELECTION_UART1:
return LOG_STR("UART1");
#ifdef USE_LOGGER_USB_CDC
case UART_SELECTION_USB_CDC:
return LOG_STR("USB_CDC");
#endif
default:
return LOG_STR("UNKNOWN");
}
}
} // namespace esphome::logger
#endif

View File

@@ -1,5 +1,6 @@
#ifdef USE_ESP32
#include "task_log_buffer.h"
#include "task_log_buffer_esp32.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
@@ -134,3 +135,4 @@ bool TaskLogBuffer::send_message_thread_safe(uint8_t level, const char *tag, uin
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_ESP32

View File

@@ -1,5 +1,7 @@
#pragma once
#ifdef USE_ESP32
#include "esphome/core/defines.h"
#include "esphome/core/helpers.h"
@@ -13,6 +15,22 @@
namespace esphome::logger {
/**
* @brief Task log buffer for ESP32 platform using FreeRTOS ring buffer.
*
* Threading Model: Multi-Producer Single-Consumer (MPSC)
* - Multiple FreeRTOS tasks can safely call send_message_thread_safe() concurrently
* - Only the main loop task calls borrow_message_main_loop() and release_message_main_loop()
*
* This uses the FreeRTOS ring buffer (RINGBUF_TYPE_NOSPLIT) which provides
* built-in thread-safety for the MPSC pattern. The ring buffer ensures
* message integrity - each message is stored contiguously.
*
* Design:
* - Variable-size messages with header + text stored contiguously
* - FreeRTOS ring buffer handles synchronization internally
* - Atomic counter for fast has_messages() check without ring buffer lock
*/
class TaskLogBuffer {
public:
// Structure for a log message header (text data follows immediately after)
@@ -65,3 +83,4 @@ class TaskLogBuffer {
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_ESP32

View File

@@ -0,0 +1,157 @@
#ifdef USE_HOST
#include "task_log_buffer_host.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include "esphome/core/log.h"
#include <algorithm>
#include <cstdio>
namespace esphome::logger {
TaskLogBufferHost::TaskLogBufferHost(size_t slot_count) : slot_count_(slot_count) {
// Allocate message slots
this->slots_ = std::make_unique<LogMessage[]>(slot_count);
}
TaskLogBufferHost::~TaskLogBufferHost() {
// unique_ptr handles cleanup automatically
}
int TaskLogBufferHost::acquire_write_slot_() {
// Try to reserve a slot using compare-and-swap
size_t current_reserve = this->reserve_index_.load(std::memory_order_relaxed);
while (true) {
// Calculate next index (with wrap-around)
size_t next_reserve = (current_reserve + 1) % this->slot_count_;
// Check if buffer would be full
// Buffer is full when next write position equals read position
size_t current_read = this->read_index_.load(std::memory_order_acquire);
if (next_reserve == current_read) {
return -1; // Buffer full
}
// Try to claim this slot
if (this->reserve_index_.compare_exchange_weak(current_reserve, next_reserve, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return static_cast<int>(current_reserve);
}
// If CAS failed, current_reserve was updated, retry with new value
}
}
void TaskLogBufferHost::commit_write_slot_(int slot_index) {
// Mark the slot as ready for reading
this->slots_[slot_index].ready.store(true, std::memory_order_release);
// Try to advance the write_index if we're the next expected commit
// This ensures messages are read in order
size_t expected = slot_index;
size_t next = (slot_index + 1) % this->slot_count_;
// We only advance write_index if this slot is the next one expected
// This handles out-of-order commits correctly
while (true) {
if (!this->write_index_.compare_exchange_weak(expected, next, std::memory_order_release,
std::memory_order_relaxed)) {
// Someone else advanced it or we're not next in line, that's fine
break;
}
// Successfully advanced, check if next slot is also ready
expected = next;
next = (next + 1) % this->slot_count_;
if (!this->slots_[expected].ready.load(std::memory_order_acquire)) {
break;
}
}
}
bool TaskLogBufferHost::send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format,
va_list args) {
// Acquire a slot
int slot_index = this->acquire_write_slot_();
if (slot_index < 0) {
return false; // Buffer full
}
LogMessage &msg = this->slots_[slot_index];
// Fill in the message header
msg.level = level;
msg.tag = tag;
msg.line = line;
// Get thread name using pthread
char thread_name_buf[LogMessage::MAX_THREAD_NAME_SIZE];
// pthread_getname_np works the same on Linux and macOS
if (pthread_getname_np(pthread_self(), thread_name_buf, sizeof(thread_name_buf)) == 0) {
strncpy(msg.thread_name, thread_name_buf, sizeof(msg.thread_name) - 1);
msg.thread_name[sizeof(msg.thread_name) - 1] = '\0';
} else {
msg.thread_name[0] = '\0';
}
// Format the message text
int ret = vsnprintf(msg.text, sizeof(msg.text), format, args);
if (ret < 0) {
// Formatting error - still commit the slot but with empty text
msg.text[0] = '\0';
msg.text_length = 0;
} else {
msg.text_length = static_cast<uint16_t>(std::min(static_cast<size_t>(ret), sizeof(msg.text) - 1));
}
// Remove trailing newlines
while (msg.text_length > 0 && msg.text[msg.text_length - 1] == '\n') {
msg.text_length--;
}
msg.text[msg.text_length] = '\0';
// Commit the slot
this->commit_write_slot_(slot_index);
return true;
}
bool TaskLogBufferHost::get_message_main_loop(LogMessage **message) {
if (message == nullptr) {
return false;
}
size_t current_read = this->read_index_.load(std::memory_order_relaxed);
size_t current_write = this->write_index_.load(std::memory_order_acquire);
// Check if buffer is empty
if (current_read == current_write) {
return false;
}
// Check if the slot is ready (should always be true if write_index advanced)
LogMessage &msg = this->slots_[current_read];
if (!msg.ready.load(std::memory_order_acquire)) {
return false;
}
*message = &msg;
return true;
}
void TaskLogBufferHost::release_message_main_loop() {
size_t current_read = this->read_index_.load(std::memory_order_relaxed);
// Clear the ready flag
this->slots_[current_read].ready.store(false, std::memory_order_release);
// Advance read index
size_t next_read = (current_read + 1) % this->slot_count_;
this->read_index_.store(next_read, std::memory_order_release);
}
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_HOST

View File

@@ -0,0 +1,122 @@
#pragma once
#ifdef USE_HOST
#include "esphome/core/defines.h"
#include "esphome/core/helpers.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include <atomic>
#include <cstdarg>
#include <cstddef>
#include <cstring>
#include <memory>
#include <pthread.h>
namespace esphome::logger {
/**
* @brief Lock-free task log buffer for host platform.
*
* Threading Model: Multi-Producer Single-Consumer (MPSC)
* - Multiple threads can safely call send_message_thread_safe() concurrently
* - Only the main loop thread calls get_message_main_loop() and release_message_main_loop()
*
* Producers (multiple threads) Consumer (main loop only)
* │ │
* ▼ ▼
* acquire_write_slot_() get_message_main_loop()
* CAS on reserve_index_ read write_index_
* │ check ready flag
* ▼ │
* write to slot (exclusive) ▼
* │ read slot data
* ▼ │
* commit_write_slot_() ▼
* set ready=true release_message_main_loop()
* advance write_index_ set ready=false
* advance read_index_
*
* This implements a lock-free ring buffer for log messages on the host platform.
* It uses atomic compare-and-swap (CAS) operations for thread-safe slot reservation
* without requiring mutexes in the hot path.
*
* Design:
* - Fixed number of pre-allocated message slots to avoid dynamic allocation
* - Each slot contains a header and fixed-size text buffer
* - Atomic CAS for slot reservation allows multiple producers without locks
* - Single consumer (main loop) processes messages in order
*/
class TaskLogBufferHost {
public:
// Default number of message slots - host has plenty of memory
static constexpr size_t DEFAULT_SLOT_COUNT = 64;
// Structure for a log message (fixed size for lock-free operation)
struct LogMessage {
// Size constants
static constexpr size_t MAX_THREAD_NAME_SIZE = 32;
static constexpr size_t MAX_TEXT_SIZE = 512;
const char *tag; // Pointer to static tag string
char thread_name[MAX_THREAD_NAME_SIZE]; // Thread name (copied)
char text[MAX_TEXT_SIZE + 1]; // Message text with null terminator
uint16_t text_length; // Actual length of text
uint16_t line; // Source line number
uint8_t level; // Log level
std::atomic<bool> ready; // Message is ready to be consumed
LogMessage() : tag(nullptr), text_length(0), line(0), level(0), ready(false) {
thread_name[0] = '\0';
text[0] = '\0';
}
};
/// Constructor that takes the number of message slots
explicit TaskLogBufferHost(size_t slot_count);
~TaskLogBufferHost();
// NOT thread-safe - get next message from buffer, only call from main loop
// Returns true if a message was retrieved, false if buffer is empty
bool get_message_main_loop(LogMessage **message);
// NOT thread-safe - release the message after processing, only call from main loop
void release_message_main_loop();
// Thread-safe - send a message to the buffer from any thread
// Returns true if message was queued, false if buffer is full
bool send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format, va_list args);
// Check if there are messages ready to be processed
inline bool HOT has_messages() const {
return read_index_.load(std::memory_order_acquire) != write_index_.load(std::memory_order_acquire);
}
// Get the buffer size (number of slots)
inline size_t size() const { return slot_count_; }
private:
// Acquire a slot for writing (thread-safe)
// Returns slot index or -1 if buffer is full
int acquire_write_slot_();
// Commit a slot after writing (thread-safe)
void commit_write_slot_(int slot_index);
std::unique_ptr<LogMessage[]> slots_; // Pre-allocated message slots
size_t slot_count_; // Number of slots
// Lock-free indices using atomics
// - reserve_index_: Next slot to reserve (producers CAS this to claim slots)
// - write_index_: Boundary of committed/ready slots (consumer reads up to this)
// - read_index_: Next slot to read (only consumer modifies this)
std::atomic<size_t> reserve_index_{0}; // Next slot to reserve for writing
std::atomic<size_t> write_index_{0}; // Last committed slot boundary
std::atomic<size_t> read_index_{0}; // Next slot to read from
};
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_HOST

View File

@@ -1,3 +1,5 @@
import logging
from esphome import automation
import esphome.codegen as cg
from esphome.config_helpers import filter_source_files_from_platform
@@ -27,6 +29,8 @@ CONF_ON_PROGRESS = "on_progress"
CONF_ON_STATE_CHANGE = "on_state_change"
_LOGGER = logging.getLogger(__name__)
ota_ns = cg.esphome_ns.namespace("ota")
OTAComponent = ota_ns.class_("OTAComponent", cg.Component)
OTAState = ota_ns.enum("OTAState")
@@ -45,6 +49,10 @@ def _ota_final_validate(config):
raise cv.Invalid(
f"At least one platform must be specified for '{CONF_OTA}'; add '{CONF_PLATFORM}: {CONF_ESPHOME}' for original OTA functionality"
)
if CORE.is_host:
_LOGGER.warning(
"OTA not available for platform 'host'. OTA functionality disabled."
)
FINAL_VALIDATE_SCHEMA = _ota_final_validate

View File

@@ -0,0 +1,24 @@
#ifdef USE_HOST
#include "ota_backend_host.h"
#include "esphome/core/defines.h"
namespace esphome::ota {
// Stub implementation - OTA is not supported on host platform.
// All methods return error codes to allow compilation of configs with OTA triggers.
std::unique_ptr<ota::OTABackend> make_ota_backend() { return make_unique<ota::HostOTABackend>(); }
OTAResponseTypes HostOTABackend::begin(size_t image_size) { return OTA_RESPONSE_ERROR_UPDATE_PREPARE; }
void HostOTABackend::set_update_md5(const char *expected_md5) {}
OTAResponseTypes HostOTABackend::write(uint8_t *data, size_t len) { return OTA_RESPONSE_ERROR_WRITING_FLASH; }
OTAResponseTypes HostOTABackend::end() { return OTA_RESPONSE_ERROR_UPDATE_END; }
void HostOTABackend::abort() {}
} // namespace esphome::ota
#endif

View File

@@ -0,0 +1,21 @@
#pragma once
#ifdef USE_HOST
#include "ota_backend.h"
namespace esphome::ota {
/// Stub OTA backend for host platform - allows compilation but does not implement OTA.
/// All operations return error codes immediately. This enables configurations with
/// OTA triggers to compile for host platform during development.
class HostOTABackend : public OTABackend {
public:
OTAResponseTypes begin(size_t image_size) override;
void set_update_md5(const char *md5) override;
OTAResponseTypes write(uint8_t *data, size_t len) override;
OTAResponseTypes end() override;
void abort() override;
bool supports_compression() override { return false; }
};
} // namespace esphome::ota
#endif

View File

@@ -38,6 +38,8 @@ lib_deps_base =
wjtje/qr-code-generator-library@1.7.0 ; qr_code
functionpointer/arduino-MLX90393@1.0.2 ; mlx90393
pavlodn/HaierProtocol@0.9.31 ; haier
esphome/dsmr_parser@1.0.0 ; dsmr
polargoose/Crypto-no-arduino@0.4.0 ; dsmr
https://github.com/esphome/TinyGPSPlus.git#v1.1.0 ; gps
; This is using the repository until a new release is published to PlatformIO
https://github.com/Sensirion/arduino-gas-index-algorithm.git#3.2.1 ; Sensirion Gas Index Algorithm Arduino Library
@@ -82,8 +84,6 @@ lib_deps =
heman/AsyncMqttClient-esphome@1.0.0 ; mqtt
fastled/FastLED@3.9.16 ; fastled_base
freekode/TM1651@1.0.1 ; tm1651
glmnet/Dsmr@0.7 ; dsmr
rweather/Crypto@0.4.0 ; dsmr
dudanov/MideaUART@1.1.9 ; midea
tonia/HeatpumpIR@1.0.37 ; heatpumpir
build_flags =

View File

@@ -19,13 +19,7 @@ ruamel.yaml==0.19.1 # dashboard_import
ruamel.yaml.clib==0.2.15 # dashboard_import
esphome-glyphsets==0.2.0
pillow==11.3.0
# pycairo fork for Windows
cairosvg @ git+https://github.com/clydebarrow/cairosvg.git@release ; sys_platform == 'win32'
# Original for everything else
cairosvg==2.8.2 ; sys_platform != 'win32'
resvg-py==0.2.5
freetype-py==2.5.1
jinja2==3.1.6
bleak==2.1.1

View File

@@ -580,6 +580,7 @@ def lint_relative_py_import(fname: Path, line, col, content):
],
exclude=[
"esphome/components/socket/headers.h",
"esphome/components/async_tcp/async_tcp.h",
"esphome/components/esp32/core.cpp",
"esphome/components/esp8266/core.cpp",
"esphome/components/rp2040/core.cpp",

View File

@@ -0,0 +1,7 @@
substitutions:
request_pin: GPIO15
packages:
uart: !include ../../test_build_components/common/uart/esp32-idf.yaml
<<: !include common.yaml

View File

@@ -0,0 +1,4 @@
<<: !include common.yaml
#host platform does not support wifi / network is automatically included
wifi: !remove

View File

@@ -0,0 +1,91 @@
esphome:
name: host-logger-thread-test
host:
api:
logger:
button:
- platform: template
name: "Start Thread Race Test"
id: start_test_button
on_press:
- lambda: |-
// Number of threads and messages per thread
static const int NUM_THREADS = 3;
static const int MESSAGES_PER_THREAD = 100;
// Counters
static std::atomic<int> total_messages_logged{0};
// Thread function - must be a regular function pointer for pthread
struct ThreadTest {
static void *thread_func(void *arg) {
int thread_id = *static_cast<int *>(arg);
// Set thread name (different signatures on macOS vs Linux)
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "LogThread%d", thread_id);
#ifdef __APPLE__
pthread_setname_np(thread_name);
#else
pthread_setname_np(pthread_self(), thread_name);
#endif
// Log messages with different log levels
for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
switch (i % 4) {
case 0:
ESP_LOGI("thread_test", "THREAD%d_MSG%03d_INFO_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 1:
ESP_LOGD("thread_test", "THREAD%d_MSG%03d_DEBUG_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 2:
ESP_LOGW("thread_test", "THREAD%d_MSG%03d_WARN_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 3:
ESP_LOGE("thread_test", "THREAD%d_MSG%03d_ERROR_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
}
total_messages_logged.fetch_add(1, std::memory_order_relaxed);
// Small busy loop to vary timing between threads
int delay_count = (thread_id + 1) * 10;
while (delay_count-- > 0) {
asm volatile("" ::: "memory"); // Prevent optimization
}
}
return nullptr;
}
};
ESP_LOGI("thread_test", "RACE_TEST_START: Starting %d threads with %d messages each",
NUM_THREADS, MESSAGES_PER_THREAD);
// Reset counter for this test run
total_messages_logged.store(0, std::memory_order_relaxed);
pthread_t threads[NUM_THREADS];
int thread_ids[NUM_THREADS];
// Create all threads
for (int i = 0; i < NUM_THREADS; i++) {
thread_ids[i] = i;
int ret = pthread_create(&threads[i], nullptr, ThreadTest::thread_func, &thread_ids[i]);
if (ret != 0) {
ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread %d", i);
return;
}
}
// Wait for all threads to complete
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], nullptr);
}
ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: All threads finished, total messages: %d",
total_messages_logged.load(std::memory_order_relaxed));

View File

@@ -0,0 +1,182 @@
"""Integration test for host logger thread safety.
This test verifies that the logger's MPSC ring buffer correctly handles
multiple threads racing to log messages without corruption or data loss.
"""
from __future__ import annotations
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
# Expected pattern for log messages from threads
# Format: THREADn_MSGnnn_LEVEL_MESSAGE_WITH_DATA_xxxxxxxx
THREAD_MSG_PATTERN = re.compile(
r"THREAD(\d+)_MSG(\d{3})_(INFO|DEBUG|WARN|ERROR)_MESSAGE_WITH_DATA_([0-9A-F]{8})"
)
# Pattern for test start/complete markers
TEST_START_PATTERN = re.compile(r"RACE_TEST_START.*Starting (\d+) threads")
TEST_COMPLETE_PATTERN = re.compile(r"RACE_TEST_COMPLETE.*total messages: (\d+)")
# Expected values
NUM_THREADS = 3
MESSAGES_PER_THREAD = 100
EXPECTED_TOTAL_MESSAGES = NUM_THREADS * MESSAGES_PER_THREAD
@pytest.mark.asyncio
async def test_host_logger_thread_safety(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that multiple threads can log concurrently without corruption.
This test:
1. Spawns 3 threads that each log 100 messages
2. Collects all log output
3. Verifies no lines are corrupted (partially written or interleaved)
4. Verifies all expected messages were received
"""
collected_lines: list[str] = []
test_complete_event = asyncio.Event()
def line_callback(line: str) -> None:
"""Collect log lines and detect test completion."""
collected_lines.append(line)
if "RACE_TEST_COMPLETE" in line:
test_complete_event.set()
# Run the test binary and collect output
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
# Verify connection works
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-logger-thread-test"
# Get the button entity - find by name
entities, _ = await client.list_entities_services()
button_entities = [e for e in entities if e.name == "Start Thread Race Test"]
assert button_entities, "Could not find Start Thread Race Test button"
button_key = button_entities[0].key
# Press the button to start the thread race test
client.button_command(button_key)
# Wait for test to complete (with timeout)
try:
await asyncio.wait_for(test_complete_event.wait(), timeout=30.0)
except TimeoutError:
pytest.fail(
"Test did not complete within timeout. "
f"Collected {len(collected_lines)} lines."
)
# Give a bit more time for any remaining buffered messages
await asyncio.sleep(0.5)
# Analyze collected log lines
thread_messages: dict[int, set[int]] = {i: set() for i in range(NUM_THREADS)}
corrupted_lines: list[str] = []
test_started = False
test_completed = False
reported_total = 0
for line in collected_lines:
# Check for test start
start_match = TEST_START_PATTERN.search(line)
if start_match:
test_started = True
assert int(start_match.group(1)) == NUM_THREADS, (
f"Unexpected thread count: {start_match.group(1)}"
)
continue
# Check for test completion
complete_match = TEST_COMPLETE_PATTERN.search(line)
if complete_match:
test_completed = True
reported_total = int(complete_match.group(1))
continue
# Check for thread messages
msg_match = THREAD_MSG_PATTERN.search(line)
if msg_match:
thread_id = int(msg_match.group(1))
msg_num = int(msg_match.group(2))
# level = msg_match.group(3) # INFO, DEBUG, WARN, ERROR
data_hex = msg_match.group(4)
# Verify data value matches expected calculation
expected_data = f"{msg_num * 12345:08X}"
if data_hex != expected_data:
corrupted_lines.append(
f"Data mismatch in line: {line} "
f"(expected {expected_data}, got {data_hex})"
)
continue
# Track which messages we received from each thread
if 0 <= thread_id < NUM_THREADS:
thread_messages[thread_id].add(msg_num)
else:
corrupted_lines.append(f"Invalid thread ID in line: {line}")
continue
# Check for partial/corrupted thread messages
# If a line contains part of a thread message pattern but doesn't match fully
# This could indicate line corruption from interleaving
if (
"THREAD" in line
and "MSG" in line
and not msg_match
and "_MESSAGE_WITH_DATA_" in line
):
corrupted_lines.append(f"Possibly corrupted line: {line}")
# Assertions
assert test_started, "Test start marker not found in output"
assert test_completed, "Test completion marker not found in output"
assert reported_total == EXPECTED_TOTAL_MESSAGES, (
f"Reported total {reported_total} != expected {EXPECTED_TOTAL_MESSAGES}"
)
# Check for corrupted lines
assert not corrupted_lines, (
f"Found {len(corrupted_lines)} corrupted lines:\n"
+ "\n".join(corrupted_lines[:10]) # Show first 10
)
# Count total messages received
total_received = sum(len(msgs) for msgs in thread_messages.values())
# We may not receive all messages due to ring buffer overflow when buffer is full
# The test primarily verifies no corruption, not that we receive every message
# However, we should receive a reasonable number of messages
min_expected = EXPECTED_TOTAL_MESSAGES // 2 # At least 50%
assert total_received >= min_expected, (
f"Received only {total_received} messages, expected at least {min_expected}. "
f"Per-thread breakdown: "
+ ", ".join(f"Thread{i}: {len(msgs)}" for i, msgs in thread_messages.items())
)
# Verify we got messages from all threads (proves concurrent logging worked)
for thread_id in range(NUM_THREADS):
assert thread_messages[thread_id], (
f"No messages received from thread {thread_id}"
)
# Log summary for debugging
print("\nThread safety test summary:")
print(f" Total messages received: {total_received}/{EXPECTED_TOTAL_MESSAGES}")
for thread_id in range(NUM_THREADS):
received = len(thread_messages[thread_id])
print(f" Thread {thread_id}: {received}/{MESSAGES_PER_THREAD} messages")