Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/actor_fwd.hpp>
18 : : #include <hpactor/adt/id.hpp>
19 : : #include <hpactor/adt/tags.hpp>
20 : : #include <hpactor/metrics/metrics_event.hpp>
21 : : #include <hpactor/metrics/metrics_ring_buffer.hpp>
22 : : #include <hpactor/sched/a2ws.hpp>
23 : : #include <hpactor/sched/calendar_queue.hpp>
24 : : #include <hpactor/sched/edf_queue.hpp>
25 : : #include <hpactor/sched/timing_wheel.hpp>
26 : : #include <hpactor/sched/work_queue.hpp>
27 : :
28 : : #include <atomic>
29 : : #include <condition_variable>
30 : : #include <cstdint>
31 : : #include <functional>
32 : : #include <memory>
33 : : #include <mutex>
34 : : #include <thread>
35 : : #include <unordered_map>
36 : : #include <unordered_set>
37 : : #include <variant>
38 : : #include <vector>
39 : :
40 : : namespace hpactor {
41 : :
42 : : // Forward declare ActorSystem to avoid circular include
43 : : class ActorSystem;
44 : :
45 : : namespace log {
46 : : class Logger;
47 : : } // namespace log
48 : :
49 : : } // namespace hpactor
50 : :
51 : : namespace hpactor::sched {
52 : :
53 : : class DedicatedThreadPool; // forward decl
54 : :
55 : : // -----------------------------------------------------------------------------
56 : : // TimerHandle and timer_callback types
57 : : // -----------------------------------------------------------------------------
58 : : using TimerHandle = Id<TimerTag>; // Id specialised on TimerTag; returned by schedule_after/schedule_every and accepted by cancel_timer.
59 : : // Callback type for timers: a callable taking no arguments and returning nothing.
60 : : using timer_callback = std::function<void()>;
61 : : // Timer implementation selector; HybridScheduler's constructor takes one (default TimingWheel).
62 : : enum class TimerBackend : uint8_t { TimingWheel = 0, CalendarQueue = 1 };
63 : :
64 : : struct SchedulerDrainResult { // Value returned by IScheduler::drain_ready().
65 : : size_t executed = 0; // Count of items run; the default drain_ready increments it once per successful run_one_ready().
66 : : bool idle = true; // In the default drain_ready: true if it stopped because run_one_ready() returned false, false if the item limit was reached.
67 : : };
68 : :
69 : : // -----------------------------------------------------------------------------
70 : : // IScheduler: interface for actor schedulers
71 : : // -----------------------------------------------------------------------------
72 : : class IScheduler { // Abstract scheduler interface; HybridScheduler below is a concrete implementation.
73 : : public:
74 : 120 : virtual ~IScheduler() = default; // Virtual so implementations can be destroyed through an IScheduler pointer.
75 : : // Optional hooks. The defaults ignore their argument; the pointer is untyped, so each override chooses the type it casts to.
76 : 0 : virtual void set_metrics_ring_buffer(void* /*buf*/) {} // NOTE(review): not noexcept here, unlike set_logger below.
77 : 0 : virtual void set_logger(void* /*logger*/) noexcept {}
78 : :
79 : : // Start the scheduler
80 : : virtual void start() = 0;
81 : :
82 : : // Stop the scheduler
83 : : virtual void stop() = 0;
84 : :
85 : : // Notify scheduler that an actor is ready to run at the given priority. Thread-safe; may be called from any thread including I/O threads.
86 : : // NOTE(review): deadline_ns semantics (absolute vs. relative, value meaning "no deadline") are not documented in this header — confirm.
87 : : virtual void
88 : : notify_ready(ActorId actor, uint8_t priority, int64_t deadline_ns) = 0;
89 : :
90 : : // Notify scheduler that an actor has become idle (blocked on I/O, etc.)
91 : : virtual void notify_idle(ActorId actor) = 0;
92 : :
93 : : // Voluntarily yield — re-enqueue actor at same priority for cooperative
94 : : // multitasking
95 : : virtual void yield(ActorId actor, uint8_t priority) = 0;
96 : :
97 : : // Schedule a one-shot timer to fire after delay_ns; the returned handle can be passed to cancel_timer
98 : : virtual TimerHandle schedule_after(timer_callback cb, int64_t delay_ns) = 0;
99 : :
100 : : // Schedule a recurring timer to fire every interval_ns; the returned handle can be passed to cancel_timer
101 : : virtual TimerHandle schedule_every(timer_callback cb, int64_t interval_ns) = 0;
102 : :
103 : : // Cancel a previously scheduled timer
104 : : virtual void cancel_timer(TimerHandle handle) = 0;
105 : :
106 : : // Number of worker threads
107 : : virtual size_t worker_count() const = 0;
108 : :
109 : : // Check if scheduler is running
110 : : virtual bool is_running() const = 0;
111 : :
112 : : // Register an actor that needs a dedicated OS thread.
113 : : // cpu_affinity: -1 = no affinity, >=0 = pin to specific core.
114 : : virtual void register_dedicated_thread(ActorId actor, int cpu_affinity) = 0;
115 : :
116 : : // Register an actor that needs a dedicated worker pool.
117 : : // The scheduler creates or reuses a DedicatedThreadPool of the given size.
118 : : virtual void register_dedicated_pool(ActorId actor, uint32_t pool_size) = 0;
119 : :
120 : : // Shutdown a dedicated execution context for an actor.
121 : : virtual void unregister_dedicated(ActorId actor) = 0;
122 : : // Worker control (intended for deterministic testing). The defaults below are no-ops:
123 : : // pause/resume do nothing, and workers_paused() and run_one_ready() both return false.
124 : 0 : virtual void pause_workers() noexcept {}
125 : 0 : virtual void resume_workers() noexcept {}
126 : 0 : virtual bool workers_paused() const noexcept {
127 : 0 : return false;
128 : : }
129 : 0 : virtual bool run_one_ready() { // Default drain_ready() stops on the first false returned from here.
130 : 0 : return false;
131 : : }
132 : 0 : virtual SchedulerDrainResult drain_ready(size_t max_items) { // Default: call run_one_ready() at most max_items times.
133 : 0 : SchedulerDrainResult result;
134 : 0 : for (size_t i = 0; i < max_items; ++i) {
135 : 0 : if (!run_one_ready()) {
136 : 0 : result.idle = true; // nothing ran on this attempt: stop before the limit
137 : 0 : return result;
138 : : }
139 : 0 : ++result.executed;
140 : : }
141 : 0 : result.idle = false; // limit reached without a false return; also the result when max_items == 0
142 : 0 : return result;
143 : : }
144 : : };
145 : :
146 : : // -----------------------------------------------------------------------------
147 : : // HybridScheduler: work-stealing scheduler with priority queues
148 : : // -----------------------------------------------------------------------------
149 : : // Each worker has its own ChaseLev deque. Work-stealing is done by trying
150 : : // to pop from the target worker's deque when local work is exhausted.
151 : : // Priority levels 0..num_priorities-1 (default 4 levels, 0-3; 0 = highest).
152 : : //
153 : : // Uses MultiPriorityWorkQueue for priority-based local enqueue.
154 : : // Work-stealing is attempted in round-robin order across workers.
155 : : // -----------------------------------------------------------------------------
156 : : class HybridScheduler : public IScheduler { // Concrete IScheduler; overrides every pure virtual plus the worker-control hooks.
157 : : public:
158 : : // system: ActorSystem reference held for processing actors. num_workers: requested number of workers.
159 : : // num_priorities: number of priority levels (default 4, priorities 0..N-1). timer_backend: TimingWheel or CalendarQueue. start_paused: presumably starts with workers paused — TODO confirm.
160 : : explicit HybridScheduler(ActorSystem& system, uint32_t num_workers,
161 : : uint32_t num_priorities = 4,
162 : : TimerBackend timer_backend = TimerBackend::TimingWheel,
163 : : bool start_paused = false);
164 : : ~HybridScheduler() override;
165 : : // Non-copyable and non-movable.
166 : : HybridScheduler(const HybridScheduler&) = delete;
167 : : HybridScheduler& operator=(const HybridScheduler&) = delete;
168 : : HybridScheduler(HybridScheduler&&) = delete;
169 : : HybridScheduler& operator=(HybridScheduler&&) = delete;
170 : :
171 : : void start() override;
172 : : void stop() override;
173 : : void notify_ready(ActorId actor, uint8_t priority, int64_t deadline_ns) override;
174 : : void notify_idle(ActorId actor) override;
175 : : void yield(ActorId actor, uint8_t priority) override;
176 : 2 : bool is_running() const override {
177 : 2 : return running_.load(std::memory_order_acquire); // acquire load of the running_ flag
178 : : }
179 : 2 : size_t worker_count() const override {
180 : 2 : return num_workers_; // widened from uint32_t to size_t
181 : : }
182 : : TimerHandle schedule_after(timer_callback cb, int64_t delay_ns) override;
183 : : TimerHandle schedule_every(timer_callback cb, int64_t interval_ns) override;
184 : : void cancel_timer(TimerHandle handle) override;
185 : :
186 : : void register_dedicated_thread(ActorId actor, int cpu_affinity) override;
187 : : void register_dedicated_pool(ActorId actor, uint32_t pool_size) override;
188 : : void unregister_dedicated(ActorId actor) override;
189 : :
190 : : // Worker control (deterministic testing); these replace the no-op defaults in IScheduler
191 : : void pause_workers() noexcept override;
192 : : void resume_workers() noexcept override;
193 : : bool workers_paused() const noexcept override;
194 : : bool run_one_ready() override;
195 : : SchedulerDrainResult drain_ready(size_t max_items) override;
196 : :
197 : : // Try to steal work from another worker (called when local queue is empty); the stolen item is written to `out`
198 : : bool try_steal(WorkItem& out);
199 : :
200 : : // Process one actor (called by worker loop)
201 : : void process_actor(ActorId actor);
202 : :
203 : : // Execute an actor (handles coroutine resumption when available)
204 : : // TODO(Task 4.2): Integrate coroutine resumption when get_coroutine() is
205 : : // available
206 : : void execute_actor(const WorkItem& item);
207 : :
208 : : // Timing wheel integration
209 : : // Schedule a timer to fire after delay_ns (in nanoseconds)
210 : : // Returns timer ID that can be used to cancel (a raw uint64_t, unlike the TimerHandle returned by schedule_after/schedule_every)
211 : : uint64_t schedule_timer(int64_t delay_ns, timer_callback callback);
212 : :
213 : : // Advance time - processes expired timers
214 : : void advance_time(int64_t now_ns);
215 : :
216 : : private:
217 : : struct alignas(64) WorkerState { // 64-byte alignment; presumably one cache line per worker to avoid false sharing — TODO confirm
218 : : // Using unique_ptr array to avoid move semantics issues with
219 : : // ChaselevDeque
220 : : std::unique_ptr<ChaselevDeque<WorkItem>[]> queues; // heap array of deques; presumably one per priority level — TODO confirm
221 : : uint32_t index; // NOTE(review): no default initializer; must be assigned by the code that builds workers_
222 : : EDFQueue edf_queue; // For deadline-ordered work
223 : : };
224 : : // NOTE(review): no `private:` follows the `public:` below, so every helper and data member through the end of the class is public.
225 : : public:
226 : : // A2WS access for WorkerThread; both accessors return mutable references to internal state
227 : 0 : A2WS& a2ws() {
228 : 0 : return a2ws_;
229 : : }
230 : 0 : std::vector<WorkerState>& workers() {
231 : 0 : return workers_;
232 : : }
233 : :
234 : : friend class WorkerThread;
235 : :
236 : : void wait_if_paused(uint32_t worker_id);
237 : : bool pop_any_ready(WorkItem& out);
238 : : void mark_dispatch_begin() noexcept;
239 : : void mark_dispatch_end() noexcept;
240 : :
241 : : void worker_loop(uint32_t worker_id);
242 : : bool pop_local(WorkItem& out, uint32_t worker_id);
243 : : bool pop_edf(WorkItem& out, uint32_t worker_id);
244 : :
245 : : // Thread-local worker identification
246 : : uint32_t current_worker_id() const;
247 : :
248 : : // Exponential backoff when no work available
249 : : void backoff();
250 : :
251 : : ActorSystem& system_; // reference member: the ActorSystem must outlive this scheduler
252 : : uint32_t num_workers_; // returned by worker_count()
253 : : uint32_t num_priorities_;
254 : : std::atomic<bool> running_{false}; // read by is_running()
255 : : std::vector<WorkerState> workers_;
256 : : std::vector<std::thread> worker_threads_;
257 : :
258 : : // Adaptive two-level work stealing
259 : : A2WS a2ws_;
260 : :
261 : : // Timer backend (TimingWheel or CalendarQueue via variant dispatch)
262 : : std::variant<TimingWheel, CalendarQueue> timer_backend_;
263 : : // Stores buf as MpscRingBuffer<MetricEvent>* via static_cast; no runtime type check, so the caller must pass exactly that type (or nullptr).
264 : 106 : void set_metrics_ring_buffer(void* buf) noexcept override {
265 : 106 : metrics_ring_buffer_ =
266 : : static_cast<metrics::MpscRingBuffer<metrics::MetricEvent>*>(buf);
267 : 106 : }
268 : : // Stores logger as log::Logger* via static_cast; same unchecked-type caveat as above.
269 : 106 : void set_logger(void* logger) noexcept override {
270 : 106 : logger_ = static_cast<log::Logger*>(logger);
271 : 106 : }
272 : :
273 : : // For recurring timer cancellation: maps timer ID to cancellation flag
274 : : std::unordered_map<uint64_t, std::shared_ptr<std::atomic<bool>>> recurring_cancellations_;
275 : : std::mutex cancellation_mutex_; // presumably guards recurring_cancellations_ — TODO confirm in the .cpp
276 : :
277 : : metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr}; // raw pointer set by set_metrics_ring_buffer(); null until then
278 : :
279 : : log::Logger* logger_{nullptr}; // raw pointer set by set_logger(); null until then
280 : :
281 : : // Worker control
282 : : std::atomic<bool> workers_paused_{false};
283 : : std::atomic<uint32_t> active_worker_dispatches_{0};
284 : : std::atomic<uint32_t> parked_worker_count_{0};
285 : : std::mutex worker_control_mutex_; // presumably paired with worker_control_cv_ for pausing/resuming workers — TODO confirm
286 : : std::condition_variable worker_control_cv_;
287 : :
288 : : // Timer advancement thread
289 : : std::thread timer_thread_;
290 : :
291 : : // Dedicated execution storage (PIMPL to avoid incomplete-type-in-container
292 : : // issue with libc++'s noexcept default constructors)
293 : : struct DedicatedStorage; // declared only; defined outside this header
294 : : std::unique_ptr<DedicatedStorage> dedicated_;
295 : : };
296 : :
297 : : } // namespace hpactor::sched
|