[api] Device defined action responses (#12136)

Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Jesse Hills
2025-12-07 04:47:57 +13:00
committed by GitHub
parent 75c41b11d1
commit f20aaf3981
46 changed files with 1455 additions and 105 deletions

View File

@@ -27,12 +27,13 @@ from esphome.const import (
CONF_SERVICE,
CONF_SERVICES,
CONF_TAG,
CONF_THEN,
CONF_TRIGGER_ID,
CONF_VARIABLES,
)
from esphome.core import CORE, ID, CoroPriority, coroutine_with_priority
from esphome.cpp_generator import TemplateArgsType
from esphome.types import ConfigType
from esphome.core import CORE, ID, CoroPriority, EsphomeError, coroutine_with_priority
from esphome.cpp_generator import MockObj, TemplateArgsType
from esphome.types import ConfigFragmentType, ConfigType
_LOGGER = logging.getLogger(__name__)
@@ -63,17 +64,21 @@ HomeAssistantActionResponseTrigger = api_ns.class_(
"HomeAssistantActionResponseTrigger", automation.Trigger
)
APIConnectedCondition = api_ns.class_("APIConnectedCondition", Condition)
APIRespondAction = api_ns.class_("APIRespondAction", automation.Action)
APIUnregisterServiceCallAction = api_ns.class_(
"APIUnregisterServiceCallAction", automation.Action
)
UserServiceTrigger = api_ns.class_("UserServiceTrigger", automation.Trigger)
ListEntitiesServicesArgument = api_ns.class_("ListEntitiesServicesArgument")
SERVICE_ARG_NATIVE_TYPES = {
"bool": bool,
SERVICE_ARG_NATIVE_TYPES: dict[str, MockObj] = {
"bool": cg.bool_,
"int": cg.int32,
"float": float,
"float": cg.float_,
"string": cg.std_string,
"bool[]": cg.FixedVector.template(bool).operator("const").operator("ref"),
"bool[]": cg.FixedVector.template(cg.bool_).operator("const").operator("ref"),
"int[]": cg.FixedVector.template(cg.int32).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(float).operator("const").operator("ref"),
"float[]": cg.FixedVector.template(cg.float_).operator("const").operator("ref"),
"string[]": cg.FixedVector.template(cg.std_string)
.operator("const")
.operator("ref"),
@@ -102,6 +107,85 @@ def validate_encryption_key(value):
return value
CONF_SUPPORTS_RESPONSE = "supports_response"
# Enum values in api::enums namespace
enums_ns = api_ns.namespace("enums")
SUPPORTS_RESPONSE_OPTIONS = {
"none": enums_ns.SUPPORTS_RESPONSE_NONE,
"optional": enums_ns.SUPPORTS_RESPONSE_OPTIONAL,
"only": enums_ns.SUPPORTS_RESPONSE_ONLY,
"status": enums_ns.SUPPORTS_RESPONSE_STATUS,
}
def _auto_detect_supports_response(config: ConfigType) -> ConfigType:
"""Auto-detect supports_response based on api.respond usage in the action's then block.
- If api.respond with data found: set to "optional" (unless user explicitly set)
- If api.respond without data found: set to "status" (unless user explicitly set)
- If no api.respond found: set to "none" (unless user explicitly set)
"""
def scan_actions(items: ConfigFragmentType) -> tuple[bool, bool]:
"""Recursively scan actions for api.respond.
Returns: (found, has_data) tuple - has_data is True if ANY api.respond has data
"""
found_any = False
has_data_any = False
if isinstance(items, list):
for item in items:
found, has_data = scan_actions(item)
if found:
found_any = True
has_data_any = has_data_any or has_data
elif isinstance(items, dict):
# Check if this is an api.respond action
if "api.respond" in items:
respond_config = items["api.respond"]
has_data = isinstance(respond_config, dict) and "data" in respond_config
return True, has_data
# Recursively check all values
for value in items.values():
found, has_data = scan_actions(value)
if found:
found_any = True
has_data_any = has_data_any or has_data
return found_any, has_data_any
then = config.get(CONF_THEN, [])
action_name = config.get(CONF_ACTION)
found, has_data = scan_actions(then)
# If user explicitly set supports_response, validate and use that
if CONF_SUPPORTS_RESPONSE in config:
user_value = config[CONF_SUPPORTS_RESPONSE]
# Validate: "only" requires api.respond with data
if user_value == "only" and not has_data:
raise cv.Invalid(
f"Action '{action_name}' has supports_response=only but no api.respond "
"action with 'data:' was found. Use 'status' for responses without data, "
"or add 'data:' to your api.respond action."
)
return config
# Auto-detect based on api.respond usage
if found:
config[CONF_SUPPORTS_RESPONSE] = "optional" if has_data else "status"
else:
config[CONF_SUPPORTS_RESPONSE] = "none"
return config
def _validate_supports_response(value):
"""Validate supports_response after auto-detection has set the value."""
return cv.enum(SUPPORTS_RESPONSE_OPTIONS, lower=True)(value)
ACTIONS_SCHEMA = automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(UserServiceTrigger),
@@ -112,10 +196,20 @@ ACTIONS_SCHEMA = automation.validate_automation(
cv.validate_id_name: cv.one_of(*SERVICE_ARG_NATIVE_TYPES, lower=True),
}
),
# No default - auto-detected by _auto_detect_supports_response
cv.Optional(CONF_SUPPORTS_RESPONSE): cv.enum(
SUPPORTS_RESPONSE_OPTIONS, lower=True
),
},
cv.All(
cv.has_exactly_one_key(CONF_SERVICE, CONF_ACTION),
cv.rename_key(CONF_SERVICE, CONF_ACTION),
_auto_detect_supports_response,
# Re-validate supports_response after auto-detection sets it
cv.Schema(
{cv.Required(CONF_SUPPORTS_RESPONSE): _validate_supports_response},
extra=cv.ALLOW_EXTRA,
),
),
)
@@ -242,7 +336,7 @@ CONFIG_SCHEMA = cv.All(
@coroutine_with_priority(CoroPriority.WEB)
async def to_code(config):
async def to_code(config: ConfigType) -> None:
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
@@ -279,20 +373,61 @@ async def to_code(config):
# Collect all triggers first, then register all at once with initializer_list
triggers: list[cg.Pvariable] = []
for conf in actions:
template_args = []
func_args = []
service_arg_names = []
func_args: list[tuple[MockObj, str]] = []
service_template_args: list[MockObj] = [] # User service argument types
# Determine supports_response mode
# cv.enum returns the key with enum_value attribute containing the MockObj
supports_response_key = conf[CONF_SUPPORTS_RESPONSE]
supports_response = supports_response_key.enum_value
is_none = supports_response_key == "none"
is_optional = supports_response_key == "optional"
# Add call_id and return_response based on supports_response mode
# These must match the C++ Trigger template arguments
# - none: no extra args
# - status: call_id only (for reporting success/error without data)
# - only: call_id only (response always expected with data)
# - optional: call_id + return_response (client decides)
if not is_none:
# call_id is present for "optional", "only", and "status"
func_args.append((cg.uint32, "call_id"))
# return_response only present for "optional"
if is_optional:
func_args.append((cg.bool_, "return_response"))
service_arg_names: list[str] = []
for name, var_ in conf[CONF_VARIABLES].items():
native = SERVICE_ARG_NATIVE_TYPES[var_]
template_args.append(native)
service_template_args.append(native)
func_args.append((native, name))
service_arg_names.append(name)
templ = cg.TemplateArguments(*template_args)
# Template args: supports_response mode, then user service arg types
templ = cg.TemplateArguments(supports_response, *service_template_args)
trigger = cg.new_Pvariable(
conf[CONF_TRIGGER_ID], templ, conf[CONF_ACTION], service_arg_names
conf[CONF_TRIGGER_ID],
templ,
conf[CONF_ACTION],
service_arg_names,
)
triggers.append(trigger)
await automation.build_automation(trigger, func_args, conf)
auto = await automation.build_automation(trigger, func_args, conf)
# For non-none response modes, automatically append unregister action
# This ensures the call is unregistered after all actions complete (including async ones)
if not is_none:
arg_types = [arg[0] for arg in func_args]
action_templ = cg.TemplateArguments(*arg_types)
unregister_id = ID(
f"{conf[CONF_TRIGGER_ID]}__unregister",
is_declaration=True,
type=APIUnregisterServiceCallAction.template(action_templ),
)
unregister_action = cg.new_Pvariable(
unregister_id,
var,
)
cg.add(auto.add_actions([unregister_action]))
# Register all services at once - single allocation, no reallocations
cg.add(var.initialize_user_services(triggers))
@@ -538,6 +673,80 @@ async def homeassistant_tag_scanned_to_code(config, action_id, template_arg, arg
return var
CONF_SUCCESS = "success"
CONF_ERROR_MESSAGE = "error_message"
def _validate_api_respond_data(config):
"""Set flag during validation so AUTO_LOAD can include json component."""
if CONF_DATA in config:
CORE.data.setdefault(DOMAIN, {})[CONF_CAPTURE_RESPONSE] = True
return config
API_RESPOND_ACTION_SCHEMA = cv.All(
cv.Schema(
{
cv.GenerateID(): cv.use_id(APIServer),
cv.Optional(CONF_SUCCESS, default=True): cv.templatable(cv.boolean),
cv.Optional(CONF_ERROR_MESSAGE, default=""): cv.templatable(cv.string),
cv.Optional(CONF_DATA): cv.lambda_,
}
),
_validate_api_respond_data,
)
@automation.register_action(
"api.respond",
APIRespondAction,
API_RESPOND_ACTION_SCHEMA,
)
async def api_respond_to_code(
config: ConfigType,
action_id: ID,
template_arg: cg.TemplateArguments,
args: TemplateArgsType,
) -> MockObj:
# Validate that api.respond is used inside an API action context.
# We can't easily validate this at config time since the schema validation
# doesn't have access to the parent action context. Validating here in to_code
# is still much better than a cryptic C++ compile error.
has_call_id = any(name == "call_id" for _, name in args)
if not has_call_id:
raise EsphomeError(
"api.respond can only be used inside an API action's 'then:' block. "
"The 'call_id' variable is required to send a response."
)
cg.add_define("USE_API_USER_DEFINED_ACTION_RESPONSES")
serv = await cg.get_variable(config[CONF_ID])
var = cg.new_Pvariable(action_id, template_arg, serv)
# Check if we're in optional mode (has return_response arg)
is_optional = any(name == "return_response" for _, name in args)
if is_optional:
cg.add(var.set_is_optional_mode(True))
templ = await cg.templatable(config[CONF_SUCCESS], args, cg.bool_)
cg.add(var.set_success(templ))
templ = await cg.templatable(config[CONF_ERROR_MESSAGE], args, cg.std_string)
cg.add(var.set_error_message(templ))
if CONF_DATA in config:
cg.add_define("USE_API_USER_DEFINED_ACTION_RESPONSES_JSON")
# Lambda populates the JsonObject root - no return value needed
lambda_ = await cg.process_lambda(
config[CONF_DATA],
args + [(cg.JsonObject, "root")],
return_type=cg.void,
)
cg.add(var.set_data(lambda_))
return var
API_CONNECTED_CONDITION_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.use_id(APIServer),

View File

@@ -855,6 +855,14 @@ enum ServiceArgType {
SERVICE_ARG_TYPE_FLOAT_ARRAY = 6;
SERVICE_ARG_TYPE_STRING_ARRAY = 7;
}
enum SupportsResponseType {
SUPPORTS_RESPONSE_NONE = 0;
SUPPORTS_RESPONSE_OPTIONAL = 1;
SUPPORTS_RESPONSE_ONLY = 2;
// Status-only response - reports success/error without data payload
// Value is higher to avoid conflicts with future Home Assistant values
SUPPORTS_RESPONSE_STATUS = 100;
}
message ListEntitiesServicesArgument {
option (ifdef) = "USE_API_USER_DEFINED_ACTIONS";
string name = 1;
@@ -868,6 +876,7 @@ message ListEntitiesServicesResponse {
string name = 1;
fixed32 key = 2;
repeated ListEntitiesServicesArgument args = 3 [(fixed_vector) = true];
SupportsResponseType supports_response = 4;
}
message ExecuteServiceArgument {
option (ifdef) = "USE_API_USER_DEFINED_ACTIONS";
@@ -890,6 +899,21 @@ message ExecuteServiceRequest {
fixed32 key = 1;
repeated ExecuteServiceArgument args = 2 [(fixed_vector) = true];
uint32 call_id = 3 [(field_ifdef) = "USE_API_USER_DEFINED_ACTION_RESPONSES"];
bool return_response = 4 [(field_ifdef) = "USE_API_USER_DEFINED_ACTION_RESPONSES"];
}
// Message sent by ESPHome to Home Assistant with service execution response data
message ExecuteServiceResponse {
option (id) = 131;
option (source) = SOURCE_SERVER;
option (no_delay) = true;
option (ifdef) = "USE_API_USER_DEFINED_ACTION_RESPONSES";
uint32 call_id = 1; // Matches the call_id from ExecuteServiceRequest
bool success = 2; // Whether the service execution succeeded
string error_message = 3; // Error message if success = false
bytes response_data = 4 [(pointer_to_buffer) = true, (field_ifdef) = "USE_API_USER_DEFINED_ACTION_RESPONSES_JSON"];
}
// ==================== CAMERA ====================

View File

@@ -6,6 +6,9 @@
#ifdef USE_API_PLAINTEXT
#include "api_frame_helper_plaintext.h"
#endif
#ifdef USE_API_USER_DEFINED_ACTIONS
#include "user_services.h"
#endif
#include <cerrno>
#include <cinttypes>
#include <functional>
@@ -1554,15 +1557,54 @@ void APIConnection::on_home_assistant_state_response(const HomeAssistantStateRes
#ifdef USE_API_USER_DEFINED_ACTIONS
void APIConnection::execute_service(const ExecuteServiceRequest &msg) {
bool found = false;
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Register the call and get a unique server-generated action_call_id
// This avoids collisions when multiple clients use the same call_id
uint32_t action_call_id = 0;
if (msg.call_id != 0) {
action_call_id = this->parent_->register_active_action_call(msg.call_id, this);
}
// Use the overload that passes action_call_id separately (avoids copying msg)
for (auto *service : this->parent_->get_user_services()) {
if (service->execute_service(msg, action_call_id)) {
found = true;
}
}
#else
for (auto *service : this->parent_->get_user_services()) {
if (service->execute_service(msg)) {
found = true;
}
}
#endif
if (!found) {
ESP_LOGV(TAG, "Could not find service");
}
// Note: For services with supports_response != none, the call is unregistered
// by an automatically appended APIUnregisterServiceCallAction at the end of
// the action list. This ensures async actions (delays, waits) complete first.
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
void APIConnection::send_execute_service_response(uint32_t call_id, bool success, const std::string &error_message) {
ExecuteServiceResponse resp;
resp.call_id = call_id;
resp.success = success;
resp.set_error_message(StringRef(error_message));
this->send_message(resp, ExecuteServiceResponse::MESSAGE_TYPE);
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
void APIConnection::send_execute_service_response(uint32_t call_id, bool success, const std::string &error_message,
const uint8_t *response_data, size_t response_data_len) {
ExecuteServiceResponse resp;
resp.call_id = call_id;
resp.success = success;
resp.set_error_message(StringRef(error_message));
resp.response_data = response_data;
resp.response_data_len = response_data_len;
this->send_message(resp, ExecuteServiceResponse::MESSAGE_TYPE);
}
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES
#endif
#ifdef USE_API_HOMEASSISTANT_ACTION_RESPONSES

View File

@@ -223,6 +223,13 @@ class APIConnection final : public APIServerConnection {
#endif
#ifdef USE_API_USER_DEFINED_ACTIONS
void execute_service(const ExecuteServiceRequest &msg) override;
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
void send_execute_service_response(uint32_t call_id, bool success, const std::string &error_message);
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
void send_execute_service_response(uint32_t call_id, bool success, const std::string &error_message,
const uint8_t *response_data, size_t response_data_len);
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES
#endif
#ifdef USE_API_NOISE
bool send_noise_encryption_set_key_response(const NoiseEncryptionSetKeyRequest &msg) override;

View File

@@ -1010,11 +1010,13 @@ void ListEntitiesServicesResponse::encode(ProtoWriteBuffer buffer) const {
for (auto &it : this->args) {
buffer.encode_message(3, it, true);
}
buffer.encode_uint32(4, static_cast<uint32_t>(this->supports_response));
}
void ListEntitiesServicesResponse::calculate_size(ProtoSize &size) const {
size.add_length(1, this->name_ref_.size());
size.add_fixed32(1, this->key);
size.add_repeated_message(1, this->args);
size.add_uint32(1, static_cast<uint32_t>(this->supports_response));
}
bool ExecuteServiceArgument::decode_varint(uint32_t field_id, ProtoVarInt value) {
switch (field_id) {
@@ -1075,6 +1077,23 @@ void ExecuteServiceArgument::decode(const uint8_t *buffer, size_t length) {
this->string_array.init(count_string_array);
ProtoDecodableMessage::decode(buffer, length);
}
bool ExecuteServiceRequest::decode_varint(uint32_t field_id, ProtoVarInt value) {
switch (field_id) {
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
case 3:
this->call_id = value.as_uint32();
break;
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
case 4:
this->return_response = value.as_bool();
break;
#endif
default:
return false;
}
return true;
}
bool ExecuteServiceRequest::decode_length(uint32_t field_id, ProtoLengthDelimited value) {
switch (field_id) {
case 2:
@@ -1102,6 +1121,24 @@ void ExecuteServiceRequest::decode(const uint8_t *buffer, size_t length) {
ProtoDecodableMessage::decode(buffer, length);
}
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
void ExecuteServiceResponse::encode(ProtoWriteBuffer buffer) const {
buffer.encode_uint32(1, this->call_id);
buffer.encode_bool(2, this->success);
buffer.encode_string(3, this->error_message_ref_);
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
buffer.encode_bytes(4, this->response_data, this->response_data_len);
#endif
}
void ExecuteServiceResponse::calculate_size(ProtoSize &size) const {
size.add_uint32(1, this->call_id);
size.add_bool(1, this->success);
size.add_length(1, this->error_message_ref_.size());
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
size.add_length(4, this->response_data_len);
#endif
}
#endif
#ifdef USE_CAMERA
void ListEntitiesCameraResponse::encode(ProtoWriteBuffer buffer) const {
buffer.encode_string(1, this->object_id_ref_);

View File

@@ -75,6 +75,12 @@ enum ServiceArgType : uint32_t {
SERVICE_ARG_TYPE_FLOAT_ARRAY = 6,
SERVICE_ARG_TYPE_STRING_ARRAY = 7,
};
enum SupportsResponseType : uint32_t {
SUPPORTS_RESPONSE_NONE = 0,
SUPPORTS_RESPONSE_OPTIONAL = 1,
SUPPORTS_RESPONSE_ONLY = 2,
SUPPORTS_RESPONSE_STATUS = 100,
};
#endif
#ifdef USE_CLIMATE
enum ClimateMode : uint32_t {
@@ -1257,7 +1263,7 @@ class ListEntitiesServicesArgument final : public ProtoMessage {
class ListEntitiesServicesResponse final : public ProtoMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 41;
static constexpr uint8_t ESTIMATED_SIZE = 48;
static constexpr uint8_t ESTIMATED_SIZE = 50;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "list_entities_services_response"; }
#endif
@@ -1265,6 +1271,7 @@ class ListEntitiesServicesResponse final : public ProtoMessage {
void set_name(const StringRef &ref) { this->name_ref_ = ref; }
uint32_t key{0};
FixedVector<ListEntitiesServicesArgument> args{};
enums::SupportsResponseType supports_response{};
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP
@@ -1297,12 +1304,18 @@ class ExecuteServiceArgument final : public ProtoDecodableMessage {
class ExecuteServiceRequest final : public ProtoDecodableMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 42;
static constexpr uint8_t ESTIMATED_SIZE = 39;
static constexpr uint8_t ESTIMATED_SIZE = 45;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "execute_service_request"; }
#endif
uint32_t key{0};
FixedVector<ExecuteServiceArgument> args{};
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
uint32_t call_id{0};
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
bool return_response{false};
#endif
void decode(const uint8_t *buffer, size_t length) override;
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
@@ -1311,6 +1324,32 @@ class ExecuteServiceRequest final : public ProtoDecodableMessage {
protected:
bool decode_32bit(uint32_t field_id, Proto32Bit value) override;
bool decode_length(uint32_t field_id, ProtoLengthDelimited value) override;
bool decode_varint(uint32_t field_id, ProtoVarInt value) override;
};
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
class ExecuteServiceResponse final : public ProtoMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 131;
static constexpr uint8_t ESTIMATED_SIZE = 34;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "execute_service_response"; }
#endif
uint32_t call_id{0};
bool success{false};
StringRef error_message_ref_{};
void set_error_message(const StringRef &ref) { this->error_message_ref_ = ref; }
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
const uint8_t *response_data{nullptr};
uint16_t response_data_len{0};
#endif
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
#endif
protected:
};
#endif
#ifdef USE_CAMERA

View File

@@ -231,6 +231,20 @@ template<> const char *proto_enum_to_string<enums::ServiceArgType>(enums::Servic
return "UNKNOWN";
}
}
template<> const char *proto_enum_to_string<enums::SupportsResponseType>(enums::SupportsResponseType value) {
switch (value) {
case enums::SUPPORTS_RESPONSE_NONE:
return "SUPPORTS_RESPONSE_NONE";
case enums::SUPPORTS_RESPONSE_OPTIONAL:
return "SUPPORTS_RESPONSE_OPTIONAL";
case enums::SUPPORTS_RESPONSE_ONLY:
return "SUPPORTS_RESPONSE_ONLY";
case enums::SUPPORTS_RESPONSE_STATUS:
return "SUPPORTS_RESPONSE_STATUS";
default:
return "UNKNOWN";
}
}
#endif
#ifdef USE_CLIMATE
template<> const char *proto_enum_to_string<enums::ClimateMode>(enums::ClimateMode value) {
@@ -1194,6 +1208,7 @@ void ListEntitiesServicesResponse::dump_to(std::string &out) const {
it.dump_to(out);
out.append("\n");
}
dump_field(out, "supports_response", static_cast<enums::SupportsResponseType>(this->supports_response));
}
void ExecuteServiceArgument::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "ExecuteServiceArgument");
@@ -1223,6 +1238,25 @@ void ExecuteServiceRequest::dump_to(std::string &out) const {
it.dump_to(out);
out.append("\n");
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
dump_field(out, "call_id", this->call_id);
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
dump_field(out, "return_response", this->return_response);
#endif
}
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
void ExecuteServiceResponse::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "ExecuteServiceResponse");
dump_field(out, "call_id", this->call_id);
dump_field(out, "success", this->success);
dump_field(out, "error_message", this->error_message_ref_);
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
out.append(" response_data: ");
out.append(format_hex_pretty(this->response_data, this->response_data_len));
out.append("\n");
#endif
}
#endif
#ifdef USE_CAMERA

View File

@@ -4,8 +4,8 @@
#include "api_connection.h"
#include "esphome/components/network/util.h"
#include "esphome/core/application.h"
#include "esphome/core/defines.h"
#include "esphome/core/controller_registry.h"
#include "esphome/core/defines.h"
#include "esphome/core/hal.h"
#include "esphome/core/log.h"
#include "esphome/core/util.h"
@@ -186,6 +186,9 @@ void APIServer::loop() {
// Rare case: handle disconnection
#ifdef USE_API_CLIENT_DISCONNECTED_TRIGGER
this->client_disconnected_trigger_->trigger(client->client_info_.name, client->client_info_.peername);
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
this->unregister_active_action_calls_for_connection(client.get());
#endif
ESP_LOGV(TAG, "Remove connection %s", client->client_info_.name.c_str());
@@ -585,5 +588,84 @@ bool APIServer::teardown() {
return this->clients_.empty();
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Timeout for action calls - matches aioesphomeapi client timeout (default 30s)
// Can be overridden via USE_API_ACTION_CALL_TIMEOUT_MS define for testing
#ifndef USE_API_ACTION_CALL_TIMEOUT_MS
#define USE_API_ACTION_CALL_TIMEOUT_MS 30000 // NOLINT
#endif
uint32_t APIServer::register_active_action_call(uint32_t client_call_id, APIConnection *conn) {
uint32_t action_call_id = this->next_action_call_id_++;
// Handle wraparound (skip 0 as it means "no call")
if (this->next_action_call_id_ == 0) {
this->next_action_call_id_ = 1;
}
this->active_action_calls_.push_back({action_call_id, client_call_id, conn});
// Schedule automatic cleanup after timeout (client will have given up by then)
this->set_timeout(str_sprintf("action_call_%u", action_call_id), USE_API_ACTION_CALL_TIMEOUT_MS,
[this, action_call_id]() {
ESP_LOGD(TAG, "Action call %u timed out", action_call_id);
this->unregister_active_action_call(action_call_id);
});
return action_call_id;
}
void APIServer::unregister_active_action_call(uint32_t action_call_id) {
// Cancel the timeout for this action call
this->cancel_timeout(str_sprintf("action_call_%u", action_call_id));
// Swap-and-pop is more efficient than remove_if for unordered vectors
for (size_t i = 0; i < this->active_action_calls_.size(); i++) {
if (this->active_action_calls_[i].action_call_id == action_call_id) {
std::swap(this->active_action_calls_[i], this->active_action_calls_.back());
this->active_action_calls_.pop_back();
return;
}
}
}
void APIServer::unregister_active_action_calls_for_connection(APIConnection *conn) {
// Remove all active action calls for disconnected connection using swap-and-pop
for (size_t i = 0; i < this->active_action_calls_.size();) {
if (this->active_action_calls_[i].connection == conn) {
// Cancel the timeout for this action call
this->cancel_timeout(str_sprintf("action_call_%u", this->active_action_calls_[i].action_call_id));
std::swap(this->active_action_calls_[i], this->active_action_calls_.back());
this->active_action_calls_.pop_back();
// Don't increment i - need to check the swapped element
} else {
i++;
}
}
}
void APIServer::send_action_response(uint32_t action_call_id, bool success, const std::string &error_message) {
for (auto &call : this->active_action_calls_) {
if (call.action_call_id == action_call_id) {
call.connection->send_execute_service_response(call.client_call_id, success, error_message);
return;
}
}
ESP_LOGW(TAG, "Cannot send response: no active call found for action_call_id %u", action_call_id);
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
void APIServer::send_action_response(uint32_t action_call_id, bool success, const std::string &error_message,
const uint8_t *response_data, size_t response_data_len) {
for (auto &call : this->active_action_calls_) {
if (call.action_call_id == action_call_id) {
call.connection->send_execute_service_response(call.client_call_id, success, error_message, response_data,
response_data_len);
return;
}
}
ESP_LOGW(TAG, "Cannot send response: no active call found for action_call_id %u", action_call_id);
}
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES
} // namespace esphome::api
#endif

View File

@@ -12,9 +12,6 @@
#include "esphome/core/log.h"
#include "list_entities.h"
#include "subscribe_state.h"
#ifdef USE_API_USER_DEFINED_ACTIONS
#include "user_services.h"
#endif
#ifdef USE_LOGGER
#include "esphome/components/logger/logger.h"
#endif
@@ -22,11 +19,15 @@
#include "esphome/components/camera/camera.h"
#endif
#include <map>
#include <vector>
namespace esphome::api {
#ifdef USE_API_USER_DEFINED_ACTIONS
// Forward declaration - full definition in user_services.h
class UserServiceDescriptor;
#endif
#ifdef USE_API_NOISE
struct SavedNoisePsk {
psk_t psk;
@@ -154,6 +155,19 @@ class APIServer : public Component,
// Only compile push_back method when custom_services: true (external components)
void register_user_service(UserServiceDescriptor *descriptor) { this->user_services_.push_back(descriptor); }
#endif
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Action call context management - supports concurrent calls from multiple clients
// Returns server-generated action_call_id to avoid collisions when clients use same call_id
uint32_t register_active_action_call(uint32_t client_call_id, APIConnection *conn);
void unregister_active_action_call(uint32_t action_call_id);
void unregister_active_action_calls_for_connection(APIConnection *conn);
// Send response for a specific action call (uses action_call_id, sends client_call_id in response)
void send_action_response(uint32_t action_call_id, bool success, const std::string &error_message);
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
void send_action_response(uint32_t action_call_id, bool success, const std::string &error_message,
const uint8_t *response_data, size_t response_data_len);
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES
#endif
#ifdef USE_HOMEASSISTANT_TIME
void request_time();
@@ -230,6 +244,17 @@ class APIServer : public Component,
#endif
#ifdef USE_API_USER_DEFINED_ACTIONS
std::vector<UserServiceDescriptor *> user_services_;
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Active action calls - supports concurrent calls from multiple clients
// Uses server-generated action_call_id to avoid collisions when multiple clients use same call_id
struct ActiveActionCall {
uint32_t action_call_id; // Server-generated unique ID (passed to actions)
uint32_t client_call_id; // Client's original call_id (used in response)
APIConnection *connection;
};
std::vector<ActiveActionCall> active_action_calls_;
uint32_t next_action_call_id_{1}; // Counter for generating unique action_call_ids
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES
#endif
#ifdef USE_API_HOMEASSISTANT_ACTION_RESPONSES
struct PendingActionResponse {

View File

@@ -16,7 +16,10 @@ template<typename T, typename... Ts> class CustomAPIDeviceService : public UserS
: UserServiceDynamic<Ts...>(name, arg_names), obj_(obj), callback_(callback) {}
protected:
void execute(Ts... x) override { (this->obj_->*this->callback_)(x...); } // NOLINT
// CustomAPIDevice services don't support action responses - ignore call_id and return_response
void execute(uint32_t /*call_id*/, bool /*return_response*/, Ts... x) override {
(this->obj_->*this->callback_)(x...); // NOLINT
}
T *obj_;
void (T::*callback_)(Ts...);

View File

@@ -5,6 +5,9 @@
#include "esphome/core/application.h"
#include "esphome/core/log.h"
#include "esphome/core/util.h"
#ifdef USE_API_USER_DEFINED_ACTIONS
#include "user_services.h"
#endif
namespace esphome::api {

View File

@@ -1,20 +1,31 @@
#pragma once
#include <tuple>
#include <utility>
#include <vector>
#include "esphome/core/component.h"
#include "esphome/core/automation.h"
#include "api_pb2.h"
#include "esphome/core/automation.h"
#include "esphome/core/component.h"
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#include "esphome/components/json/json_util.h"
#endif
#ifdef USE_API_USER_DEFINED_ACTIONS
namespace esphome::api {
// Forward declaration - full definition in api_server.h
class APIServer;
class UserServiceDescriptor {
public:
virtual ListEntitiesServicesResponse encode_list_service_response() = 0;
virtual bool execute_service(const ExecuteServiceRequest &req) = 0;
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Overload that accepts server-generated action_call_id (avoids client call_id collisions)
virtual bool execute_service(const ExecuteServiceRequest &req, uint32_t action_call_id) = 0;
#endif
bool is_internal() { return false; }
};
@@ -27,8 +38,9 @@ template<typename T> enums::ServiceArgType to_service_arg_type();
// Stores only pointers to string literals in flash - no heap allocation
template<typename... Ts> class UserServiceBase : public UserServiceDescriptor {
public:
UserServiceBase(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names)
: name_(name), arg_names_(arg_names) {
UserServiceBase(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names,
enums::SupportsResponseType supports_response = enums::SUPPORTS_RESPONSE_NONE)
: name_(name), arg_names_(arg_names), supports_response_(supports_response) {
this->key_ = fnv1_hash(name);
}
@@ -36,6 +48,7 @@ template<typename... Ts> class UserServiceBase : public UserServiceDescriptor {
ListEntitiesServicesResponse msg;
msg.set_name(StringRef(this->name_));
msg.key = this->key_;
msg.supports_response = this->supports_response_;
std::array<enums::ServiceArgType, sizeof...(Ts)> arg_types = {to_service_arg_type<Ts>()...};
msg.args.init(sizeof...(Ts));
for (size_t i = 0; i < sizeof...(Ts); i++) {
@@ -51,21 +64,37 @@ template<typename... Ts> class UserServiceBase : public UserServiceDescriptor {
return false;
if (req.args.size() != sizeof...(Ts))
return false;
this->execute_(req.args, std::make_index_sequence<sizeof...(Ts)>{});
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
this->execute_(req.args, req.call_id, req.return_response, std::make_index_sequence<sizeof...(Ts)>{});
#else
this->execute_(req.args, 0, false, std::make_index_sequence<sizeof...(Ts)>{});
#endif
return true;
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
bool execute_service(const ExecuteServiceRequest &req, uint32_t action_call_id) override {
if (req.key != this->key_)
return false;
if (req.args.size() != sizeof...(Ts))
return false;
this->execute_(req.args, action_call_id, req.return_response, std::make_index_sequence<sizeof...(Ts)>{});
return true;
}
#endif
protected:
virtual void execute(Ts... x) = 0;
virtual void execute(uint32_t call_id, bool return_response, Ts... x) = 0;
template<typename ArgsContainer, size_t... S>
void execute_(const ArgsContainer &args, std::index_sequence<S...> type) {
this->execute((get_execute_arg_value<Ts>(args[S]))...);
void execute_(const ArgsContainer &args, uint32_t call_id, bool return_response, std::index_sequence<S...> /*type*/) {
this->execute(call_id, return_response, (get_execute_arg_value<Ts>(args[S]))...);
}
// Pointers to string literals in flash - no heap allocation
const char *name_;
std::array<const char *, sizeof...(Ts)> arg_names_;
uint32_t key_{0};
enums::SupportsResponseType supports_response_{enums::SUPPORTS_RESPONSE_NONE};
};
// Separate class for custom_api_device services (rare case)
@@ -81,6 +110,7 @@ template<typename... Ts> class UserServiceDynamic : public UserServiceDescriptor
ListEntitiesServicesResponse msg;
msg.set_name(StringRef(this->name_));
msg.key = this->key_;
msg.supports_response = enums::SUPPORTS_RESPONSE_NONE; // Dynamic services don't support responses yet
std::array<enums::ServiceArgType, sizeof...(Ts)> arg_types = {to_service_arg_type<Ts>()...};
msg.args.init(sizeof...(Ts));
for (size_t i = 0; i < sizeof...(Ts); i++) {
@@ -96,15 +126,31 @@ template<typename... Ts> class UserServiceDynamic : public UserServiceDescriptor
return false;
if (req.args.size() != sizeof...(Ts))
return false;
this->execute_(req.args, std::make_index_sequence<sizeof...(Ts)>{});
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
this->execute_(req.args, req.call_id, req.return_response, std::make_index_sequence<sizeof...(Ts)>{});
#else
this->execute_(req.args, 0, false, std::make_index_sequence<sizeof...(Ts)>{});
#endif
return true;
}
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Dynamic services don't support responses yet, but need to implement the interface
bool execute_service(const ExecuteServiceRequest &req, uint32_t action_call_id) override {
if (req.key != this->key_)
return false;
if (req.args.size() != sizeof...(Ts))
return false;
this->execute_(req.args, action_call_id, req.return_response, std::make_index_sequence<sizeof...(Ts)>{});
return true;
}
#endif
protected:
virtual void execute(Ts... x) = 0;
virtual void execute(uint32_t call_id, bool return_response, Ts... x) = 0;
template<typename ArgsContainer, size_t... S>
void execute_(const ArgsContainer &args, std::index_sequence<S...> type) {
this->execute((get_execute_arg_value<Ts>(args[S]))...);
void execute_(const ArgsContainer &args, uint32_t call_id, bool return_response, std::index_sequence<S...> /*type*/) {
this->execute(call_id, return_response, (get_execute_arg_value<Ts>(args[S]))...);
}
// Heap-allocated strings for runtime-generated names
@@ -113,15 +159,149 @@ template<typename... Ts> class UserServiceDynamic : public UserServiceDescriptor
uint32_t key_{0};
};
template<typename... Ts> class UserServiceTrigger : public UserServiceBase<Ts...>, public Trigger<Ts...> {
// Primary template declaration
template<enums::SupportsResponseType Mode, typename... Ts> class UserServiceTrigger;
// Specialization for NONE - no extra trigger arguments
template<typename... Ts>
class UserServiceTrigger<enums::SUPPORTS_RESPONSE_NONE, Ts...> : public UserServiceBase<Ts...>, public Trigger<Ts...> {
public:
// Constructor for static names (YAML-defined services - used by code generator)
UserServiceTrigger(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names)
: UserServiceBase<Ts...>(name, arg_names) {}
: UserServiceBase<Ts...>(name, arg_names, enums::SUPPORTS_RESPONSE_NONE) {}
protected:
void execute(Ts... x) override { this->trigger(x...); } // NOLINT
void execute(uint32_t /*call_id*/, bool /*return_response*/, Ts... x) override { this->trigger(x...); }
};
// Specialization for OPTIONAL - call_id and return_response trigger arguments
template<typename... Ts>
class UserServiceTrigger<enums::SUPPORTS_RESPONSE_OPTIONAL, Ts...> : public UserServiceBase<Ts...>,
public Trigger<uint32_t, bool, Ts...> {
public:
UserServiceTrigger(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names)
: UserServiceBase<Ts...>(name, arg_names, enums::SUPPORTS_RESPONSE_OPTIONAL) {}
protected:
void execute(uint32_t call_id, bool return_response, Ts... x) override {
this->trigger(call_id, return_response, x...);
}
};
// Specialization for ONLY - just call_id trigger argument
template<typename... Ts>
class UserServiceTrigger<enums::SUPPORTS_RESPONSE_ONLY, Ts...> : public UserServiceBase<Ts...>,
public Trigger<uint32_t, Ts...> {
public:
UserServiceTrigger(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names)
: UserServiceBase<Ts...>(name, arg_names, enums::SUPPORTS_RESPONSE_ONLY) {}
protected:
void execute(uint32_t call_id, bool /*return_response*/, Ts... x) override { this->trigger(call_id, x...); }
};
// Specialization for STATUS - just call_id trigger argument (reports success/error without data)
template<typename... Ts>
class UserServiceTrigger<enums::SUPPORTS_RESPONSE_STATUS, Ts...> : public UserServiceBase<Ts...>,
public Trigger<uint32_t, Ts...> {
public:
UserServiceTrigger(const char *name, const std::array<const char *, sizeof...(Ts)> &arg_names)
: UserServiceBase<Ts...>(name, arg_names, enums::SUPPORTS_RESPONSE_STATUS) {}
protected:
void execute(uint32_t call_id, bool /*return_response*/, Ts... x) override { this->trigger(call_id, x...); }
};
} // namespace esphome::api
#endif // USE_API_USER_DEFINED_ACTIONS
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES
// Include full definition of APIServer for template implementation
// Must be outside namespace to avoid including STL headers inside namespace
#include "api_server.h"
namespace esphome::api {
template<typename... Ts> class APIRespondAction : public Action<Ts...> {
public:
explicit APIRespondAction(APIServer *parent) : parent_(parent) {}
template<typename V> void set_success(V success) { this->success_ = success; }
template<typename V> void set_error_message(V error) { this->error_message_ = error; }
void set_is_optional_mode(bool is_optional) { this->is_optional_mode_ = is_optional; }
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
void set_data(std::function<void(Ts..., JsonObject)> func) {
this->json_builder_ = std::move(func);
this->has_data_ = true;
}
#endif
void play(const Ts &...x) override {
// Extract call_id from first argument - it's always first for optional/only/status modes
auto args = std::make_tuple(x...);
uint32_t call_id = std::get<0>(args);
bool success = this->success_.value(x...);
std::string error_message = this->error_message_.value(x...);
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
if (this->has_data_) {
// For optional mode, check return_response (second arg) to decide if client wants data
// Use nested if constexpr to avoid compile error when tuple doesn't have enough elements
// (std::tuple_element_t is evaluated before the && short-circuit, so we must nest)
if constexpr (sizeof...(Ts) >= 2) {
if constexpr (std::is_same_v<std::tuple_element_t<1, std::tuple<Ts...>>, bool>) {
if (this->is_optional_mode_) {
bool return_response = std::get<1>(args);
if (!return_response) {
// Client doesn't want response data, just send success/error
this->parent_->send_action_response(call_id, success, error_message);
return;
}
}
}
}
// Build and send JSON response
json::JsonBuilder builder;
this->json_builder_(x..., builder.root());
std::string json_str = builder.serialize();
this->parent_->send_action_response(call_id, success, error_message,
reinterpret_cast<const uint8_t *>(json_str.data()), json_str.size());
return;
}
#endif
this->parent_->send_action_response(call_id, success, error_message);
}
protected:
APIServer *parent_;
TemplatableValue<bool, Ts...> success_{true};
TemplatableValue<std::string, Ts...> error_message_{""};
#ifdef USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
std::function<void(Ts..., JsonObject)> json_builder_;
bool has_data_{false};
#endif
bool is_optional_mode_{false};
};
// Action to unregister a service call after execution completes
// Automatically appended to the end of action lists for non-none response modes
template<typename... Ts> class APIUnregisterServiceCallAction : public Action<Ts...> {
public:
explicit APIUnregisterServiceCallAction(APIServer *parent) : parent_(parent) {}
void play(const Ts &...x) override {
// Extract call_id from first argument - same convention as APIRespondAction
auto args = std::make_tuple(x...);
uint32_t call_id = std::get<0>(args);
if (call_id != 0) {
this->parent_->unregister_active_action_call(call_id);
}
}
protected:
APIServer *parent_;
};
} // namespace esphome::api
#endif // USE_API_USER_DEFINED_ACTION_RESPONSES

View File

@@ -129,6 +129,8 @@
#define USE_API_PLAINTEXT
#define USE_API_USER_DEFINED_ACTIONS
#define USE_API_CUSTOM_SERVICES
#define USE_API_USER_DEFINED_ACTION_RESPONSES
#define USE_API_USER_DEFINED_ACTION_RESPONSES_JSON
#define API_MAX_SEND_QUEUE 8
#define USE_MD5
#define USE_SHA256

View File

@@ -12,7 +12,7 @@ platformio==6.1.18 # When updating platformio, also update /docker/Dockerfile
esptool==5.1.0
click==8.1.7
esphome-dashboard==20251013.0
aioesphomeapi==42.10.0
aioesphomeapi==43.0.0
zeroconf==0.148.0
puremagic==1.30
ruamel.yaml==0.18.16 # dashboard_import

View File

@@ -181,6 +181,99 @@ api:
else:
- logger.log: "Skipped loops"
- logger.log: "After combined test"
# ==========================================================================
# supports_response: status (auto-detected - api.respond without data)
# Has call_id only - reports success/error without data payload
# ==========================================================================
- action: test_respond_status
then:
- api.respond:
success: true
- logger.log:
format: "Status response sent (call_id=%d)"
args: [call_id]
- action: test_respond_status_error
variables:
error_msg: string
then:
- api.respond:
success: false
error_message: !lambda 'return error_msg;'
# ==========================================================================
# supports_response: optional (auto-detected - api.respond with data)
# Has call_id and return_response - client decides if it wants response
# ==========================================================================
- action: test_respond_optional
variables:
sensor_name: string
value: float
then:
- logger.log:
format: "Optional response (call_id=%d, return_response=%d)"
args: [call_id, return_response]
- api.respond:
data: !lambda |-
root["sensor"] = sensor_name;
root["value"] = value;
root["unit"] = "°C";
- action: test_respond_optional_conditional
variables:
do_succeed: bool
then:
- if:
condition:
lambda: 'return do_succeed;'
then:
- api.respond:
success: true
data: !lambda |-
root["status"] = "ok";
else:
- api.respond:
success: false
error_message: "Operation failed"
# ==========================================================================
# supports_response: only (explicit - always expects data response)
# Has call_id only - response is always expected with data
# ==========================================================================
- action: test_respond_only
supports_response: only
variables:
input: string
then:
- logger.log:
format: "Only response (call_id=%d)"
args: [call_id]
- api.respond:
data: !lambda |-
root["input"] = input;
root["processed"] = true;
- action: test_respond_only_nested
supports_response: only
then:
- api.respond:
data: !lambda |-
root["config"]["wifi"] = "connected";
root["config"]["api"] = true;
root["items"][0] = "item1";
root["items"][1] = "item2";
# ==========================================================================
# supports_response: none (no api.respond action)
# No call_id or return_response - just user variables
# ==========================================================================
- action: test_no_response
variables:
message: string
then:
- logger.log:
format: "No response action: %s"
args: [message.c_str()]
event:
- platform: template

View File

@@ -252,7 +252,7 @@ my_service = next((s for s in services if s.name == "my_service"), None)
assert my_service is not None
# Execute with parameters
client.execute_service(my_service, {"param1": "value1", "param2": 42})
await client.execute_service(my_service, {"param1": "value1", "param2": 42})
```
##### Multiple Entity Tracking

View File

@@ -0,0 +1,93 @@
esphome:
name: api-action-responses-test
host:
logger:
level: DEBUG
api:
actions:
# ==========================================================================
# supports_response: none (default - no api.respond action)
# No call_id or return_response - just user variables
# ==========================================================================
- action: action_no_response
variables:
message: string
then:
- logger.log:
format: "ACTION_NO_RESPONSE called with: %s"
args: [message.c_str()]
# ==========================================================================
# supports_response: status (auto-detected - api.respond without data)
# Has call_id only - reports success/error without data payload
# ==========================================================================
- action: action_status_response
variables:
should_succeed: bool
then:
- if:
condition:
lambda: 'return should_succeed;'
then:
- api.respond:
success: true
- logger.log:
format: "ACTION_STATUS_RESPONSE success (call_id=%d)"
args: [call_id]
else:
- api.respond:
success: false
error_message: "Intentional failure for testing"
- logger.log:
format: "ACTION_STATUS_RESPONSE error (call_id=%d)"
args: [call_id]
# ==========================================================================
# supports_response: optional (auto-detected - api.respond with data)
# Has call_id and return_response - client decides if it wants response
# ==========================================================================
- action: action_optional_response
variables:
value: int
then:
- logger.log:
format: "ACTION_OPTIONAL_RESPONSE (call_id=%d, return_response=%d, value=%d)"
args: [call_id, return_response, value]
- api.respond:
data: !lambda |-
root["input"] = value;
root["doubled"] = value * 2;
# ==========================================================================
# supports_response: only (explicit - always expects data response)
# Has call_id only - response is always expected with data
# ==========================================================================
- action: action_only_response
supports_response: only
variables:
name: string
then:
- logger.log:
format: "ACTION_ONLY_RESPONSE (call_id=%d, name=%s)"
args: [call_id, name.c_str()]
- api.respond:
data: !lambda |-
root["greeting"] = "Hello, " + name + "!";
root["length"] = name.length();
# Test action with nested JSON response
- action: action_nested_json
supports_response: only
then:
- logger.log:
format: "ACTION_NESTED_JSON (call_id=%d)"
args: [call_id]
- api.respond:
data: !lambda |-
root["config"]["wifi"]["connected"] = true;
root["config"]["api"]["port"] = 6053;
root["items"][0] = "first";
root["items"][1] = "second";

View File

@@ -0,0 +1,45 @@
esphome:
name: api-action-timeout-test
# Use a short timeout for testing (500ms instead of 30s)
platformio_options:
build_flags:
- "-DUSE_API_ACTION_CALL_TIMEOUT_MS=500"
host:
logger:
level: DEBUG
api:
actions:
# Action that responds immediately - should work fine
- action: action_immediate
supports_response: only
then:
- logger.log: "ACTION_IMMEDIATE responding"
- api.respond:
data: !lambda |-
root["status"] = "immediate";
# Action that delays 200ms before responding - should work (within 500ms timeout)
- action: action_short_delay
supports_response: only
then:
- logger.log: "ACTION_SHORT_DELAY starting"
- delay: 200ms
- logger.log: "ACTION_SHORT_DELAY responding"
- api.respond:
data: !lambda |-
root["status"] = "short_delay";
# Action that delays 1s before responding - should fail (exceeds 500ms timeout)
# The api.respond will log a warning because the action call was already cleaned up
- action: action_long_delay
supports_response: only
then:
- logger.log: "ACTION_LONG_DELAY starting"
- delay: 1s
- logger.log: "ACTION_LONG_DELAY responding (after timeout)"
- api.respond:
data: !lambda |-
root["status"] = "long_delay";

View File

@@ -0,0 +1,258 @@
"""Integration test for API action responses feature.
Tests the supports_response modes: none, status, optional, only.
"""
from __future__ import annotations
import asyncio
import json
import re
from aioesphomeapi import SupportsResponseType, UserService, UserServiceArgType
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_api_action_responses(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test API action response modes work correctly."""
loop = asyncio.get_running_loop()
# Track log messages for each action type
no_response_future = loop.create_future()
status_success_future = loop.create_future()
status_error_future = loop.create_future()
optional_response_future = loop.create_future()
only_response_future = loop.create_future()
nested_json_future = loop.create_future()
# Patterns to match in logs
no_response_pattern = re.compile(r"ACTION_NO_RESPONSE called with: test_message")
status_success_pattern = re.compile(
r"ACTION_STATUS_RESPONSE success \(call_id=\d+\)"
)
status_error_pattern = re.compile(r"ACTION_STATUS_RESPONSE error \(call_id=\d+\)")
optional_response_pattern = re.compile(
r"ACTION_OPTIONAL_RESPONSE \(call_id=\d+, return_response=\d+, value=42\)"
)
only_response_pattern = re.compile(
r"ACTION_ONLY_RESPONSE \(call_id=\d+, name=World\)"
)
nested_json_pattern = re.compile(r"ACTION_NESTED_JSON \(call_id=\d+\)")
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not no_response_future.done() and no_response_pattern.search(line):
no_response_future.set_result(True)
elif not status_success_future.done() and status_success_pattern.search(line):
status_success_future.set_result(True)
elif not status_error_future.done() and status_error_pattern.search(line):
status_error_future.set_result(True)
elif not optional_response_future.done() and optional_response_pattern.search(
line
):
optional_response_future.set_result(True)
elif not only_response_future.done() and only_response_pattern.search(line):
only_response_future.set_result(True)
elif not nested_json_future.done() and nested_json_pattern.search(line):
nested_json_future.set_result(True)
# Run with log monitoring
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Verify device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "api-action-responses-test"
# List services
_, services = await client.list_entities_services()
# Should have 5 services
assert len(services) == 5, f"Expected 5 services, found {len(services)}"
# Find our services
action_no_response: UserService | None = None
action_status_response: UserService | None = None
action_optional_response: UserService | None = None
action_only_response: UserService | None = None
action_nested_json: UserService | None = None
for service in services:
if service.name == "action_no_response":
action_no_response = service
elif service.name == "action_status_response":
action_status_response = service
elif service.name == "action_optional_response":
action_optional_response = service
elif service.name == "action_only_response":
action_only_response = service
elif service.name == "action_nested_json":
action_nested_json = service
assert action_no_response is not None, "action_no_response not found"
assert action_status_response is not None, "action_status_response not found"
assert action_optional_response is not None, (
"action_optional_response not found"
)
assert action_only_response is not None, "action_only_response not found"
assert action_nested_json is not None, "action_nested_json not found"
# Verify supports_response modes
assert action_no_response.supports_response is None or (
action_no_response.supports_response == SupportsResponseType.NONE
), (
f"action_no_response should have supports_response=NONE, got {action_no_response.supports_response}"
)
assert (
action_status_response.supports_response == SupportsResponseType.STATUS
), (
f"action_status_response should have supports_response=STATUS, "
f"got {action_status_response.supports_response}"
)
assert (
action_optional_response.supports_response == SupportsResponseType.OPTIONAL
), (
f"action_optional_response should have supports_response=OPTIONAL, "
f"got {action_optional_response.supports_response}"
)
assert action_only_response.supports_response == SupportsResponseType.ONLY, (
f"action_only_response should have supports_response=ONLY, "
f"got {action_only_response.supports_response}"
)
assert action_nested_json.supports_response == SupportsResponseType.ONLY, (
f"action_nested_json should have supports_response=ONLY, "
f"got {action_nested_json.supports_response}"
)
# Verify argument types
# action_no_response: string message
assert len(action_no_response.args) == 1
assert action_no_response.args[0].name == "message"
assert action_no_response.args[0].type == UserServiceArgType.STRING
# action_status_response: bool should_succeed
assert len(action_status_response.args) == 1
assert action_status_response.args[0].name == "should_succeed"
assert action_status_response.args[0].type == UserServiceArgType.BOOL
# action_optional_response: int value
assert len(action_optional_response.args) == 1
assert action_optional_response.args[0].name == "value"
assert action_optional_response.args[0].type == UserServiceArgType.INT
# action_only_response: string name
assert len(action_only_response.args) == 1
assert action_only_response.args[0].name == "name"
assert action_only_response.args[0].type == UserServiceArgType.STRING
# action_nested_json: no args
assert len(action_nested_json.args) == 0
# Test action_no_response (supports_response: none)
# No response expected for this action
response = await client.execute_service(
action_no_response, {"message": "test_message"}
)
assert response is None, "action_no_response should not return a response"
await asyncio.wait_for(no_response_future, timeout=5.0)
# Test action_status_response with success (supports_response: status)
response = await client.execute_service(
action_status_response,
{"should_succeed": True},
return_response=True,
)
await asyncio.wait_for(status_success_future, timeout=5.0)
assert response is not None, "Expected response for status action"
assert response.success is True, (
f"Expected success=True, got {response.success}"
)
assert response.error_message == "", (
f"Expected empty error_message, got '{response.error_message}'"
)
# Test action_status_response with error
response = await client.execute_service(
action_status_response,
{"should_succeed": False},
return_response=True,
)
await asyncio.wait_for(status_error_future, timeout=5.0)
assert response is not None, "Expected response for status action"
assert response.success is False, (
f"Expected success=False, got {response.success}"
)
assert "Intentional failure" in response.error_message, (
f"Expected error message containing 'Intentional failure', "
f"got '{response.error_message}'"
)
# Test action_optional_response (supports_response: optional)
response = await client.execute_service(
action_optional_response,
{"value": 42},
return_response=True,
)
await asyncio.wait_for(optional_response_future, timeout=5.0)
assert response is not None, "Expected response for optional action"
assert response.success is True, (
f"Expected success=True, got {response.success}"
)
# Parse response data as JSON
response_json = json.loads(response.response_data.decode("utf-8"))
assert response_json["input"] == 42, (
f"Expected input=42, got {response_json.get('input')}"
)
assert response_json["doubled"] == 84, (
f"Expected doubled=84, got {response_json.get('doubled')}"
)
# Test action_only_response (supports_response: only)
response = await client.execute_service(
action_only_response,
{"name": "World"},
return_response=True,
)
await asyncio.wait_for(only_response_future, timeout=5.0)
assert response is not None, "Expected response for only action"
assert response.success is True, (
f"Expected success=True, got {response.success}"
)
response_json = json.loads(response.response_data.decode("utf-8"))
assert response_json["greeting"] == "Hello, World!", (
f"Expected greeting='Hello, World!', got {response_json.get('greeting')}"
)
assert response_json["length"] == 5, (
f"Expected length=5, got {response_json.get('length')}"
)
# Test action_nested_json
response = await client.execute_service(
action_nested_json,
{},
return_response=True,
)
await asyncio.wait_for(nested_json_future, timeout=5.0)
assert response is not None, "Expected response for nested json action"
assert response.success is True, (
f"Expected success=True, got {response.success}"
)
response_json = json.loads(response.response_data.decode("utf-8"))
# Verify nested structure
assert response_json["config"]["wifi"]["connected"] is True
assert response_json["config"]["api"]["port"] == 6053
assert response_json["items"][0] == "first"
assert response_json["items"][1] == "second"

View File

@@ -0,0 +1,172 @@
"""Integration test for API action call timeout functionality.
Tests that action calls are automatically cleaned up after timeout,
and that late responses are handled gracefully.
"""
from __future__ import annotations
import asyncio
import contextlib
import re
from aioesphomeapi import UserService
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_api_action_timeout(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test API action call timeout behavior.
This test uses a 500ms timeout (set via USE_API_ACTION_CALL_TIMEOUT_MS define)
to verify:
1. Actions that respond within the timeout work correctly
2. Actions that exceed the timeout have their calls cleaned up
3. Late responses log a warning but don't crash
"""
loop = asyncio.get_running_loop()
# Track log messages
immediate_future = loop.create_future()
short_delay_responding_future = loop.create_future()
long_delay_starting_future = loop.create_future()
long_delay_responding_future = loop.create_future()
timeout_warning_future = loop.create_future()
# Patterns to match in logs
immediate_pattern = re.compile(r"ACTION_IMMEDIATE responding")
short_delay_responding_pattern = re.compile(r"ACTION_SHORT_DELAY responding")
long_delay_starting_pattern = re.compile(r"ACTION_LONG_DELAY starting")
long_delay_responding_pattern = re.compile(
r"ACTION_LONG_DELAY responding \(after timeout\)"
)
# This warning is logged when api.respond is called after the action call timed out
timeout_warning_pattern = re.compile(
r"Cannot send response: no active call found for action_call_id"
)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not immediate_future.done() and immediate_pattern.search(line):
immediate_future.set_result(True)
elif (
not short_delay_responding_future.done()
and short_delay_responding_pattern.search(line)
):
short_delay_responding_future.set_result(True)
elif (
not long_delay_starting_future.done()
and long_delay_starting_pattern.search(line)
):
long_delay_starting_future.set_result(True)
elif (
not long_delay_responding_future.done()
and long_delay_responding_pattern.search(line)
):
long_delay_responding_future.set_result(True)
elif not timeout_warning_future.done() and timeout_warning_pattern.search(line):
timeout_warning_future.set_result(True)
# Run with log monitoring
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Verify device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "api-action-timeout-test"
# List services
_, services = await client.list_entities_services()
# Should have 3 services
assert len(services) == 3, f"Expected 3 services, found {len(services)}"
# Find our services
action_immediate: UserService | None = None
action_short_delay: UserService | None = None
action_long_delay: UserService | None = None
for service in services:
if service.name == "action_immediate":
action_immediate = service
elif service.name == "action_short_delay":
action_short_delay = service
elif service.name == "action_long_delay":
action_long_delay = service
assert action_immediate is not None, "action_immediate not found"
assert action_short_delay is not None, "action_short_delay not found"
assert action_long_delay is not None, "action_long_delay not found"
# Test 1: Immediate response should work
response = await client.execute_service(
action_immediate,
{},
return_response=True,
)
await asyncio.wait_for(immediate_future, timeout=1.0)
assert response is not None, "Expected response for immediate action"
assert response.success is True
# Test 2: Short delay (200ms) should work within the 500ms timeout
response = await client.execute_service(
action_short_delay,
{},
return_response=True,
)
await asyncio.wait_for(short_delay_responding_future, timeout=1.0)
assert response is not None, "Expected response for short delay action"
assert response.success is True
# Test 3: Long delay (1s) should exceed the 500ms timeout
# The server-side timeout will clean up the action call after 500ms
# The client will timeout waiting for the response
# When the action finally tries to respond after 1s, it will log a warning
# Start the long delay action (don't await it fully - it will timeout)
long_delay_task = asyncio.create_task(
client.execute_service(
action_long_delay,
{},
return_response=True,
timeout=2.0, # Give client enough time to see the late response attempt
)
)
# Wait for the action to start
await asyncio.wait_for(long_delay_starting_future, timeout=1.0)
# Wait for the action to try to respond (after 1s delay)
await asyncio.wait_for(long_delay_responding_future, timeout=2.0)
# Wait for the warning log about no active call
await asyncio.wait_for(timeout_warning_future, timeout=1.0)
# The client task should complete (either with None response or timeout)
# Client timing out is acceptable - the server-side timeout already cleaned up the call
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(long_delay_task, timeout=1.0)
# Verify the system is still functional after the timeout
# Call the immediate action again to prove cleanup worked
immediate_future_2 = loop.create_future()
def check_output_2(line: str) -> None:
if not immediate_future_2.done() and immediate_pattern.search(line):
immediate_future_2.set_result(True)
response = await client.execute_service(
action_immediate,
{},
return_response=True,
)
assert response is not None, "System should still work after timeout"
assert response.success is True

View File

@@ -88,13 +88,13 @@ async def test_api_conditional_memory(
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
# Call simple service
client.execute_service(simple_service, {})
await client.execute_service(simple_service, {})
# Wait for service log
await asyncio.wait_for(service_simple_future, timeout=5.0)
# Call service with arguments
client.execute_service(
await client.execute_service(
service_with_args,
{
"arg_string": "test_string",

View File

@@ -114,7 +114,7 @@ async def test_api_custom_services(
assert custom_arrays_service is not None, "custom_service_with_arrays not found"
# Test YAML service
client.execute_service(yaml_service, {})
await client.execute_service(yaml_service, {})
await asyncio.wait_for(yaml_service_future, timeout=5.0)
# Verify YAML service with args arguments
@@ -124,7 +124,7 @@ async def test_api_custom_services(
assert yaml_args_types["my_string"] == UserServiceArgType.STRING
# Test YAML service with arguments
client.execute_service(
await client.execute_service(
yaml_args_service,
{
"my_int": 123,
@@ -144,7 +144,7 @@ async def test_api_custom_services(
assert yaml_many_args_types["arg4"] == UserServiceArgType.STRING
# Test YAML service with many arguments
client.execute_service(
await client.execute_service(
yaml_many_args_service,
{
"arg1": 42,
@@ -156,7 +156,7 @@ async def test_api_custom_services(
await asyncio.wait_for(yaml_many_args_future, timeout=5.0)
# Test simple CustomAPIDevice service
client.execute_service(custom_service, {})
await client.execute_service(custom_service, {})
await asyncio.wait_for(custom_service_future, timeout=5.0)
# Verify custom_args_service arguments
@@ -168,7 +168,7 @@ async def test_api_custom_services(
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
# Test CustomAPIDevice service with arguments
client.execute_service(
await client.execute_service(
custom_args_service,
{
"arg_string": "test_string",
@@ -188,7 +188,7 @@ async def test_api_custom_services(
assert array_arg_types["string_array"] == UserServiceArgType.STRING_ARRAY
# Test CustomAPIDevice service with arrays
client.execute_service(
await client.execute_service(
custom_arrays_service,
{
"bool_array": [True, False],

View File

@@ -163,7 +163,7 @@ async def test_api_homeassistant(
assert trigger_service is not None, "trigger_all_tests service not found"
# Execute all tests
client.execute_service(trigger_service, {})
await client.execute_service(trigger_service, {})
# Wait for all tests to complete with appropriate timeouts
try:

View File

@@ -75,10 +75,12 @@ async def test_api_string_lambda(
assert char_ptr_service is not None, "test_char_ptr_lambda service not found"
# Execute all four services to test different lambda return types
client.execute_service(string_service, {"input_string": "STRING_FROM_LAMBDA"})
client.execute_service(int_service, {"input_number": 42})
client.execute_service(float_service, {"input_float": 3.14})
client.execute_service(
await client.execute_service(
string_service, {"input_string": "STRING_FROM_LAMBDA"}
)
await client.execute_service(int_service, {"input_number": 42})
await client.execute_service(float_service, {"input_float": 3.14})
await client.execute_service(
char_ptr_service, {"input_number": 123, "input_string": "test_string"}
)

View File

@@ -71,7 +71,7 @@ async def test_automation_wait_actions(
# Test 1: wait_until in automation - trigger 5 times rapidly
test_service = next((s for s in services if s.name == "test_wait_until"), None)
assert test_service is not None, "test_wait_until service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test1_complete, timeout=3.0)
# Verify Test 1: All 5 triggers should complete
@@ -82,7 +82,7 @@ async def test_automation_wait_actions(
# Test 2: script.wait in automation - trigger 5 times rapidly
test_service = next((s for s in services if s.name == "test_script_wait"), None)
assert test_service is not None, "test_script_wait service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test2_complete, timeout=3.0)
# Verify Test 2: All 5 triggers should complete
@@ -95,7 +95,7 @@ async def test_automation_wait_actions(
(s for s in services if s.name == "test_wait_timeout"), None
)
assert test_service is not None, "test_wait_timeout service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test3_complete, timeout=3.0)
# Verify Test 3: All 5 triggers should timeout and complete

View File

@@ -67,7 +67,7 @@ async def test_delay_action_cancellation(
assert test_service is not None, "start_delay_then_restart service not found"
# Execute the test sequence
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for the second script to start
await asyncio.wait_for(second_script_started, timeout=5.0)
@@ -138,7 +138,7 @@ async def test_parallel_script_delays(
assert test_service is not None, "test_parallel_delays service not found"
# Execute the test - this will start 3 parallel scripts with 1 second delays
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for all scripts to complete (should take ~1 second, not 3)
await asyncio.wait_for(all_scripts_completed, timeout=2.0)

View File

@@ -142,7 +142,7 @@ async def test_continuation_actions(
# Test 1: IfAction with then branch
test_service = next((s for s in services if s.name == "test_if_action"), None)
assert test_service is not None, "test_if_action service not found"
client.execute_service(test_service, {"condition": True, "value": 42})
await client.execute_service(test_service, {"condition": True, "value": 42})
await asyncio.wait_for(test1_complete, timeout=2.0)
assert test_results["if_then"], "IfAction then branch not executed"
assert test_results["if_complete"], "IfAction did not complete"
@@ -150,7 +150,7 @@ async def test_continuation_actions(
# Test 1b: IfAction with else branch
test1_complete = loop.create_future()
test_results["if_complete"] = False
client.execute_service(test_service, {"condition": False, "value": 99})
await client.execute_service(test_service, {"condition": False, "value": 99})
await asyncio.wait_for(test1_complete, timeout=2.0)
assert test_results["if_else"], "IfAction else branch not executed"
assert test_results["if_complete"], "IfAction did not complete"
@@ -160,14 +160,14 @@ async def test_continuation_actions(
assert test_service is not None, "test_nested_if service not found"
# Both true
client.execute_service(test_service, {"outer": True, "inner": True})
await client.execute_service(test_service, {"outer": True, "inner": True})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_both_true"], "Nested both true not executed"
# Outer true, inner false
test2_complete = loop.create_future()
test_results["nested_complete"] = False
client.execute_service(test_service, {"outer": True, "inner": False})
await client.execute_service(test_service, {"outer": True, "inner": False})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_outer_true_inner_false"], (
"Nested outer true inner false not executed"
@@ -176,7 +176,7 @@ async def test_continuation_actions(
# Outer false
test2_complete = loop.create_future()
test_results["nested_complete"] = False
client.execute_service(test_service, {"outer": False, "inner": True})
await client.execute_service(test_service, {"outer": False, "inner": True})
await asyncio.wait_for(test2_complete, timeout=2.0)
assert test_results["nested_outer_false"], "Nested outer false not executed"
@@ -185,7 +185,7 @@ async def test_continuation_actions(
(s for s in services if s.name == "test_while_action"), None
)
assert test_service is not None, "test_while_action service not found"
client.execute_service(test_service, {"max_count": 3})
await client.execute_service(test_service, {"max_count": 3})
await asyncio.wait_for(test3_complete, timeout=2.0)
assert test_results["while_iterations"] == 3, (
f"WhileAction expected 3 iterations, got {test_results['while_iterations']}"
@@ -197,7 +197,7 @@ async def test_continuation_actions(
(s for s in services if s.name == "test_repeat_action"), None
)
assert test_service is not None, "test_repeat_action service not found"
client.execute_service(test_service, {"count": 5})
await client.execute_service(test_service, {"count": 5})
await asyncio.wait_for(test4_complete, timeout=2.0)
assert test_results["repeat_iterations"] == 5, (
f"RepeatAction expected 5 iterations, got {test_results['repeat_iterations']}"
@@ -207,7 +207,7 @@ async def test_continuation_actions(
# Test 5: Combined (if + repeat + while)
test_service = next((s for s in services if s.name == "test_combined"), None)
assert test_service is not None, "test_combined service not found"
client.execute_service(test_service, {"do_loop": True, "loop_count": 2})
await client.execute_service(test_service, {"do_loop": True, "loop_count": 2})
await asyncio.wait_for(test5_complete, timeout=2.0)
# Should execute: repeat 2 times, each iteration does while from iteration down to 0
# iteration 0: while 0 times = 0
@@ -221,7 +221,7 @@ async def test_continuation_actions(
# Test 6: Rapid triggers (tests memory efficiency of ContinuationAction)
test_service = next((s for s in services if s.name == "test_rapid_if"), None)
assert test_service is not None, "test_rapid_if service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test6_complete, timeout=2.0)
# Values 1, 2 should hit else (<=2), values 3, 4, 5 should hit then (>2)
assert test_results["rapid_else"] == 2, (

View File

@@ -98,7 +98,7 @@ async def test_scheduler_bulk_cleanup(
)
# Execute the test
client.execute_service(trigger_bulk_cleanup_service, {})
await client.execute_service(trigger_bulk_cleanup_service, {})
# Wait for test completion
try:

View File

@@ -81,7 +81,7 @@ async def test_scheduler_defer_cancel(
client.subscribe_states(on_state)
# Execute the test
client.execute_service(test_defer_cancel_service, {})
await client.execute_service(test_defer_cancel_service, {})
# Wait for test completion
try:

View File

@@ -59,7 +59,7 @@ async def test_scheduler_defer_cancels_regular(
assert test_service is not None, "test_defer_cancels_regular service not found"
# Execute the test
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for test completion
try:

View File

@@ -84,7 +84,7 @@ async def test_scheduler_defer_fifo_simple(
client.subscribe_states(on_state)
# Test 1: Test set_timeout(0)
client.execute_service(test_set_timeout_service, {})
await client.execute_service(test_set_timeout_service, {})
# Wait for first test completion
try:
@@ -102,7 +102,7 @@ async def test_scheduler_defer_fifo_simple(
test_result_future = loop.create_future()
# Test 2: Test defer()
client.execute_service(test_defer_service, {})
await client.execute_service(test_defer_service, {})
# Wait for second test completion
try:

View File

@@ -92,7 +92,7 @@ async def test_scheduler_defer_stress(
assert run_stress_test_service is not None, "run_stress_test service not found"
# Call the run_stress_test service to start the test
client.execute_service(run_stress_test_service, {})
await client.execute_service(run_stress_test_service, {})
# Wait for all defers to execute (should be quick)
try:

View File

@@ -99,7 +99,7 @@ async def test_scheduler_heap_stress(
)
# Call the run_heap_stress_test service to start the test
client.execute_service(run_stress_test_service, {})
await client.execute_service(run_stress_test_service, {})
# Wait for all callbacks to execute (should be quick, but give more time for scheduling)
try:

View File

@@ -48,7 +48,7 @@ async def test_scheduler_null_name(
assert test_null_name_service is not None, "test_null_name service not found"
# Execute the test
client.execute_service(test_null_name_service, {})
await client.execute_service(test_null_name_service, {})
# Wait for test completion
try:

View File

@@ -120,42 +120,42 @@ async def test_scheduler_pool(
try:
# Phase 1: Component lifecycle
client.execute_service(phase_services[1], {})
await client.execute_service(phase_services[1], {})
await asyncio.wait_for(phase_futures[1], timeout=1.0)
await asyncio.sleep(0.05) # Let timeouts complete
# Phase 2: Sensor polling
client.execute_service(phase_services[2], {})
await client.execute_service(phase_services[2], {})
await asyncio.wait_for(phase_futures[2], timeout=1.0)
await asyncio.sleep(0.1) # Let intervals run a bit
# Phase 3: Communication patterns
client.execute_service(phase_services[3], {})
await client.execute_service(phase_services[3], {})
await asyncio.wait_for(phase_futures[3], timeout=1.0)
await asyncio.sleep(0.1) # Let heartbeat run
# Phase 4: Defer patterns
client.execute_service(phase_services[4], {})
await client.execute_service(phase_services[4], {})
await asyncio.wait_for(phase_futures[4], timeout=1.0)
await asyncio.sleep(0.2) # Let everything settle and recycle
# Phase 5: Pool reuse verification
client.execute_service(phase_services[5], {})
await client.execute_service(phase_services[5], {})
await asyncio.wait_for(phase_futures[5], timeout=1.0)
await asyncio.sleep(0.1) # Let Phase 5 timeouts complete and recycle
# Phase 6: Full pool reuse verification
client.execute_service(phase_services[6], {})
await client.execute_service(phase_services[6], {})
await asyncio.wait_for(phase_futures[6], timeout=1.0)
await asyncio.sleep(0.1) # Let Phase 6 timeouts complete
# Phase 7: Same-named defer optimization
client.execute_service(phase_services[7], {})
await client.execute_service(phase_services[7], {})
await asyncio.wait_for(phase_futures[7], timeout=1.0)
await asyncio.sleep(0.05) # Let the single defer execute
# Complete test
client.execute_service(complete_service, {})
await client.execute_service(complete_service, {})
await asyncio.wait_for(test_complete_future, timeout=0.5)
except TimeoutError as e:

View File

@@ -108,7 +108,7 @@ async def test_scheduler_rapid_cancellation(
)
# Call the service to start the test
client.execute_service(run_test_service, {})
await client.execute_service(run_test_service, {})
# Wait for test to complete with timeout
try:

View File

@@ -79,7 +79,7 @@ async def test_scheduler_recursive_timeout(
)
# Call the service to start the test
client.execute_service(run_test_service, {})
await client.execute_service(run_test_service, {})
# Wait for test to complete
try:

View File

@@ -81,7 +81,7 @@ async def test_scheduler_removed_item_race(
assert run_test_service is not None, "run_test service not found"
# Execute the test
client.execute_service(run_test_service, {})
await client.execute_service(run_test_service, {})
# Wait for test completion
try:

View File

@@ -98,7 +98,7 @@ async def test_scheduler_simultaneous_callbacks(
)
# Call the service to start the test
client.execute_service(run_test_service, {})
await client.execute_service(run_test_service, {})
# Wait for test to complete
try:

View File

@@ -134,27 +134,27 @@ async def test_scheduler_string_lifetime(
# Run tests sequentially, waiting for each to complete
try:
# Test 1
client.execute_service(test_services["test1"], {})
await client.execute_service(test_services["test1"], {})
await asyncio.wait_for(test1_complete.wait(), timeout=5.0)
# Test 2
client.execute_service(test_services["test2"], {})
await client.execute_service(test_services["test2"], {})
await asyncio.wait_for(test2_complete.wait(), timeout=5.0)
# Test 3
client.execute_service(test_services["test3"], {})
await client.execute_service(test_services["test3"], {})
await asyncio.wait_for(test3_complete.wait(), timeout=5.0)
# Test 4
client.execute_service(test_services["test4"], {})
await client.execute_service(test_services["test4"], {})
await asyncio.wait_for(test4_complete.wait(), timeout=5.0)
# Test 5
client.execute_service(test_services["test5"], {})
await client.execute_service(test_services["test5"], {})
await asyncio.wait_for(test5_complete.wait(), timeout=5.0)
# Final check
client.execute_service(test_services["final"], {})
await client.execute_service(test_services["final"], {})
await asyncio.wait_for(all_tests_complete.wait(), timeout=5.0)
except TimeoutError:

View File

@@ -92,7 +92,7 @@ async def test_scheduler_string_name_stress(
)
# Call the service to start the test
client.execute_service(run_stress_test_service, {})
await client.execute_service(run_stress_test_service, {})
# Wait for test to complete or crash
try:

View File

@@ -90,7 +90,7 @@ async def test_script_delay_with_params(
assert test_service is not None, "test_repeat_with_delay service not found"
# Execute the test
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for test to complete (10 iterations * ~100ms each + margin)
try:

View File

@@ -136,7 +136,7 @@ async def test_script_queued(
# Test 1: Queue depth limit
test_service = next((s for s in services if s.name == "test_queue_depth"), None)
assert test_service is not None, "test_queue_depth service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test1_complete, timeout=2.0)
await asyncio.sleep(0.1) # Give time for rejections
@@ -151,7 +151,7 @@ async def test_script_queued(
# Test 2: Ring buffer order
test_service = next((s for s in services if s.name == "test_ring_buffer"), None)
assert test_service is not None, "test_ring_buffer service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test2_complete, timeout=2.0)
# Verify Test 2
@@ -165,7 +165,7 @@ async def test_script_queued(
# Test 3: Stop clears queue
test_service = next((s for s in services if s.name == "test_stop_clears"), None)
assert test_service is not None, "test_stop_clears service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test3_complete, timeout=2.0)
# Verify Test 3
@@ -179,7 +179,7 @@ async def test_script_queued(
# Test 4: Rejection enforcement (max_runs=3)
test_service = next((s for s in services if s.name == "test_rejection"), None)
assert test_service is not None, "test_rejection service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test4_complete, timeout=2.0)
await asyncio.sleep(0.1) # Give time for rejections
@@ -194,7 +194,7 @@ async def test_script_queued(
# Test 5: No parameters
test_service = next((s for s in services if s.name == "test_no_params"), None)
assert test_service is not None, "test_no_params service not found"
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
await asyncio.wait_for(test5_complete, timeout=2.0)
# Verify Test 5

View File

@@ -86,7 +86,7 @@ async def test_wait_until_mid_loop_timing(
assert test_service is not None, "test_mid_loop_timeout service not found"
# Execute the test
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for test to complete (100ms delay + 200ms timeout + margins = ~500ms)
await asyncio.wait_for(test_complete, timeout=5.0)

View File

@@ -74,7 +74,7 @@ async def test_wait_until_on_boot(
)
assert set_flag_service is not None, "set_test_flag service not found"
client.execute_service(set_flag_service, {})
await client.execute_service(set_flag_service, {})
# If the fix works, wait_until's loop() will check the condition and proceed
# If the bug exists, wait_until is stuck with disabled loop and will timeout

View File

@@ -71,7 +71,7 @@ async def test_wait_until_fifo_ordering(
assert test_service is not None, "test_wait_until_fifo service not found"
# Execute the test
client.execute_service(test_service, {})
await client.execute_service(test_service, {})
# Wait for test to complete
try: