Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/event_based_actor.hpp>
16 : : #include <hpactor/core/actor_system.hpp>
17 : : #include <hpactor/hpactor_config.hpp>
18 : : #include <hpactor/log/log_field.hpp>
19 : : #include <hpactor/log/logger.hpp>
20 : : #include <hpactor/sched/dedicated_thread_pool.hpp>
21 : : #include <hpactor/sched/scheduler.hpp>
22 : :
23 : : #include <variant>
24 : :
25 : : #if HPACTOR_SUPPORT_COROUTINES
26 : : # include <hpactor/sched/coroutine_task.hpp>
27 : : #endif
28 : :
29 : : namespace hpactor::sched {
30 : :
31 : : // DedicatedStorage: holds per-actor dedicated execution state.
32 : : // Defined in .cpp to allow DedicatedThreadPool to remain incomplete in the
33 : : // header (libc++ noexcept containers require complete types).
34 : : struct HybridScheduler::DedicatedStorage {
35 : : // Dedicated thread tracking (thread lifecycle managed by DaemonActor)
36 : : std::unordered_set<ActorId> dedicated_thread_actors_;
37 : : std::unordered_map<ActorId, int> dedicated_thread_affinity_;
38 : : std::mutex dedicated_mutex_;
39 : :
40 : : // Dedicated thread pools (pool_size -> pool)
41 : : std::unordered_map<uint32_t, std::unique_ptr<DedicatedThreadPool>> dedicated_pools_;
42 : : std::unordered_map<ActorId, uint32_t> actor_pool_map_; // actor -> pool_size
43 : : };
44 : :
45 : : // Thread-local pointer to the current worker executing on this thread
46 : : thread_local uint32_t tl_current_worker_id = UINT32_MAX;
47 : :
48 : 106 : HybridScheduler::HybridScheduler(ActorSystem& system, uint32_t num_workers,
49 : : uint32_t num_priorities,
50 : 106 : TimerBackend timer_backend, bool start_paused)
51 : 106 : : system_(system), num_workers_(num_workers), num_priorities_(num_priorities),
52 : 318 : workers_(num_workers), a2ws_(num_workers), workers_paused_(start_paused),
53 : 318 : dedicated_(std::make_unique<DedicatedStorage>()) {
54 : 106 : switch (timer_backend) {
55 : 106 : case TimerBackend::TimingWheel:
56 : 106 : timer_backend_.emplace<TimingWheel>(1'000'000, 4);
57 : 106 : break;
58 : 0 : case TimerBackend::CalendarQueue:
59 : 0 : timer_backend_.emplace<CalendarQueue>();
60 : 0 : break;
61 : : }
62 : 206 : for (uint32_t i = 0; i < num_workers; ++i) {
63 : 100 : workers_[i].queues =
64 : 200 : std::make_unique<ChaselevDeque<WorkItem>[]>(num_priorities);
65 : 100 : workers_[i].index = i;
66 : : }
67 : 106 : }
68 : :
69 : 106 : void HybridScheduler::start() {
70 : 106 : if (running_.load(std::memory_order_acquire)) {
71 : 0 : return;
72 : : }
73 : 106 : running_.store(true, std::memory_order_release);
74 : :
75 : 106 : worker_threads_.reserve(workers_.size());
76 : 206 : for (size_t i = 0; i < workers_.size(); ++i) {
77 : 100 : worker_threads_.emplace_back(
78 : 200 : [this, i] { worker_loop(static_cast<uint32_t>(i)); });
79 : : }
80 : :
81 : : // Start timer advancement thread
82 : 212 : timer_thread_ = std::thread([this] {
83 : 19945 : while (running_.load(std::memory_order_acquire)) {
84 : 19839 : auto now = std::chrono::steady_clock::now().time_since_epoch().count();
85 : 39678 : std::visit([&](auto& backend) { backend.advance(now); }, timer_backend_);
86 : 19839 : std::this_thread::sleep_for(std::chrono::milliseconds(1));
87 : : }
88 : 106 : });
89 : : }
90 : :
91 : 212 : HybridScheduler::~HybridScheduler() {
92 : 106 : stop();
93 : 212 : }
94 : :
95 : 212 : void HybridScheduler::stop() {
96 : 212 : running_.store(false, std::memory_order_release);
97 : : // Wake any workers parked in wait_if_paused so they see running_ == false
98 : : // and exit their loop.
99 : 212 : resume_workers();
100 : 312 : for (auto& t : worker_threads_) {
101 : 100 : if (t.joinable())
102 : 100 : t.join();
103 : : }
104 : 212 : worker_threads_.clear();
105 : :
106 : : // Stop and join timer thread
107 : 212 : if (timer_thread_.joinable()) {
108 : 106 : timer_thread_.join();
109 : : }
110 : 212 : }
111 : :
112 : 159 : void HybridScheduler::notify_ready(ActorId actor, uint8_t priority,
113 : : int64_t deadline_ns) {
114 : 159 : if (!running_.load(std::memory_order_acquire)) {
115 : 108 : return;
116 : : }
117 : :
118 : 159 : WorkItem item{actor, deadline_ns, 0};
119 : :
120 : 159 : DedicatedThreadPool* dedicated_pool = nullptr;
121 : : {
122 : 159 : std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
123 : :
124 : 159 : if (dedicated_->dedicated_thread_actors_.find(actor) !=
125 : 318 : dedicated_->dedicated_thread_actors_.end()) {
126 : 1 : return;
127 : : }
128 : :
129 : 158 : auto actor_pool = dedicated_->actor_pool_map_.find(actor);
130 : 158 : if (actor_pool != dedicated_->actor_pool_map_.end()) {
131 : 2 : auto pool = dedicated_->dedicated_pools_.find(actor_pool->second);
132 : 2 : if (pool != dedicated_->dedicated_pools_.end()) {
133 : 2 : dedicated_pool = pool->second.get();
134 : : }
135 : : }
136 : 159 : }
137 : :
138 : 158 : if (dedicated_pool != nullptr) {
139 : 4 : dedicated_pool->enqueue(actor, [this, item]() { execute_actor(item); });
140 : 2 : return;
141 : : }
142 : :
143 : : // Cooperative path: gate on actor state to prevent double-enqueue.
144 : : // If the actor is already executing (Running) or enqueued (Ready),
145 : : // a second WorkItem would be redundant. CAS Idle→Ready atomically.
146 : 156 : auto actor_ptr = system_.get_actor(actor);
147 : 156 : if (actor_ptr && actor_ptr->is_event_based_actor()) {
148 : 149 : auto* eb = static_cast<EventBasedActor*>(actor_ptr.get());
149 : 149 : auto& state = eb->actor_state();
150 : 149 : uint32_t current = state.get();
151 : 149 : if (current == ActorState::kReady || current == ActorState::kRunning)
152 : 38 : return;
153 : 111 : if (current == ActorState::kTerminated)
154 : 0 : return;
155 : 111 : if (!state.cas(current, ActorState::kReady))
156 : 0 : return; // another thread won the race
157 : : }
158 : :
159 : : // Round-robin across workers for fair initial placement.
160 : : // The atomic counter avoids the stale hint issue where get_victim always
161 : : // returns the same value because record_attempt is only called on steals.
162 : 118 : if (num_workers_ == 0)
163 : 67 : return;
164 : : static std::atomic<uint32_t> rr_counter{0};
165 : 51 : uint32_t victim = rr_counter.fetch_add(1, std::memory_order_relaxed);
166 : :
167 : : // If deadline is INT64_MAX, use priority queue; otherwise use EDF queue
168 : 51 : if (deadline_ns == INT64_MAX) {
169 : : // Push to priority queue using A2WS-selected victim
170 : 48 : workers_[victim % num_workers_].queues[priority].push_bottom(item);
171 : : } else {
172 : : // Push to EDF queue for deadline-ordered processing
173 : 3 : workers_[victim % num_workers_].edf_queue.push(deadline_ns, item);
174 : : }
175 : 156 : }
176 : :
177 : 0 : void HybridScheduler::notify_idle(ActorId actor) {
178 : : // Remove actor from EDF tracking if it was scheduled there
179 : : // For now, this is a stub - full implementation would need EDF cancellation
180 : : (void)actor;
181 : 0 : }
182 : :
183 : 0 : void HybridScheduler::yield(ActorId actor, uint8_t priority) {
184 : 0 : notify_ready(actor, priority, INT64_MAX);
185 : 0 : }
186 : :
187 : 25649 : bool HybridScheduler::try_steal(WorkItem& out) {
188 : : // Use A2WS for adaptive victim selection
189 : 88667 : for (uint32_t attempt = 0; attempt < num_workers_; ++attempt) {
190 : : // Get next victim from A2WS
191 : 63032 : uint32_t victim_idx = a2ws_.get_victim(attempt % num_workers_);
192 : :
193 : 63032 : auto& victim = workers_[victim_idx];
194 : :
195 : : // Try EDF queue first (deadline-ordered work has highest urgency)
196 : 63032 : if (victim.edf_queue.pop(out)) {
197 : 0 : a2ws_.record_steal(attempt % num_workers_, victim_idx);
198 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
199 : 0 : metrics::MetricEvent evt{};
200 : 0 : evt.actor_id = out.actor;
201 : 0 : evt.event_type = metrics::MetricEventType::kSchedulerSteal;
202 : 0 : evt.value_hi = victim_idx;
203 : 0 : metrics_ring_buffer_->try_push(evt);
204 : : }
205 : 0 : HPACTOR_LOG_DEBUG(
206 : : log::LogCategory::kScheduler, out.actor,
207 : : static_cast<uint32_t>(log::LogEventId::kSchedulerSteal),
208 : : "work stolen",
209 : : log::field("from_worker", static_cast<uint64_t>(victim_idx)),
210 : : log::field("to_worker",
211 : : static_cast<uint64_t>(tl_current_worker_id)));
212 : 0 : return true;
213 : : }
214 : :
215 : : // Try each priority level from highest to lowest
216 : 315106 : for (uint32_t p = 0; p < num_priorities_; ++p) {
217 : 252088 : if (victim.queues[p].steal_top(out)) {
218 : 14 : a2ws_.record_steal(attempt % num_workers_, victim_idx);
219 : 14 : if (metrics_ring_buffer_) [[unlikely]] {
220 : 14 : metrics::MetricEvent evt{};
221 : 14 : evt.actor_id = out.actor;
222 : 14 : evt.event_type = metrics::MetricEventType::kSchedulerSteal;
223 : 14 : evt.value_hi = victim_idx;
224 : 14 : metrics_ring_buffer_->try_push(evt);
225 : : }
226 : 14 : HPACTOR_LOG_DEBUG(
227 : : log::LogCategory::kScheduler, out.actor,
228 : : static_cast<uint32_t>(log::LogEventId::kSchedulerSteal),
229 : : "work stolen",
230 : : log::field("from_worker", static_cast<uint64_t>(victim_idx)),
231 : : log::field("to_worker",
232 : : static_cast<uint64_t>(tl_current_worker_id)));
233 : 14 : return true;
234 : : }
235 : : }
236 : :
237 : : // Record failed attempt
238 : 63018 : a2ws_.record_attempt(attempt % num_workers_, victim_idx, false);
239 : : }
240 : 25635 : return false;
241 : : }
242 : :
243 : 25665 : bool HybridScheduler::pop_local(WorkItem& out, uint32_t worker_id) {
244 : 25665 : auto& worker = workers_[worker_id];
245 : :
246 : : // Check EDF queue first for deadline-ordered work
247 : 25665 : if (pop_edf(out, worker_id)) {
248 : 3 : return true;
249 : : }
250 : :
251 : : // Check priority queues from highest to lowest
252 : 128262 : for (uint32_t p = 0; p < num_priorities_; ++p) {
253 : 102613 : if (worker.queues[p].pop_bottom(out)) {
254 : 13 : return true;
255 : : }
256 : : }
257 : 25649 : return false;
258 : : }
259 : :
260 : 52003 : bool HybridScheduler::pop_edf(WorkItem& out, uint32_t worker_id) {
261 : 52003 : auto& worker = workers_[worker_id];
262 : :
263 : : // Check EDF queue
264 : 52003 : if (worker.edf_queue.empty()) {
265 : 52000 : return false;
266 : : }
267 : :
268 : : // Check if earliest deadline is urgent (within next ~10ms)
269 : : // For now, just return the earliest deadline item
270 : : int64_t deadline;
271 : 3 : if (worker.edf_queue.peek(deadline)) {
272 : : // In a real implementation, we'd check if deadline < now + threshold
273 : : // For simplicity, just process EDF items when they exist
274 : 3 : return worker.edf_queue.pop(out);
275 : : }
276 : 0 : return false;
277 : : }
278 : :
279 : 0 : void HybridScheduler::process_actor(ActorId actor) {
280 : 0 : auto actor_ptr = system_.get_actor(actor);
281 : 0 : if (!actor_ptr) {
282 : 0 : return;
283 : : }
284 : :
285 : 0 : auto mailbox = system_.get_mailbox(actor);
286 : 0 : if (!mailbox) {
287 : 0 : return;
288 : : }
289 : :
290 : 0 : TypedMessage msg;
291 : 0 : if (mailbox->try_pop(msg)) {
292 : 0 : actor_ptr->receive(msg);
293 : : }
294 : 0 : }
295 : :
296 : 360 : void HybridScheduler::execute_actor(const WorkItem& item) {
297 : 360 : auto actor_ptr = system_.get_actor(item.actor);
298 : 360 : if (!actor_ptr || !actor_ptr->is_event_based_actor()) {
299 : 7 : return;
300 : : }
301 : :
302 : 353 : if (metrics_ring_buffer_) [[unlikely]] {
303 : 353 : metrics::MetricEvent evt{};
304 : 353 : evt.actor_id = item.actor;
305 : 353 : evt.event_type = metrics::MetricEventType::kSchedulerDispatch;
306 : 353 : evt.value_hi = 0; // worker_id filled by the steal event
307 : 353 : metrics_ring_buffer_->try_push(evt);
308 : : }
309 : :
310 : 353 : HPACTOR_LOG_DEBUG(
311 : : log::LogCategory::kScheduler, item.actor,
312 : : static_cast<uint32_t>(log::LogEventId::kSchedulerDispatch),
313 : : "actor dispatched",
314 : : log::field("worker_id", static_cast<uint64_t>(tl_current_worker_id)));
315 : :
316 : 353 : auto* actor = static_cast<EventBasedActor*>(actor_ptr.get());
317 : :
318 : : // Set thread-local current actor ID so SlabAllocated::operator new and
319 : : // mem::current_actor_id() can attribute allocations to the correct actor.
320 : 353 : mem::set_current_actor_id(item.actor);
321 : :
322 : : #if HPACTOR_SUPPORT_COROUTINES
323 : 353 : if (system_.use_coroutines()) {
324 : : // C++20 coroutine path (runtime opt-in via Config::use_coroutines)
325 : : // Lazily start the coroutine on first pickup
326 : 11 : actor->ensure_coroutine_started();
327 : :
328 : 11 : auto& coroutine = actor->get_actor_coroutine();
329 : 11 : if (!coroutine)
330 : 0 : return;
331 : :
332 : 11 : auto& promise = coroutine.task().handle().promise();
333 : :
334 : : // First transition: kIdle/kIOWaiting → kReady (if needed)
335 : : // This handles the case where actor is picked up after suspending
336 : : // on mailbox (kIdle) or on timer/IO (kIOWaiting).
337 : 11 : if (promise.actor_state->is_idle() || promise.actor_state->is_io_waiting()) {
338 : 0 : promise.actor_state->set(ActorState::kReady);
339 : : }
340 : :
341 : : // Transition: Ready → Running
342 : : // If not in Ready state (already Running/Terminated), skip
343 : 11 : uint32_t expected = ActorState::kReady;
344 : 11 : if (!promise.actor_state->cas(expected, ActorState::kRunning)) {
345 : 0 : if (promise.actor_state->is_terminated()) {
346 : 0 : actor->set_exit_reason(errors::actor_down);
347 : 0 : actor->on_exit();
348 : : }
349 : : // Already running or terminated by another path — skip
350 : 0 : return;
351 : : }
352 : :
353 : : // Resume the coroutine
354 : 11 : coroutine.resume();
355 : :
356 : : // Post-resume: coroutine suspended (Idle/IOWaiting) or terminated.
357 : : // Note: cannot access promise after resume() returns if coroutine
358 : : // terminated — the promise is destroyed with the coroutine frame. Use
359 : : // coroutine.done() which checks internal handle state (not the
360 : : // promise).
361 : 11 : if (coroutine.done()) {
362 : 4 : actor->on_exit();
363 : : }
364 : : // If idle or IOWaiting, the actor will be re-woken by:
365 : : // - MailboxAwaiter edge-trigger (MPSCActorMailbox::enqueue →
366 : : // notify_ready)
367 : : // - TimerAwaiter callback (EventLoop → notify_ready)
368 : : // Nothing to do here for suspended actors
369 : 11 : return;
370 : : }
371 : : #endif // HPACTOR_SUPPORT_COROUTINES
372 : :
373 : : // Behavior-based scheduling — state-aware CAS dispatch.
374 : : // Uses the same ActorState machine as the coroutine path:
375 : : // Idle -> Ready -> Running -> Idle (if empty) / Ready (if more work).
376 : 342 : auto& actor_state = actor->actor_state();
377 : :
378 : : // First transition: kIdle → kReady (if actor is idle on first pickup)
379 : 342 : if (actor_state.is_idle()) {
380 : 2 : actor_state.set(ActorState::kReady);
381 : : }
382 : :
383 : : // Transition: Ready → Running
384 : 342 : uint32_t expected = ActorState::kReady;
385 : 342 : if (!actor_state.cas(expected, ActorState::kRunning)) {
386 : 0 : if (actor_state.is_terminated()) {
387 : 0 : actor->set_exit_reason(errors::actor_down);
388 : 0 : actor->on_exit();
389 : : }
390 : 0 : return;
391 : : }
392 : :
393 : 342 : auto mailbox = system_.get_mailbox(item.actor);
394 : 342 : if (!mailbox) {
395 : 0 : actor_state.set(ActorState::kIdle);
396 : 0 : return;
397 : : }
398 : :
399 : 342 : TypedMessage msg;
400 : 342 : if (mailbox->try_pop(msg)) {
401 : 321 : actor->receive(msg);
402 : : }
403 : :
404 : 342 : if (!mailbox->empty()) {
405 : : // More messages waiting — re-enqueue directly.
406 : : // We set kReady and push to a worker queue, bypassing the state
407 : : // gate in notify_ready() (which would skip kReady actors).
408 : 307 : actor_state.set(ActorState::kReady);
409 : : static std::atomic<uint32_t> rr{0};
410 : 307 : uint32_t v = rr.fetch_add(1, std::memory_order_relaxed);
411 : 307 : workers_[v % num_workers_].queues[0].push_bottom(item);
412 : : } else {
413 : 35 : actor_state.set(ActorState::kIdle);
414 : : // Double-check: a message may have arrived between the empty check
415 : : // and setting Idle. notify_ready() will CAS Idle→Ready and enqueue.
416 : : // If we lost the race, our CAS fails and we skip the redundant enqueue.
417 : 35 : if (!mailbox->empty()) {
418 : 0 : expected = ActorState::kIdle;
419 : 0 : if (actor_state.cas(expected, ActorState::kReady))
420 : 0 : notify_ready(item.actor, 0, INT64_MAX);
421 : : }
422 : : }
423 : 360 : }
424 : :
425 : 25665 : void HybridScheduler::wait_if_paused(uint32_t worker_id) {
426 : : (void)worker_id;
427 : 25665 : if (!workers_paused_.load(std::memory_order_acquire)) {
428 : 25649 : return;
429 : : }
430 : 16 : parked_worker_count_.fetch_add(1, std::memory_order_relaxed);
431 : 16 : std::unique_lock<std::mutex> lock(worker_control_mutex_);
432 : 16 : worker_control_cv_.wait(lock, [this] {
433 : 48 : return !workers_paused_.load(std::memory_order_acquire) ||
434 : 48 : !running_.load(std::memory_order_acquire);
435 : : });
436 : 16 : parked_worker_count_.fetch_sub(1, std::memory_order_relaxed);
437 : 16 : }
438 : :
439 : 30 : void HybridScheduler::mark_dispatch_begin() noexcept {
440 : 30 : active_worker_dispatches_.fetch_add(1, std::memory_order_release);
441 : 30 : }
442 : :
443 : 30 : void HybridScheduler::mark_dispatch_end() noexcept {
444 : 30 : active_worker_dispatches_.fetch_sub(1, std::memory_order_release);
445 : 30 : }
446 : :
447 : 100 : void HybridScheduler::worker_loop(uint32_t worker_id) {
448 : 100 : tl_current_worker_id = worker_id; // set thread-local
449 : :
450 : 100 : HPACTOR_LOG_DEBUG(log::LogCategory::kScheduler, ActorId{0}, 0, "worker started",
451 : : log::field("worker_id", static_cast<uint64_t>(worker_id)));
452 : :
453 : 25765 : while (running_.load(std::memory_order_acquire)) {
454 : 25665 : wait_if_paused(worker_id);
455 : :
456 : 25665 : WorkItem item;
457 : :
458 : : // Try local pop first (owner operation - wait-free)
459 : 25665 : if (pop_local(item, worker_id)) {
460 : 16 : mark_dispatch_begin();
461 : 16 : execute_actor(item);
462 : 16 : mark_dispatch_end();
463 : 30 : continue;
464 : : }
465 : :
466 : : // Check EDF queue for deadline-ordered work
467 : 25649 : if (pop_edf(item, worker_id)) {
468 : 0 : mark_dispatch_begin();
469 : 0 : execute_actor(item);
470 : 0 : mark_dispatch_end();
471 : 0 : continue;
472 : : }
473 : :
474 : : // Local empty - try stealing (lock-free but may fail)
475 : 25649 : if (try_steal(item)) {
476 : 14 : mark_dispatch_begin();
477 : 14 : execute_actor(item);
478 : 14 : mark_dispatch_end();
479 : 14 : continue;
480 : : }
481 : :
482 : : // No work available - backoff
483 : 25635 : backoff();
484 : : }
485 : 100 : }
486 : :
487 : 2 : uint32_t HybridScheduler::current_worker_id() const {
488 : 2 : return tl_current_worker_id;
489 : : }
490 : :
491 : 25635 : void HybridScheduler::backoff() {
492 : : // Exponential backoff: yield for small counts, sleep for larger
493 : : static thread_local uint32_t count = 0;
494 : 25635 : uint32_t c = count++;
495 : :
496 : 25635 : if (c < 4) {
497 : 364 : std::this_thread::yield();
498 : : } else {
499 : : // Sleep for a short interval (exponential, capped)
500 : 25271 : uint32_t backoff_us = std::min<uint32_t>(1024u, 10u << (c - 4));
501 : 25271 : std::this_thread::sleep_for(std::chrono::microseconds(backoff_us));
502 : : }
503 : 25635 : }
504 : :
505 : : uint64_t
506 : 0 : HybridScheduler::schedule_timer(int64_t delay_ns, timer_callback callback) {
507 : 0 : return std::visit(
508 : 0 : [&](auto& backend) {
509 : 0 : return backend.schedule(delay_ns, std::move(callback));
510 : : },
511 : 0 : timer_backend_);
512 : : }
513 : :
514 : 0 : void HybridScheduler::advance_time(int64_t now_ns) {
515 : 0 : std::visit([&](auto& backend) { backend.advance(now_ns); }, timer_backend_);
516 : 0 : }
517 : :
518 : 7 : TimerHandle HybridScheduler::schedule_after(timer_callback cb, int64_t delay_ns) {
519 : : auto id =
520 : 21 : std::visit([&](auto& backend) { return backend.schedule(delay_ns, cb); },
521 : 7 : timer_backend_);
522 : 7 : return TimerHandle{id};
523 : : }
524 : :
525 : : TimerHandle
526 : 0 : HybridScheduler::schedule_every(timer_callback cb, int64_t interval_ns) {
527 : : // For recurring timers, we wrap the callback to reschedule itself
528 : : // We need to use a shared_ptr to hold the interval value and the callback
529 : : // to avoid lifecycle issues with the lambda. We also use a cancellation
530 : : // flag to allow the recurring chain to be stopped.
531 : 0 : auto cancelled = std::make_shared<std::atomic<bool>>(false);
532 : 0 : auto interval = std::make_shared<int64_t>(interval_ns);
533 : 0 : auto callback = std::make_shared<timer_callback>(std::move(cb));
534 : :
535 : 0 : std::function<void()> recurring;
536 : 0 : recurring = [this, cancelled, interval, callback, recurring]() {
537 : 0 : if (cancelled->load(std::memory_order_acquire))
538 : 0 : return;
539 : 0 : if (running_.load(std::memory_order_acquire)) {
540 : 0 : (*callback)();
541 : 0 : if (!cancelled->load(std::memory_order_acquire)) {
542 : 0 : std::visit(
543 : 0 : [&](auto& backend) {
544 : 0 : static_cast<void>(backend.schedule(*interval, recurring));
545 : 0 : },
546 : 0 : timer_backend_);
547 : : }
548 : : }
549 : 0 : };
550 : :
551 : 0 : auto id = std::visit(
552 : 0 : [&](auto& backend) { return backend.schedule(*interval, recurring); },
553 : 0 : timer_backend_);
554 : : {
555 : 0 : std::lock_guard<std::mutex> lock(cancellation_mutex_);
556 : 0 : recurring_cancellations_[id] = cancelled;
557 : 0 : }
558 : 0 : return TimerHandle{id};
559 : 0 : }
560 : :
561 : 3 : void HybridScheduler::cancel_timer(TimerHandle handle) {
562 : 3 : if (!handle.valid())
563 : 0 : return;
564 : :
565 : 3 : std::lock_guard<std::mutex> lock(cancellation_mutex_);
566 : 3 : auto it = recurring_cancellations_.find(handle.value());
567 : 3 : if (it != recurring_cancellations_.end()) {
568 : 0 : it->second->store(true, std::memory_order_release);
569 : 0 : recurring_cancellations_.erase(it);
570 : : }
571 : 9 : std::visit([&](auto& backend) { backend.cancel(handle.value()); },
572 : 3 : timer_backend_);
573 : 3 : }
574 : :
575 : 9 : void HybridScheduler::register_dedicated_thread(ActorId actor, int cpu_affinity) {
576 : 9 : std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
577 : 9 : dedicated_->dedicated_thread_actors_.insert(actor);
578 : 9 : if (cpu_affinity >= 0) {
579 : 0 : dedicated_->dedicated_thread_affinity_[actor] = cpu_affinity;
580 : : }
581 : 9 : }
582 : :
583 : 3 : void HybridScheduler::register_dedicated_pool(ActorId actor, uint32_t pool_size) {
584 : 3 : std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
585 : 3 : auto& pool = dedicated_->dedicated_pools_[pool_size];
586 : 3 : if (!pool) {
587 : 2 : pool = std::make_unique<DedicatedThreadPool>(pool_size);
588 : 2 : pool->start();
589 : : }
590 : 3 : dedicated_->actor_pool_map_[actor] = pool_size;
591 : 3 : }
592 : :
593 : 9 : void HybridScheduler::pause_workers() noexcept {
594 : 9 : workers_paused_.store(true, std::memory_order_release);
595 : : // Wait until no worker is inside actor code.
596 : 18 : while (active_worker_dispatches_.load(std::memory_order_acquire) > 0) {
597 : 0 : std::this_thread::yield();
598 : : }
599 : 9 : }
600 : :
601 : 221 : void HybridScheduler::resume_workers() noexcept {
602 : 221 : workers_paused_.store(false, std::memory_order_release);
603 : : {
604 : 221 : std::lock_guard<std::mutex> lock(worker_control_mutex_);
605 : 221 : }
606 : 221 : worker_control_cv_.notify_all();
607 : 221 : }
608 : :
609 : 5 : bool HybridScheduler::workers_paused() const noexcept {
610 : 5 : return workers_paused_.load(std::memory_order_acquire);
611 : : }
612 : :
613 : 332 : bool HybridScheduler::pop_any_ready(WorkItem& out) {
614 : : // Scan all workers, stealing from each. We must use steal_top() rather
615 : : // than pop_bottom() because the caller (e.g., run_one_ready from test
616 : : // thread) is not the owning worker of any queue.
617 : 693 : for (uint32_t w = 0; w < num_workers_; ++w) {
618 : 689 : auto& worker = workers_[w];
619 : : // Check EDF first
620 : 689 : if (pop_edf(out, w)) {
621 : 0 : return true;
622 : : }
623 : : // Check priority queues, highest to lowest
624 : 2133 : for (uint32_t p = 0; p < num_priorities_; ++p) {
625 : 1772 : if (worker.queues[p].steal_top(out)) {
626 : 328 : return true;
627 : : }
628 : : }
629 : : }
630 : 4 : return false;
631 : : }
632 : :
633 : 333 : bool HybridScheduler::run_one_ready() {
634 : 333 : if (!workers_paused_.load(std::memory_order_acquire)) {
635 : 1 : return false;
636 : : }
637 : 332 : WorkItem item;
638 : 332 : if (!pop_any_ready(item)) {
639 : 4 : return false;
640 : : }
641 : : // Temporarily set thread-local for metrics/logging attribution.
642 : 328 : uint32_t saved_id = tl_current_worker_id;
643 : 328 : tl_current_worker_id = UINT32_MAX;
644 : 328 : execute_actor(item);
645 : 328 : tl_current_worker_id = saved_id;
646 : 328 : return true;
647 : : }
648 : :
649 : 3 : SchedulerDrainResult HybridScheduler::drain_ready(size_t max_items) {
650 : 3 : SchedulerDrainResult result;
651 : 17 : for (size_t i = 0; i < max_items; ++i) {
652 : 17 : if (!run_one_ready()) {
653 : 3 : result.idle = true;
654 : 3 : return result;
655 : : }
656 : 14 : ++result.executed;
657 : : }
658 : 0 : result.idle = false;
659 : 0 : return result;
660 : : }
661 : :
662 : 0 : void HybridScheduler::unregister_dedicated(ActorId actor) {
663 : 0 : std::lock_guard<std::mutex> lock(dedicated_->dedicated_mutex_);
664 : 0 : dedicated_->dedicated_thread_actors_.erase(actor);
665 : 0 : dedicated_->dedicated_thread_affinity_.erase(actor);
666 : 0 : dedicated_->actor_pool_map_.erase(actor);
667 : 0 : }
668 : :
669 : : } // namespace hpactor::sched
|