mirror of
https://github.com/esphome/esphome.git
synced 2026-02-24 02:08:23 -07:00
[scheduler] Reduce lock acquisitions in process_defer_queue_ (#14107)
This commit is contained in:
@@ -421,6 +421,29 @@ void Scheduler::full_cleanup_removed_items_() {
|
||||
this->to_remove_ = 0;
|
||||
}
|
||||
|
||||
#ifndef ESPHOME_THREAD_SINGLE
|
||||
void Scheduler::compact_defer_queue_locked_() {
|
||||
// Rare case: new items were added during processing - compact the vector
|
||||
// This only happens when:
|
||||
// 1. A deferred callback calls defer() again, or
|
||||
// 2. Another thread calls defer() while we're processing
|
||||
//
|
||||
// Move unprocessed items (added during this loop) to the front for next iteration
|
||||
//
|
||||
// SAFETY: Compacted items may include cancelled items (marked for removal via
|
||||
// cancel_item_locked_() during execution). This is safe because should_skip_item_()
|
||||
// checks is_item_removed_() before executing, so cancelled items will be skipped
|
||||
// and recycled on the next loop iteration.
|
||||
size_t remaining = this->defer_queue_.size() - this->defer_queue_front_;
|
||||
for (size_t i = 0; i < remaining; i++) {
|
||||
this->defer_queue_[i] = std::move(this->defer_queue_[this->defer_queue_front_ + i]);
|
||||
}
|
||||
// Use erase() instead of resize() to avoid instantiating _M_default_append
|
||||
// (saves ~156 bytes flash). Erasing from the end is O(1) - no shifting needed.
|
||||
this->defer_queue_.erase(this->defer_queue_.begin() + remaining, this->defer_queue_.end());
|
||||
}
|
||||
#endif /* not ESPHOME_THREAD_SINGLE */
|
||||
|
||||
void HOT Scheduler::call(uint32_t now) {
|
||||
#ifndef ESPHOME_THREAD_SINGLE
|
||||
this->process_defer_queue_(now);
|
||||
|
||||
@@ -387,41 +387,46 @@ class Scheduler {
|
||||
// No lock needed: single consumer (main loop), stale read just means we process less this iteration
|
||||
size_t defer_queue_end = this->defer_queue_.size();
|
||||
|
||||
// Fast path: nothing to process, avoid lock entirely.
|
||||
// Safe without lock: single consumer (main loop) reads front_, and a stale size() read
|
||||
// from a concurrent push can only make us see fewer items — they'll be processed next loop.
|
||||
if (this->defer_queue_front_ >= defer_queue_end)
|
||||
return;
|
||||
|
||||
// Merge lock acquisitions: instead of separate locks for move-out and recycle (2N+1 total),
|
||||
// recycle each item after re-acquiring the lock for the next iteration (N+1 total).
|
||||
// The lock is held across: recycle → loop condition → move-out, then released for execution.
|
||||
std::unique_ptr<SchedulerItem> item;
|
||||
|
||||
this->lock_.lock();
|
||||
while (this->defer_queue_front_ < defer_queue_end) {
|
||||
std::unique_ptr<SchedulerItem> item;
|
||||
{
|
||||
LockGuard lock(this->lock_);
|
||||
// SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_.
|
||||
// This is intentional and safe because:
|
||||
// 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function
|
||||
// 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_locked_
|
||||
// and has_cancelled_timeout_in_container_locked_ in scheduler.h)
|
||||
// 3. The lock protects concurrent access, but the nullptr remains until cleanup
|
||||
item = std::move(this->defer_queue_[this->defer_queue_front_]);
|
||||
this->defer_queue_front_++;
|
||||
}
|
||||
// SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_.
|
||||
// This is intentional and safe because:
|
||||
// 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function
|
||||
// 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_locked_
|
||||
// and has_cancelled_timeout_in_container_locked_ in scheduler.h)
|
||||
// 3. The lock protects concurrent access, but the nullptr remains until cleanup
|
||||
item = std::move(this->defer_queue_[this->defer_queue_front_]);
|
||||
this->defer_queue_front_++;
|
||||
this->lock_.unlock();
|
||||
|
||||
// Execute callback without holding lock to prevent deadlocks
|
||||
// if the callback tries to call defer() again
|
||||
if (!this->should_skip_item_(item.get())) {
|
||||
now = this->execute_item_(item.get(), now);
|
||||
}
|
||||
// Recycle the defer item after execution
|
||||
{
|
||||
LockGuard lock(this->lock_);
|
||||
this->recycle_item_main_loop_(std::move(item));
|
||||
}
|
||||
}
|
||||
|
||||
// If we've consumed all items up to the snapshot point, clean up the dead space
|
||||
// Single consumer (main loop), so no lock needed for this check
|
||||
if (this->defer_queue_front_ >= defer_queue_end) {
|
||||
LockGuard lock(this->lock_);
|
||||
this->cleanup_defer_queue_locked_();
|
||||
this->lock_.lock();
|
||||
this->recycle_item_main_loop_(std::move(item));
|
||||
}
|
||||
// Clean up the queue (lock already held from last recycle or initial acquisition)
|
||||
this->cleanup_defer_queue_locked_();
|
||||
this->lock_.unlock();
|
||||
}
|
||||
|
||||
// Helper to cleanup defer_queue_ after processing
|
||||
// Helper to cleanup defer_queue_ after processing.
|
||||
// Keeps the common clear() path inline, outlines the rare compaction to keep
|
||||
// cold code out of the hot instruction cache lines.
|
||||
// IMPORTANT: Caller must hold the scheduler lock before calling this function.
|
||||
inline void cleanup_defer_queue_locked_() {
|
||||
// Check if new items were added by producers during processing
|
||||
@@ -429,27 +434,17 @@ class Scheduler {
|
||||
// Common case: no new items - clear everything
|
||||
this->defer_queue_.clear();
|
||||
} else {
|
||||
// Rare case: new items were added during processing - compact the vector
|
||||
// This only happens when:
|
||||
// 1. A deferred callback calls defer() again, or
|
||||
// 2. Another thread calls defer() while we're processing
|
||||
//
|
||||
// Move unprocessed items (added during this loop) to the front for next iteration
|
||||
//
|
||||
// SAFETY: Compacted items may include cancelled items (marked for removal via
|
||||
// cancel_item_locked_() during execution). This is safe because should_skip_item_()
|
||||
// checks is_item_removed_() before executing, so cancelled items will be skipped
|
||||
// and recycled on the next loop iteration.
|
||||
size_t remaining = this->defer_queue_.size() - this->defer_queue_front_;
|
||||
for (size_t i = 0; i < remaining; i++) {
|
||||
this->defer_queue_[i] = std::move(this->defer_queue_[this->defer_queue_front_ + i]);
|
||||
}
|
||||
// Use erase() instead of resize() to avoid instantiating _M_default_append
|
||||
// (saves ~156 bytes flash). Erasing from the end is O(1) - no shifting needed.
|
||||
this->defer_queue_.erase(this->defer_queue_.begin() + remaining, this->defer_queue_.end());
|
||||
// Rare case: new items were added during processing - outlined to keep cold code
|
||||
// out of the hot instruction cache lines
|
||||
this->compact_defer_queue_locked_();
|
||||
}
|
||||
this->defer_queue_front_ = 0;
|
||||
}
|
||||
|
||||
// Cold path for compacting defer_queue_ when new items were added during processing.
|
||||
// IMPORTANT: Caller must hold the scheduler lock before calling this function.
|
||||
// IMPORTANT: Must not be inlined - rare path, outlined to keep it out of the hot instruction cache lines.
|
||||
void __attribute__((noinline)) compact_defer_queue_locked_();
|
||||
#endif /* not ESPHOME_THREAD_SINGLE */
|
||||
|
||||
// Helper to check if item is marked for removal (platform-specific)
|
||||
|
||||
Reference in New Issue
Block a user