[scheduler] Replace defer queue deque with vector to avoid 512-byte upfront allocation (#11305)
@@ -328,17 +328,30 @@ void HOT Scheduler::call(uint32_t now) {
   // Single-core platforms don't use this queue and fall back to the heap-based approach.
   //
   // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still
-  // processed here. They are removed from the queue normally via pop_front() but skipped
-  // during execution by should_skip_item_(). This is intentional - no memory leak occurs.
-  while (!this->defer_queue_.empty()) {
-    // The outer check is done without a lock for performance. If the queue
-    // appears non-empty, we lock and process an item. We don't need to check
-    // empty() again inside the lock because only this thread can remove items.
+  // processed here. They are skipped during execution by should_skip_item_().
+  // This is intentional - no memory leak occurs.
+  //
+  // We use an index (defer_queue_front_) to track the read position instead of calling
+  // erase() on every pop, which would be O(n). The queue is processed once per loop -
+  // any items added during processing are left for the next loop iteration.
+
+  // Snapshot the queue end point - only process items that existed at loop start
+  // Items added during processing (by callbacks or other threads) run next loop
+  // No lock needed: single consumer (main loop), stale read just means we process less this iteration
+  size_t defer_queue_end = this->defer_queue_.size();
+
+  while (this->defer_queue_front_ < defer_queue_end) {
     std::unique_ptr<SchedulerItem> item;
     {
       LockGuard lock(this->lock_);
-      item = std::move(this->defer_queue_.front());
-      this->defer_queue_.pop_front();
+      // SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_.
+      // This is intentional and safe because:
+      // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function
+      // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_
+      //    and has_cancelled_timeout_in_container_ in scheduler.h)
+      // 3. The lock protects concurrent access, but the nullptr remains until cleanup
+      item = std::move(this->defer_queue_[this->defer_queue_front_]);
+      this->defer_queue_front_++;
     }
 
     // Execute callback without holding lock to prevent deadlocks
@@ -349,6 +362,13 @@ void HOT Scheduler::call(uint32_t now) {
     // Recycle the defer item after execution
     this->recycle_item_(std::move(item));
   }
+
+  // If we've consumed all items up to the snapshot point, clean up the dead space
+  // Single consumer (main loop), so no lock needed for this check
+  if (this->defer_queue_front_ >= defer_queue_end) {
+    LockGuard lock(this->lock_);
+    this->cleanup_defer_queue_locked_();
+  }
 #endif /* not ESPHOME_THREAD_SINGLE */
 
   // Convert the fresh timestamp from main loop to 64-bit for scheduler operations
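Stripped of locking and item recycling, the processing strategy introduced above can be illustrated in isolation. The sketch below is a simplified, single-threaded rendition with hypothetical names (queue, queue_front, drain_queue) and plain int payloads instead of SchedulerItem; it is not the scheduler code itself.

#include <cstddef>
#include <cstdio>
#include <vector>

// Index-based FIFO drain: consume items [0, end) by advancing a front index
// instead of erasing per item, then reclaim the consumed prefix in one step.
std::vector<int> queue;
std::size_t queue_front = 0;

void drain_queue() {
  std::size_t end = queue.size();  // snapshot: items added during the drain wait for the next call
  while (queue_front < end) {
    int item = queue[queue_front];
    queue_front++;
    printf("processing %d\n", item);
  }
  // One-shot cleanup instead of an O(n) erase() per pop
  queue.erase(queue.begin(), queue.begin() + queue_front);
  queue_front = 0;
}

int main() {
  queue = {1, 2, 3};
  drain_queue();  // prints 1, 2, 3 and leaves the queue empty
}

In the real scheduler the size snapshot, the per-item move, and the final cleanup additionally take this->lock_, because other threads may enqueue deferred items concurrently.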
@@ -264,6 +264,36 @@ class Scheduler {
   // Helper to recycle a SchedulerItem
   void recycle_item_(std::unique_ptr<SchedulerItem> item);
 
+#ifndef ESPHOME_THREAD_SINGLE
+  // Helper to cleanup defer_queue_ after processing
+  // IMPORTANT: Caller must hold the scheduler lock before calling this function.
+  inline void cleanup_defer_queue_locked_() {
+    // Check if new items were added by producers during processing
+    if (this->defer_queue_front_ >= this->defer_queue_.size()) {
+      // Common case: no new items - clear everything
+      this->defer_queue_.clear();
+    } else {
+      // Rare case: new items were added during processing - compact the vector
+      // This only happens when:
+      // 1. A deferred callback calls defer() again, or
+      // 2. Another thread calls defer() while we're processing
+      //
+      // Move unprocessed items (added during this loop) to the front for next iteration
+      //
+      // SAFETY: Compacted items may include cancelled items (marked for removal via
+      // cancel_item_locked_() during execution). This is safe because should_skip_item_()
+      // checks is_item_removed_() before executing, so cancelled items will be skipped
+      // and recycled on the next loop iteration.
+      size_t remaining = this->defer_queue_.size() - this->defer_queue_front_;
+      for (size_t i = 0; i < remaining; i++) {
+        this->defer_queue_[i] = std::move(this->defer_queue_[this->defer_queue_front_ + i]);
+      }
+      this->defer_queue_.resize(remaining);
+    }
+    this->defer_queue_front_ = 0;
+  }
+#endif /* not ESPHOME_THREAD_SINGLE */
+
   // Helper to check if item is marked for removal (platform-specific)
   // Returns true if item should be skipped, handles platform-specific synchronization
   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
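For reference, the compaction branch of cleanup_defer_queue_locked_() could equivalently be written with the range overload of std::move from <algorithm>. The helper below is a hypothetical stand-alone sketch, not part of the PR, using a placeholder Item type:

#include <algorithm>
#include <cstddef>
#include <memory>
#include <vector>

// Placeholder for SchedulerItem to keep the sketch self-contained.
struct Item {};

// Shift the unprocessed tail [front, size) to the beginning and drop the
// moved-from slots - same effect as the manual index loop plus resize() above.
void compact(std::vector<std::unique_ptr<Item>> &queue, std::size_t front) {
  if (front == 0)
    return;  // nothing was consumed, nothing to shift
  std::move(queue.begin() + front, queue.end(), queue.begin());
  queue.resize(queue.size() - front);
}

Either form leaves no nullptr gaps behind: resize() destroys the moved-from tail slots, and defer_queue_front_ is reset to 0 afterwards so the next loop starts from a clean vector.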
@@ -282,13 +312,18 @@ class Scheduler {
 
   // Helper to mark matching items in a container as removed
   // Returns the number of items marked for removal
-  // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this
-  // function.
+  // IMPORTANT: Caller must hold the scheduler lock before calling this function.
   template<typename Container>
   size_t mark_matching_items_removed_(Container &container, Component *component, const char *name_cstr,
                                       SchedulerItem::Type type, bool match_retry) {
     size_t count = 0;
     for (auto &item : container) {
+      // Skip nullptr items (can happen in defer_queue_ when items are being processed)
+      // The defer_queue_ uses index-based processing: items are std::moved out but left in the
+      // vector as nullptr until cleanup. Even though this function is called with lock held,
+      // the vector can still contain nullptr items from the processing loop. This check prevents crashes.
+      if (!item)
+        continue;
       if (this->matches_item_(item, component, name_cstr, type, match_retry)) {
         // Mark item for removal (platform-specific)
 #ifdef ESPHOME_THREAD_MULTI_ATOMICS
@@ -311,6 +346,12 @@ class Scheduler {
   bool has_cancelled_timeout_in_container_(const Container &container, Component *component, const char *name_cstr,
                                            bool match_retry) const {
     for (const auto &item : container) {
+      // Skip nullptr items (can happen in defer_queue_ when items are being processed)
+      // The defer_queue_ uses index-based processing: items are std::moved out but left in the
+      // vector as nullptr until cleanup. If this function is called during defer queue processing,
+      // it will iterate over these nullptr items. This check prevents crashes.
+      if (!item)
+        continue;
       if (is_item_removed_(item.get()) &&
           this->matches_item_(item, component, name_cstr, SchedulerItem::TIMEOUT, match_retry,
                               /* skip_removed= */ false)) {
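Why these nullptr guards are needed can be seen in a minimal reproduction (hypothetical, plain int payloads): moving a unique_ptr out of its vector slot leaves a nullptr behind until cleanup, so any later scan over the container must skip such slots.

#include <cstdio>
#include <memory>
#include <vector>

int main() {
  std::vector<std::unique_ptr<int>> queue;
  queue.push_back(std::make_unique<int>(1));
  queue.push_back(std::make_unique<int>(2));

  // Simulate the processing loop: move the first item out; its slot becomes nullptr.
  std::unique_ptr<int> item = std::move(queue[0]);

  // A later scan (e.g. a cancel looking for matching items) must skip the
  // moved-from slot, otherwise it would dereference a null pointer.
  for (const auto &slot : queue) {
    if (!slot)
      continue;  // moved-out slot, nothing to inspect
    printf("live item: %d\n", *slot);  // prints only "live item: 2"
  }
}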
@@ -324,9 +365,12 @@ class Scheduler {
   std::vector<std::unique_ptr<SchedulerItem>> items_;
   std::vector<std::unique_ptr<SchedulerItem>> to_add_;
 #ifndef ESPHOME_THREAD_SINGLE
-  // Single-core platforms don't need the defer queue and save 40 bytes of RAM
-  std::deque<std::unique_ptr<SchedulerItem>> defer_queue_;  // FIFO queue for defer() calls
-#endif /* ESPHOME_THREAD_SINGLE */
+  // Single-core platforms don't need the defer queue and save ~32 bytes of RAM
+  // Using std::vector instead of std::deque avoids 512-byte chunked allocations
+  // Index tracking avoids O(n) erase() calls when draining the queue each loop
+  std::vector<std::unique_ptr<SchedulerItem>> defer_queue_;  // FIFO queue for defer() calls
+  size_t defer_queue_front_{0};  // Index of first valid item in defer_queue_ (tracks consumed items)
+#endif /* ESPHOME_THREAD_SINGLE */
   uint32_t to_remove_{0};
 
   // Memory pool for recycling SchedulerItem objects to reduce heap churn.
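The 512-byte figure in the title comes from how GCC's libstdc++ (the standard library shipped with the ESP toolchains) implements std::deque: even a default-constructed, never-used deque allocates its iterator map plus one 512-byte element buffer, whereas an empty std::vector allocates nothing until the first push_back. The desktop-runnable sketch below, using a hypothetical CountingAlloc, is one way to observe this; exact numbers depend on the standard library and pointer size.

#include <cstddef>
#include <cstdio>
#include <deque>
#include <memory>
#include <new>
#include <vector>

static std::size_t g_allocated = 0;  // total bytes requested through the allocator

template<typename T> struct CountingAlloc {
  using value_type = T;
  CountingAlloc() = default;
  template<typename U> CountingAlloc(const CountingAlloc<U> &) {}
  T *allocate(std::size_t n) {
    g_allocated += n * sizeof(T);
    return static_cast<T *>(::operator new(n * sizeof(T)));
  }
  void deallocate(T *p, std::size_t) { ::operator delete(p); }
};
template<typename T, typename U> bool operator==(const CountingAlloc<T> &, const CountingAlloc<U> &) { return true; }
template<typename T, typename U> bool operator!=(const CountingAlloc<T> &, const CountingAlloc<U> &) { return false; }

int main() {
  using Item = std::unique_ptr<int>;
  {
    g_allocated = 0;
    std::deque<Item, CountingAlloc<Item>> dq;  // default-constructed, never used
    printf("empty deque allocated:  %zu bytes\n", g_allocated);  // map + 512-byte buffer with libstdc++
  }
  {
    g_allocated = 0;
    std::vector<Item, CountingAlloc<Item>> vec;  // default-constructed, never used
    printf("empty vector allocated: %zu bytes\n", g_allocated);  // 0 - allocation deferred to first push_back
  }
}

Since the scheduler keeps one defer queue per instance and the queue is empty most of the time, switching the member to std::vector removes that upfront allocation entirely.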