mirror of
https://github.com/esphome/esphome.git
synced 2026-02-18 15:35:59 -07:00
Merge branch 'usb_memory_order_retry' into integration
This commit is contained in:
@@ -175,20 +175,75 @@ void USBUartComponent::reset_input_state_(USBUartChannel *channel) {
|
||||
}
|
||||
|
||||
void USBUartComponent::restart_input_(USBUartChannel *channel) {
|
||||
// Atomically check if still started and clear it before calling start_input
|
||||
// This prevents race with concurrent restart attempts from different threads
|
||||
// Atomically verify it's still started (true) and keep it started
|
||||
// This prevents the race window of toggling true->false->true
|
||||
bool expected = true;
|
||||
if (channel->input_started_.compare_exchange_strong(expected, false)) {
|
||||
if (channel->input_started_.compare_exchange_strong(expected, true)) {
|
||||
// Still started - do the actual restart work without toggling the flag
|
||||
this->do_start_input_(channel);
|
||||
}
|
||||
}
|
||||
|
||||
void USBUartComponent::do_start_input_(USBUartChannel *channel) {
|
||||
// This function does the actual work of starting input
|
||||
// Caller must ensure input_started_ is already set to true
|
||||
const auto *ep = channel->cdc_dev_.in_ep;
|
||||
// CALLBACK CONTEXT: This lambda is executed in USB task via transfer_callback
|
||||
auto callback = [this, channel](const usb_host::TransferStatus &status) {
|
||||
ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code);
|
||||
if (!status.success) {
|
||||
ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code));
|
||||
// Transfer failed, slot already released
|
||||
// Reset state so normal operations can restart later
|
||||
this->reset_input_state_(channel);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!channel->dummy_receiver_ && status.data_len > 0) {
|
||||
// Allocate a chunk from the pool
|
||||
UsbDataChunk *chunk = this->chunk_pool_.allocate();
|
||||
if (chunk == nullptr) {
|
||||
// No chunks available - queue is full, data dropped, slot already released
|
||||
this->usb_data_queue_.increment_dropped_count();
|
||||
// Reset state so normal operations can restart later
|
||||
this->reset_input_state_(channel);
|
||||
return;
|
||||
}
|
||||
|
||||
// Copy data to chunk (this is fast, happens in USB task)
|
||||
memcpy(chunk->data, status.data, status.data_len);
|
||||
chunk->length = status.data_len;
|
||||
chunk->channel = channel;
|
||||
|
||||
// Push to lock-free queue for main loop processing
|
||||
// Push always succeeds because pool size == queue size
|
||||
this->usb_data_queue_.push(chunk);
|
||||
}
|
||||
|
||||
// On success, reset retry count and restart input immediately from USB task for performance
|
||||
// The lock-free queue will handle backpressure
|
||||
channel->input_retry_count_.store(0);
|
||||
channel->input_started_.store(false);
|
||||
this->start_input(channel);
|
||||
};
|
||||
// input_started_ already set to true by caller
|
||||
auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize);
|
||||
if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) {
|
||||
// No slots available - defer retry to main loop
|
||||
this->defer_input_retry_(channel);
|
||||
} else if (result != usb_host::TRANSFER_OK) {
|
||||
// Other error (submit failed) - don't retry, just reset state
|
||||
// Error already logged by transfer_in()
|
||||
this->reset_input_state_(channel);
|
||||
}
|
||||
}
|
||||
|
||||
void USBUartComponent::defer_input_retry_(USBUartChannel *channel) {
|
||||
static constexpr uint8_t MAX_INPUT_RETRIES = 10;
|
||||
|
||||
// Atomically increment and get previous value
|
||||
uint8_t retry_count = channel->input_retry_count_.fetch_add(1);
|
||||
if (retry_count >= MAX_INPUT_RETRIES) {
|
||||
// Atomically increment and get the NEW value (previous + 1)
|
||||
uint8_t new_retry_count = channel->input_retry_count_.fetch_add(1) + 1;
|
||||
if (new_retry_count > MAX_INPUT_RETRIES) {
|
||||
ESP_LOGE(TAG, "Input retry limit reached for channel %d, stopping retries", channel->index_);
|
||||
this->reset_input_state_(channel);
|
||||
return;
|
||||
@@ -250,7 +305,8 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
|
||||
// Atomically check if not started and set to started in one operation
|
||||
bool expected = false;
|
||||
if (!channel->input_started_.compare_exchange_strong(expected, true))
|
||||
return; // Already started, another thread won the race
|
||||
return; // Already started - prevents duplicate transfers from concurrent threads
|
||||
|
||||
// THREAD CONTEXT: Called from both USB task and main loop threads
|
||||
// - USB task: Immediate restart after successful transfer for continuous data flow
|
||||
// - Main loop: Controlled restart after consuming data (backpressure mechanism)
|
||||
@@ -261,55 +317,9 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
|
||||
//
|
||||
// The underlying transfer_in() uses lock-free atomic allocation from the
|
||||
// TransferRequest pool, making this multi-threaded access safe
|
||||
const auto *ep = channel->cdc_dev_.in_ep;
|
||||
// CALLBACK CONTEXT: This lambda is executed in USB task via transfer_callback
|
||||
auto callback = [this, channel](const usb_host::TransferStatus &status) {
|
||||
ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code);
|
||||
if (!status.success) {
|
||||
ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code));
|
||||
// Transfer failed, slot already released
|
||||
// Reset state so normal operations can restart later
|
||||
this->reset_input_state_(channel);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!channel->dummy_receiver_ && status.data_len > 0) {
|
||||
// Allocate a chunk from the pool
|
||||
UsbDataChunk *chunk = this->chunk_pool_.allocate();
|
||||
if (chunk == nullptr) {
|
||||
// No chunks available - queue is full, data dropped, slot already released
|
||||
this->usb_data_queue_.increment_dropped_count();
|
||||
// Reset state so normal operations can restart later
|
||||
this->reset_input_state_(channel);
|
||||
return;
|
||||
}
|
||||
|
||||
// Copy data to chunk (this is fast, happens in USB task)
|
||||
memcpy(chunk->data, status.data, status.data_len);
|
||||
chunk->length = status.data_len;
|
||||
chunk->channel = channel;
|
||||
|
||||
// Push to lock-free queue for main loop processing
|
||||
// Push always succeeds because pool size == queue size
|
||||
this->usb_data_queue_.push(chunk);
|
||||
}
|
||||
|
||||
// On success, reset retry count and restart input immediately from USB task for performance
|
||||
// The lock-free queue will handle backpressure
|
||||
channel->input_retry_count_.store(0);
|
||||
channel->input_started_.store(false);
|
||||
this->start_input(channel);
|
||||
};
|
||||
// input_started_ already set to true by compare_exchange_strong above
|
||||
auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize);
|
||||
if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) {
|
||||
// No slots available - defer retry to main loop
|
||||
this->defer_input_retry_(channel);
|
||||
} else if (result != usb_host::TRANSFER_OK) {
|
||||
// Other error (submit failed) - don't retry, just reset state
|
||||
// Error already logged by transfer_in()
|
||||
this->reset_input_state_(channel);
|
||||
}
|
||||
// Do the actual work (input_started_ already set to true by CAS above)
|
||||
this->do_start_input_(channel);
|
||||
}
|
||||
|
||||
void USBUartComponent::start_output(USBUartChannel *channel) {
|
||||
|
||||
@@ -144,6 +144,7 @@ class USBUartComponent : public usb_host::USBClient {
|
||||
void defer_input_retry_(USBUartChannel *channel);
|
||||
void reset_input_state_(USBUartChannel *channel);
|
||||
void restart_input_(USBUartChannel *channel);
|
||||
void do_start_input_(USBUartChannel *channel);
|
||||
std::vector<USBUartChannel *> channels_{};
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user