LCOV - code coverage report
Current view: top level - src/sched - scheduler.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 72.2 % 353 255
Test Date: 2026-05-20 02:24:49 Functions: 63.5 % 52 33
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/actor/event_based_actor.hpp>
      16                 :             : #include <hpactor/core/actor_system.hpp>
      17                 :             : #include <hpactor/hpactor_config.hpp>
      18                 :             : #include <hpactor/log/log_field.hpp>
      19                 :             : #include <hpactor/log/logger.hpp>
      20                 :             : #include <hpactor/sched/dedicated_thread_pool.hpp>
      21                 :             : #include <hpactor/sched/scheduler.hpp>
      22                 :             : 
      23                 :             : #include <variant>
      24                 :             : 
      25                 :             : #if HPACTOR_SUPPORT_COROUTINES
      26                 :             : #    include <hpactor/sched/coroutine_task.hpp>
      27                 :             : #endif
      28                 :             : 
      29                 :             : namespace hpactor::sched {
      30                 :             : 
      31                 :             : // DedicatedStorage: holds per-actor dedicated execution state.
      32                 :             : // Defined in .cpp to allow DedicatedThreadPool to remain incomplete in the
      33                 :             : // header (libc++ noexcept containers require complete types).
      34                 :             : struct HybridScheduler::DedicatedStorage {
      35                 :             :     // Dedicated thread tracking (thread lifecycle managed by DaemonActor)
      36                 :             :     std::unordered_set<ActorId> dedicated_thread_actors_;
      37                 :             :     std::unordered_map<ActorId, int> dedicated_thread_affinity_;
      38                 :             :     std::mutex dedicated_mutex_;
      39                 :             : 
      40                 :             :     // Dedicated thread pools (pool_size -> pool)
      41                 :             :     std::unordered_map<uint32_t, std::unique_ptr<DedicatedThreadPool>> dedicated_pools_;
      42                 :             :     std::unordered_map<ActorId, uint32_t> actor_pool_map_; // actor -> pool_size
      43                 :             : };
      44                 :             : 
      45                 :             : // Thread-local pointer to the current worker executing on this thread
      46                 :             : thread_local uint32_t tl_current_worker_id = UINT32_MAX;
      47                 :             : 
      48                 :         106 : HybridScheduler::HybridScheduler(ActorSystem& system, uint32_t num_workers,
      49                 :             :                                  uint32_t num_priorities,
      50                 :         106 :                                  TimerBackend timer_backend, bool start_paused)
      51                 :         106 :     : system_(system), num_workers_(num_workers), num_priorities_(num_priorities),
      52                 :         318 :       workers_(num_workers), a2ws_(num_workers), workers_paused_(start_paused),
      53                 :         318 :       dedicated_(std::make_unique<DedicatedStorage>()) {
      54                 :         106 :     switch (timer_backend) {
      55                 :         106 :         case TimerBackend::TimingWheel:
      56                 :         106 :             timer_backend_.emplace<TimingWheel>(1'000'000, 4);
      57                 :         106 :             break;
      58                 :           0 :         case TimerBackend::CalendarQueue:
      59                 :           0 :             timer_backend_.emplace<CalendarQueue>();
      60                 :           0 :             break;
      61                 :             :     }
      62                 :         206 :     for (uint32_t i = 0; i < num_workers; ++i) {
      63                 :         100 :         workers_[i].queues =
      64                 :         200 :             std::make_unique<ChaselevDeque<WorkItem>[]>(num_priorities);
      65                 :         100 :         workers_[i].index = i;
      66                 :             :     }
      67                 :         106 : }
      68                 :             : 
      69                 :         106 : void HybridScheduler::start() {
      70                 :         106 :     if (running_.load(std::memory_order_acquire)) {
      71                 :           0 :         return;
      72                 :             :     }
      73                 :         106 :     running_.store(true, std::memory_order_release);
      74                 :             : 
      75                 :         106 :     worker_threads_.reserve(workers_.size());
      76                 :         206 :     for (size_t i = 0; i < workers_.size(); ++i) {
      77                 :         100 :         worker_threads_.emplace_back(
      78                 :         200 :             [this, i] { worker_loop(static_cast<uint32_t>(i)); });
      79                 :             :     }
      80                 :             : 
      81                 :             :     // Start timer advancement thread
      82                 :         212 :     timer_thread_ = std::thread([this] {
      83                 :       19945 :         while (running_.load(std::memory_order_acquire)) {
      84                 :       19839 :             auto now = std::chrono::steady_clock::now().time_since_epoch().count();
      85                 :       39678 :             std::visit([&](auto& backend) { backend.advance(now); }, timer_backend_);
      86                 :       19839 :             std::this_thread::sleep_for(std::chrono::milliseconds(1));
      87                 :             :         }
      88                 :         106 :     });
      89                 :             : }
      90                 :             : 
      91                 :         212 : HybridScheduler::~HybridScheduler() {
      92                 :         106 :     stop();
      93                 :         212 : }
      94                 :             : 
      95                 :         212 : void HybridScheduler::stop() {
      96                 :         212 :     running_.store(false, std::memory_order_release);
      97                 :             :     // Wake any workers parked in wait_if_paused so they see running_ == false
      98                 :             :     // and exit their loop.
      99                 :         212 :     resume_workers();
     100                 :         312 :     for (auto& t : worker_threads_) {
     101                 :         100 :         if (t.joinable())
     102                 :         100 :             t.join();
     103                 :             :     }
     104                 :         212 :     worker_threads_.clear();
     105                 :             : 
     106                 :             :     // Stop and join timer thread
     107                 :         212 :     if (timer_thread_.joinable()) {
     108                 :         106 :         timer_thread_.join();
     109                 :             :     }
     110                 :         212 : }
     111                 :             : 
     112                 :         159 : void HybridScheduler::notify_ready(ActorId actor, uint8_t priority,
     113                 :             :                                    int64_t deadline_ns) {
     114                 :         159 :     if (!running_.load(std::memory_order_acquire)) {
     115                 :         108 :         return;
     116                 :             :     }
     117                 :             : 
     118                 :         159 :     WorkItem item{actor, deadline_ns, 0};
     119                 :             : 
     120                 :         159 :     DedicatedThreadPool* dedicated_pool = nullptr;
     121                 :             :     {
     122                 :         159 :         std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
     123                 :             : 
     124                 :         159 :         if (dedicated_->dedicated_thread_actors_.find(actor) !=
     125                 :         318 :             dedicated_->dedicated_thread_actors_.end()) {
     126                 :           1 :             return;
     127                 :             :         }
     128                 :             : 
     129                 :         158 :         auto actor_pool = dedicated_->actor_pool_map_.find(actor);
     130                 :         158 :         if (actor_pool != dedicated_->actor_pool_map_.end()) {
     131                 :           2 :             auto pool = dedicated_->dedicated_pools_.find(actor_pool->second);
     132                 :           2 :             if (pool != dedicated_->dedicated_pools_.end()) {
     133                 :           2 :                 dedicated_pool = pool->second.get();
     134                 :             :             }
     135                 :             :         }
     136                 :         159 :     }
     137                 :             : 
     138                 :         158 :     if (dedicated_pool != nullptr) {
     139                 :           4 :         dedicated_pool->enqueue(actor, [this, item]() { execute_actor(item); });
     140                 :           2 :         return;
     141                 :             :     }
     142                 :             : 
     143                 :             :     // Cooperative path: gate on actor state to prevent double-enqueue.
     144                 :             :     // If the actor is already executing (Running) or enqueued (Ready),
     145                 :             :     // a second WorkItem would be redundant.  CAS Idle→Ready atomically.
     146                 :         156 :     auto actor_ptr = system_.get_actor(actor);
     147                 :         156 :     if (actor_ptr && actor_ptr->is_event_based_actor()) {
     148                 :         149 :         auto* eb = static_cast<EventBasedActor*>(actor_ptr.get());
     149                 :         149 :         auto& state = eb->actor_state();
     150                 :         149 :         uint32_t current = state.get();
     151                 :         149 :         if (current == ActorState::kReady || current == ActorState::kRunning)
     152                 :          38 :             return;
     153                 :         111 :         if (current == ActorState::kTerminated)
     154                 :           0 :             return;
     155                 :         111 :         if (!state.cas(current, ActorState::kReady))
     156                 :           0 :             return; // another thread won the race
     157                 :             :     }
     158                 :             : 
     159                 :             :     // Round-robin across workers for fair initial placement.
     160                 :             :     // The atomic counter avoids the stale hint issue where get_victim always
     161                 :             :     // returns the same value because record_attempt is only called on steals.
     162                 :         118 :     if (num_workers_ == 0)
     163                 :          67 :         return;
     164                 :             :     static std::atomic<uint32_t> rr_counter{0};
     165                 :          51 :     uint32_t victim = rr_counter.fetch_add(1, std::memory_order_relaxed);
     166                 :             : 
     167                 :             :     // If deadline is INT64_MAX, use priority queue; otherwise use EDF queue
     168                 :          51 :     if (deadline_ns == INT64_MAX) {
     169                 :             :         // Push to priority queue using A2WS-selected victim
     170                 :          48 :         workers_[victim % num_workers_].queues[priority].push_bottom(item);
     171                 :             :     } else {
     172                 :             :         // Push to EDF queue for deadline-ordered processing
     173                 :           3 :         workers_[victim % num_workers_].edf_queue.push(deadline_ns, item);
     174                 :             :     }
     175                 :         156 : }
     176                 :             : 
     177                 :           0 : void HybridScheduler::notify_idle(ActorId actor) {
     178                 :             :     // Remove actor from EDF tracking if it was scheduled there
     179                 :             :     // For now, this is a stub - full implementation would need EDF cancellation
     180                 :             :     (void)actor;
     181                 :           0 : }
     182                 :             : 
     183                 :           0 : void HybridScheduler::yield(ActorId actor, uint8_t priority) {
     184                 :           0 :     notify_ready(actor, priority, INT64_MAX);
     185                 :           0 : }
     186                 :             : 
     187                 :       25649 : bool HybridScheduler::try_steal(WorkItem& out) {
     188                 :             :     // Use A2WS for adaptive victim selection
     189                 :       88667 :     for (uint32_t attempt = 0; attempt < num_workers_; ++attempt) {
     190                 :             :         // Get next victim from A2WS
     191                 :       63032 :         uint32_t victim_idx = a2ws_.get_victim(attempt % num_workers_);
     192                 :             : 
     193                 :       63032 :         auto& victim = workers_[victim_idx];
     194                 :             : 
     195                 :             :         // Try EDF queue first (deadline-ordered work has highest urgency)
     196                 :       63032 :         if (victim.edf_queue.pop(out)) {
     197                 :           0 :             a2ws_.record_steal(attempt % num_workers_, victim_idx);
     198                 :           0 :             if (metrics_ring_buffer_) [[unlikely]] {
     199                 :           0 :                 metrics::MetricEvent evt{};
     200                 :           0 :                 evt.actor_id = out.actor;
     201                 :           0 :                 evt.event_type = metrics::MetricEventType::kSchedulerSteal;
     202                 :           0 :                 evt.value_hi = victim_idx;
     203                 :           0 :                 metrics_ring_buffer_->try_push(evt);
     204                 :             :             }
     205                 :           0 :             HPACTOR_LOG_DEBUG(
     206                 :             :                 log::LogCategory::kScheduler, out.actor,
     207                 :             :                 static_cast<uint32_t>(log::LogEventId::kSchedulerSteal),
     208                 :             :                 "work stolen",
     209                 :             :                 log::field("from_worker", static_cast<uint64_t>(victim_idx)),
     210                 :             :                 log::field("to_worker",
     211                 :             :                            static_cast<uint64_t>(tl_current_worker_id)));
     212                 :           0 :             return true;
     213                 :             :         }
     214                 :             : 
     215                 :             :         // Try each priority level from highest to lowest
     216                 :      315106 :         for (uint32_t p = 0; p < num_priorities_; ++p) {
     217                 :      252088 :             if (victim.queues[p].steal_top(out)) {
     218                 :          14 :                 a2ws_.record_steal(attempt % num_workers_, victim_idx);
     219                 :          14 :                 if (metrics_ring_buffer_) [[unlikely]] {
     220                 :          14 :                     metrics::MetricEvent evt{};
     221                 :          14 :                     evt.actor_id = out.actor;
     222                 :          14 :                     evt.event_type = metrics::MetricEventType::kSchedulerSteal;
     223                 :          14 :                     evt.value_hi = victim_idx;
     224                 :          14 :                     metrics_ring_buffer_->try_push(evt);
     225                 :             :                 }
     226                 :          14 :                 HPACTOR_LOG_DEBUG(
     227                 :             :                     log::LogCategory::kScheduler, out.actor,
     228                 :             :                     static_cast<uint32_t>(log::LogEventId::kSchedulerSteal),
     229                 :             :                     "work stolen",
     230                 :             :                     log::field("from_worker", static_cast<uint64_t>(victim_idx)),
     231                 :             :                     log::field("to_worker",
     232                 :             :                                static_cast<uint64_t>(tl_current_worker_id)));
     233                 :          14 :                 return true;
     234                 :             :             }
     235                 :             :         }
     236                 :             : 
     237                 :             :         // Record failed attempt
     238                 :       63018 :         a2ws_.record_attempt(attempt % num_workers_, victim_idx, false);
     239                 :             :     }
     240                 :       25635 :     return false;
     241                 :             : }
     242                 :             : 
     243                 :       25665 : bool HybridScheduler::pop_local(WorkItem& out, uint32_t worker_id) {
     244                 :       25665 :     auto& worker = workers_[worker_id];
     245                 :             : 
     246                 :             :     // Check EDF queue first for deadline-ordered work
     247                 :       25665 :     if (pop_edf(out, worker_id)) {
     248                 :           3 :         return true;
     249                 :             :     }
     250                 :             : 
     251                 :             :     // Check priority queues from highest to lowest
     252                 :      128262 :     for (uint32_t p = 0; p < num_priorities_; ++p) {
     253                 :      102613 :         if (worker.queues[p].pop_bottom(out)) {
     254                 :          13 :             return true;
     255                 :             :         }
     256                 :             :     }
     257                 :       25649 :     return false;
     258                 :             : }
     259                 :             : 
     260                 :       52003 : bool HybridScheduler::pop_edf(WorkItem& out, uint32_t worker_id) {
     261                 :       52003 :     auto& worker = workers_[worker_id];
     262                 :             : 
     263                 :             :     // Check EDF queue
     264                 :       52003 :     if (worker.edf_queue.empty()) {
     265                 :       52000 :         return false;
     266                 :             :     }
     267                 :             : 
     268                 :             :     // Check if earliest deadline is urgent (within next ~10ms)
     269                 :             :     // For now, just return the earliest deadline item
     270                 :             :     int64_t deadline;
     271                 :           3 :     if (worker.edf_queue.peek(deadline)) {
     272                 :             :         // In a real implementation, we'd check if deadline < now + threshold
     273                 :             :         // For simplicity, just process EDF items when they exist
     274                 :           3 :         return worker.edf_queue.pop(out);
     275                 :             :     }
     276                 :           0 :     return false;
     277                 :             : }
     278                 :             : 
     279                 :           0 : void HybridScheduler::process_actor(ActorId actor) {
     280                 :           0 :     auto actor_ptr = system_.get_actor(actor);
     281                 :           0 :     if (!actor_ptr) {
     282                 :           0 :         return;
     283                 :             :     }
     284                 :             : 
     285                 :           0 :     auto mailbox = system_.get_mailbox(actor);
     286                 :           0 :     if (!mailbox) {
     287                 :           0 :         return;
     288                 :             :     }
     289                 :             : 
     290                 :           0 :     TypedMessage msg;
     291                 :           0 :     if (mailbox->try_pop(msg)) {
     292                 :           0 :         actor_ptr->receive(msg);
     293                 :             :     }
     294                 :           0 : }
     295                 :             : 
     296                 :         360 : void HybridScheduler::execute_actor(const WorkItem& item) {
     297                 :         360 :     auto actor_ptr = system_.get_actor(item.actor);
     298                 :         360 :     if (!actor_ptr || !actor_ptr->is_event_based_actor()) {
     299                 :           7 :         return;
     300                 :             :     }
     301                 :             : 
     302                 :         353 :     if (metrics_ring_buffer_) [[unlikely]] {
     303                 :         353 :         metrics::MetricEvent evt{};
     304                 :         353 :         evt.actor_id = item.actor;
     305                 :         353 :         evt.event_type = metrics::MetricEventType::kSchedulerDispatch;
     306                 :         353 :         evt.value_hi = 0; // worker_id filled by the steal event
     307                 :         353 :         metrics_ring_buffer_->try_push(evt);
     308                 :             :     }
     309                 :             : 
     310                 :         353 :     HPACTOR_LOG_DEBUG(
     311                 :             :         log::LogCategory::kScheduler, item.actor,
     312                 :             :         static_cast<uint32_t>(log::LogEventId::kSchedulerDispatch),
     313                 :             :         "actor dispatched",
     314                 :             :         log::field("worker_id", static_cast<uint64_t>(tl_current_worker_id)));
     315                 :             : 
     316                 :         353 :     auto* actor = static_cast<EventBasedActor*>(actor_ptr.get());
     317                 :             : 
     318                 :             :     // Set thread-local current actor ID so SlabAllocated::operator new and
     319                 :             :     // mem::current_actor_id() can attribute allocations to the correct actor.
     320                 :         353 :     mem::set_current_actor_id(item.actor);
     321                 :             : 
     322                 :             : #if HPACTOR_SUPPORT_COROUTINES
     323                 :         353 :     if (system_.use_coroutines()) {
     324                 :             :         // C++20 coroutine path (runtime opt-in via Config::use_coroutines)
     325                 :             :         // Lazily start the coroutine on first pickup
     326                 :          11 :         actor->ensure_coroutine_started();
     327                 :             : 
     328                 :          11 :         auto& coroutine = actor->get_actor_coroutine();
     329                 :          11 :         if (!coroutine)
     330                 :           0 :             return;
     331                 :             : 
     332                 :          11 :         auto& promise = coroutine.task().handle().promise();
     333                 :             : 
     334                 :             :         // First transition: kIdle/kIOWaiting → kReady (if needed)
     335                 :             :         // This handles the case where actor is picked up after suspending
     336                 :             :         // on mailbox (kIdle) or on timer/IO (kIOWaiting).
     337                 :          11 :         if (promise.actor_state->is_idle() || promise.actor_state->is_io_waiting()) {
     338                 :           0 :             promise.actor_state->set(ActorState::kReady);
     339                 :             :         }
     340                 :             : 
     341                 :             :         // Transition: Ready → Running
     342                 :             :         // If not in Ready state (already Running/Terminated), skip
     343                 :          11 :         uint32_t expected = ActorState::kReady;
     344                 :          11 :         if (!promise.actor_state->cas(expected, ActorState::kRunning)) {
     345                 :           0 :             if (promise.actor_state->is_terminated()) {
     346                 :           0 :                 actor->set_exit_reason(errors::actor_down);
     347                 :           0 :                 actor->on_exit();
     348                 :             :             }
     349                 :             :             // Already running or terminated by another path — skip
     350                 :           0 :             return;
     351                 :             :         }
     352                 :             : 
     353                 :             :         // Resume the coroutine
     354                 :          11 :         coroutine.resume();
     355                 :             : 
     356                 :             :         // Post-resume: coroutine suspended (Idle/IOWaiting) or terminated.
     357                 :             :         // Note: cannot access promise after resume() returns if coroutine
     358                 :             :         // terminated — the promise is destroyed with the coroutine frame. Use
     359                 :             :         // coroutine.done() which checks internal handle state (not the
     360                 :             :         // promise).
     361                 :          11 :         if (coroutine.done()) {
     362                 :           4 :             actor->on_exit();
     363                 :             :         }
     364                 :             :         // If idle or IOWaiting, the actor will be re-woken by:
     365                 :             :         // - MailboxAwaiter edge-trigger (MPSCActorMailbox::enqueue →
     366                 :             :         // notify_ready)
     367                 :             :         // - TimerAwaiter callback (EventLoop → notify_ready)
     368                 :             :         // Nothing to do here for suspended actors
     369                 :          11 :         return;
     370                 :             :     }
     371                 :             : #endif // HPACTOR_SUPPORT_COROUTINES
     372                 :             : 
     373                 :             :     // Behavior-based scheduling — state-aware CAS dispatch.
     374                 :             :     // Uses the same ActorState machine as the coroutine path:
     375                 :             :     // Idle -> Ready -> Running -> Idle (if empty) / Ready (if more work).
     376                 :         342 :     auto& actor_state = actor->actor_state();
     377                 :             : 
     378                 :             :     // First transition: kIdle → kReady (if actor is idle on first pickup)
     379                 :         342 :     if (actor_state.is_idle()) {
     380                 :           2 :         actor_state.set(ActorState::kReady);
     381                 :             :     }
     382                 :             : 
     383                 :             :     // Transition: Ready → Running
     384                 :         342 :     uint32_t expected = ActorState::kReady;
     385                 :         342 :     if (!actor_state.cas(expected, ActorState::kRunning)) {
     386                 :           0 :         if (actor_state.is_terminated()) {
     387                 :           0 :             actor->set_exit_reason(errors::actor_down);
     388                 :           0 :             actor->on_exit();
     389                 :             :         }
     390                 :           0 :         return;
     391                 :             :     }
     392                 :             : 
     393                 :         342 :     auto mailbox = system_.get_mailbox(item.actor);
     394                 :         342 :     if (!mailbox) {
     395                 :           0 :         actor_state.set(ActorState::kIdle);
     396                 :           0 :         return;
     397                 :             :     }
     398                 :             : 
     399                 :         342 :     TypedMessage msg;
     400                 :         342 :     if (mailbox->try_pop(msg)) {
     401                 :         321 :         actor->receive(msg);
     402                 :             :     }
     403                 :             : 
     404                 :         342 :     if (!mailbox->empty()) {
     405                 :             :         // More messages waiting — re-enqueue directly.
     406                 :             :         // We set kReady and push to a worker queue, bypassing the state
     407                 :             :         // gate in notify_ready() (which would skip kReady actors).
     408                 :         307 :         actor_state.set(ActorState::kReady);
     409                 :             :         static std::atomic<uint32_t> rr{0};
     410                 :         307 :         uint32_t v = rr.fetch_add(1, std::memory_order_relaxed);
     411                 :         307 :         workers_[v % num_workers_].queues[0].push_bottom(item);
     412                 :             :     } else {
     413                 :          35 :         actor_state.set(ActorState::kIdle);
     414                 :             :         // Double-check: a message may have arrived between the empty check
     415                 :             :         // and setting Idle.  notify_ready() will CAS Idle→Ready and enqueue.
     416                 :             :         // If we lost the race, our CAS fails and we skip the redundant enqueue.
     417                 :          35 :         if (!mailbox->empty()) {
     418                 :           0 :             expected = ActorState::kIdle;
     419                 :           0 :             if (actor_state.cas(expected, ActorState::kReady))
     420                 :           0 :                 notify_ready(item.actor, 0, INT64_MAX);
     421                 :             :         }
     422                 :             :     }
     423                 :         360 : }
     424                 :             : 
     425                 :       25665 : void HybridScheduler::wait_if_paused(uint32_t worker_id) {
     426                 :             :     (void)worker_id;
     427                 :       25665 :     if (!workers_paused_.load(std::memory_order_acquire)) {
     428                 :       25649 :         return;
     429                 :             :     }
     430                 :          16 :     parked_worker_count_.fetch_add(1, std::memory_order_relaxed);
     431                 :          16 :     std::unique_lock<std::mutex> lock(worker_control_mutex_);
     432                 :          16 :     worker_control_cv_.wait(lock, [this] {
     433                 :          48 :         return !workers_paused_.load(std::memory_order_acquire) ||
     434                 :          48 :                !running_.load(std::memory_order_acquire);
     435                 :             :     });
     436                 :          16 :     parked_worker_count_.fetch_sub(1, std::memory_order_relaxed);
     437                 :          16 : }
     438                 :             : 
     439                 :          30 : void HybridScheduler::mark_dispatch_begin() noexcept {
     440                 :          30 :     active_worker_dispatches_.fetch_add(1, std::memory_order_release);
     441                 :          30 : }
     442                 :             : 
     443                 :          30 : void HybridScheduler::mark_dispatch_end() noexcept {
     444                 :          30 :     active_worker_dispatches_.fetch_sub(1, std::memory_order_release);
     445                 :          30 : }
     446                 :             : 
     447                 :         100 : void HybridScheduler::worker_loop(uint32_t worker_id) {
     448                 :         100 :     tl_current_worker_id = worker_id; // set thread-local
     449                 :             : 
     450                 :         100 :     HPACTOR_LOG_DEBUG(log::LogCategory::kScheduler, ActorId{0}, 0, "worker started",
     451                 :             :                       log::field("worker_id", static_cast<uint64_t>(worker_id)));
     452                 :             : 
     453                 :       25765 :     while (running_.load(std::memory_order_acquire)) {
     454                 :       25665 :         wait_if_paused(worker_id);
     455                 :             : 
     456                 :       25665 :         WorkItem item;
     457                 :             : 
     458                 :             :         // Try local pop first (owner operation - wait-free)
     459                 :       25665 :         if (pop_local(item, worker_id)) {
     460                 :          16 :             mark_dispatch_begin();
     461                 :          16 :             execute_actor(item);
     462                 :          16 :             mark_dispatch_end();
     463                 :          30 :             continue;
     464                 :             :         }
     465                 :             : 
     466                 :             :         // Check EDF queue for deadline-ordered work
     467                 :       25649 :         if (pop_edf(item, worker_id)) {
     468                 :           0 :             mark_dispatch_begin();
     469                 :           0 :             execute_actor(item);
     470                 :           0 :             mark_dispatch_end();
     471                 :           0 :             continue;
     472                 :             :         }
     473                 :             : 
     474                 :             :         // Local empty - try stealing (lock-free but may fail)
     475                 :       25649 :         if (try_steal(item)) {
     476                 :          14 :             mark_dispatch_begin();
     477                 :          14 :             execute_actor(item);
     478                 :          14 :             mark_dispatch_end();
     479                 :          14 :             continue;
     480                 :             :         }
     481                 :             : 
     482                 :             :         // No work available - backoff
     483                 :       25635 :         backoff();
     484                 :             :     }
     485                 :         100 : }
     486                 :             : 
     487                 :           2 : uint32_t HybridScheduler::current_worker_id() const {
     488                 :           2 :     return tl_current_worker_id;
     489                 :             : }
     490                 :             : 
     491                 :       25635 : void HybridScheduler::backoff() {
     492                 :             :     // Exponential backoff: yield for small counts, sleep for larger
     493                 :             :     static thread_local uint32_t count = 0;
     494                 :       25635 :     uint32_t c = count++;
     495                 :             : 
     496                 :       25635 :     if (c < 4) {
     497                 :         364 :         std::this_thread::yield();
     498                 :             :     } else {
     499                 :             :         // Sleep for a short interval (exponential, capped)
     500                 :       25271 :         uint32_t backoff_us = std::min<uint32_t>(1024u, 10u << (c - 4));
     501                 :       25271 :         std::this_thread::sleep_for(std::chrono::microseconds(backoff_us));
     502                 :             :     }
     503                 :       25635 : }
     504                 :             : 
     505                 :             : uint64_t
     506                 :           0 : HybridScheduler::schedule_timer(int64_t delay_ns, timer_callback callback) {
     507                 :           0 :     return std::visit(
     508                 :           0 :         [&](auto& backend) {
     509                 :           0 :             return backend.schedule(delay_ns, std::move(callback));
     510                 :             :         },
     511                 :           0 :         timer_backend_);
     512                 :             : }
     513                 :             : 
     514                 :           0 : void HybridScheduler::advance_time(int64_t now_ns) {
     515                 :           0 :     std::visit([&](auto& backend) { backend.advance(now_ns); }, timer_backend_);
     516                 :           0 : }
     517                 :             : 
     518                 :           7 : TimerHandle HybridScheduler::schedule_after(timer_callback cb, int64_t delay_ns) {
     519                 :             :     auto id =
     520                 :          21 :         std::visit([&](auto& backend) { return backend.schedule(delay_ns, cb); },
     521                 :           7 :                    timer_backend_);
     522                 :           7 :     return TimerHandle{id};
     523                 :             : }
     524                 :             : 
     525                 :             : TimerHandle
     526                 :           0 : HybridScheduler::schedule_every(timer_callback cb, int64_t interval_ns) {
     527                 :             :     // For recurring timers, we wrap the callback to reschedule itself
     528                 :             :     // We need to use a shared_ptr to hold the interval value and the callback
     529                 :             :     // to avoid lifecycle issues with the lambda. We also use a cancellation
     530                 :             :     // flag to allow the recurring chain to be stopped.
     531                 :           0 :     auto cancelled = std::make_shared<std::atomic<bool>>(false);
     532                 :           0 :     auto interval = std::make_shared<int64_t>(interval_ns);
     533                 :           0 :     auto callback = std::make_shared<timer_callback>(std::move(cb));
     534                 :             : 
     535                 :           0 :     std::function<void()> recurring;
     536                 :           0 :     recurring = [this, cancelled, interval, callback, recurring]() {
     537                 :           0 :         if (cancelled->load(std::memory_order_acquire))
     538                 :           0 :             return;
     539                 :           0 :         if (running_.load(std::memory_order_acquire)) {
     540                 :           0 :             (*callback)();
     541                 :           0 :             if (!cancelled->load(std::memory_order_acquire)) {
     542                 :           0 :                 std::visit(
     543                 :           0 :                     [&](auto& backend) {
     544                 :           0 :                         static_cast<void>(backend.schedule(*interval, recurring));
     545                 :           0 :                     },
     546                 :           0 :                     timer_backend_);
     547                 :             :             }
     548                 :             :         }
     549                 :           0 :     };
     550                 :             : 
     551                 :           0 :     auto id = std::visit(
     552                 :           0 :         [&](auto& backend) { return backend.schedule(*interval, recurring); },
     553                 :           0 :         timer_backend_);
     554                 :             :     {
     555                 :           0 :         std::lock_guard<std::mutex> lock(cancellation_mutex_);
     556                 :           0 :         recurring_cancellations_[id] = cancelled;
     557                 :           0 :     }
     558                 :           0 :     return TimerHandle{id};
     559                 :           0 : }
     560                 :             : 
     561                 :           3 : void HybridScheduler::cancel_timer(TimerHandle handle) {
     562                 :           3 :     if (!handle.valid())
     563                 :           0 :         return;
     564                 :             : 
     565                 :           3 :     std::lock_guard<std::mutex> lock(cancellation_mutex_);
     566                 :           3 :     auto it = recurring_cancellations_.find(handle.value());
     567                 :           3 :     if (it != recurring_cancellations_.end()) {
     568                 :           0 :         it->second->store(true, std::memory_order_release);
     569                 :           0 :         recurring_cancellations_.erase(it);
     570                 :             :     }
     571                 :           9 :     std::visit([&](auto& backend) { backend.cancel(handle.value()); },
     572                 :           3 :                timer_backend_);
     573                 :           3 : }
     574                 :             : 
     575                 :           9 : void HybridScheduler::register_dedicated_thread(ActorId actor, int cpu_affinity) {
     576                 :           9 :     std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
     577                 :           9 :     dedicated_->dedicated_thread_actors_.insert(actor);
     578                 :           9 :     if (cpu_affinity >= 0) {
     579                 :           0 :         dedicated_->dedicated_thread_affinity_[actor] = cpu_affinity;
     580                 :             :     }
     581                 :           9 : }
     582                 :             : 
     583                 :           3 : void HybridScheduler::register_dedicated_pool(ActorId actor, uint32_t pool_size) {
     584                 :           3 :     std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
     585                 :           3 :     auto& pool = dedicated_->dedicated_pools_[pool_size];
     586                 :           3 :     if (!pool) {
     587                 :           2 :         pool = std::make_unique<DedicatedThreadPool>(pool_size);
     588                 :           2 :         pool->start();
     589                 :             :     }
     590                 :           3 :     dedicated_->actor_pool_map_[actor] = pool_size;
     591                 :           3 : }
     592                 :             : 
     593                 :           9 : void HybridScheduler::pause_workers() noexcept {
     594                 :           9 :     workers_paused_.store(true, std::memory_order_release);
     595                 :             :     // Wait until no worker is inside actor code.
     596                 :          18 :     while (active_worker_dispatches_.load(std::memory_order_acquire) > 0) {
     597                 :           0 :         std::this_thread::yield();
     598                 :             :     }
     599                 :           9 : }
     600                 :             : 
     601                 :         221 : void HybridScheduler::resume_workers() noexcept {
     602                 :         221 :     workers_paused_.store(false, std::memory_order_release);
     603                 :             :     {
     604                 :         221 :         std::lock_guard<std::mutex> lock(worker_control_mutex_);
     605                 :         221 :     }
     606                 :         221 :     worker_control_cv_.notify_all();
     607                 :         221 : }
     608                 :             : 
     609                 :           5 : bool HybridScheduler::workers_paused() const noexcept {
     610                 :           5 :     return workers_paused_.load(std::memory_order_acquire);
     611                 :             : }
     612                 :             : 
     613                 :         332 : bool HybridScheduler::pop_any_ready(WorkItem& out) {
     614                 :             :     // Scan all workers, stealing from each. We must use steal_top() rather
     615                 :             :     // than pop_bottom() because the caller (e.g., run_one_ready from test
     616                 :             :     // thread) is not the owning worker of any queue.
     617                 :         693 :     for (uint32_t w = 0; w < num_workers_; ++w) {
     618                 :         689 :         auto& worker = workers_[w];
     619                 :             :         // Check EDF first
     620                 :         689 :         if (pop_edf(out, w)) {
     621                 :           0 :             return true;
     622                 :             :         }
     623                 :             :         // Check priority queues, highest to lowest
     624                 :        2133 :         for (uint32_t p = 0; p < num_priorities_; ++p) {
     625                 :        1772 :             if (worker.queues[p].steal_top(out)) {
     626                 :         328 :                 return true;
     627                 :             :             }
     628                 :             :         }
     629                 :             :     }
     630                 :           4 :     return false;
     631                 :             : }
     632                 :             : 
     633                 :         333 : bool HybridScheduler::run_one_ready() {
     634                 :         333 :     if (!workers_paused_.load(std::memory_order_acquire)) {
     635                 :           1 :         return false;
     636                 :             :     }
     637                 :         332 :     WorkItem item;
     638                 :         332 :     if (!pop_any_ready(item)) {
     639                 :           4 :         return false;
     640                 :             :     }
     641                 :             :     // Temporarily set thread-local for metrics/logging attribution.
     642                 :         328 :     uint32_t saved_id = tl_current_worker_id;
     643                 :         328 :     tl_current_worker_id = UINT32_MAX;
     644                 :         328 :     execute_actor(item);
     645                 :         328 :     tl_current_worker_id = saved_id;
     646                 :         328 :     return true;
     647                 :             : }
     648                 :             : 
     649                 :           3 : SchedulerDrainResult HybridScheduler::drain_ready(size_t max_items) {
     650                 :           3 :     SchedulerDrainResult result;
     651                 :          17 :     for (size_t i = 0; i < max_items; ++i) {
     652                 :          17 :         if (!run_one_ready()) {
     653                 :           3 :             result.idle = true;
     654                 :           3 :             return result;
     655                 :             :         }
     656                 :          14 :         ++result.executed;
     657                 :             :     }
     658                 :           0 :     result.idle = false;
     659                 :           0 :     return result;
     660                 :             : }
     661                 :             : 
     662                 :           0 : void HybridScheduler::unregister_dedicated(ActorId actor) {
     663                 :           0 :     std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
     664                 :           0 :     dedicated_->dedicated_thread_actors_.erase(actor);
     665                 :           0 :     dedicated_->dedicated_thread_affinity_.erase(actor);
     666                 :           0 :     dedicated_->actor_pool_map_.erase(actor);
     667                 :           0 : }
     668                 :             : 
     669                 :             : } // namespace hpactor::sched
        

Generated by: LCOV version 2.0-1