LCOV - code coverage report
Current view: top level - include/hpactor/sched - scheduler.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 34.4 % 32 11
Test Date: 2026-05-20 02:24:49 Functions: 33.3 % 15 5
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/actor/actor_fwd.hpp>
      18                 :             : #include <hpactor/adt/id.hpp>
      19                 :             : #include <hpactor/adt/tags.hpp>
      20                 :             : #include <hpactor/metrics/metrics_event.hpp>
      21                 :             : #include <hpactor/metrics/metrics_ring_buffer.hpp>
      22                 :             : #include <hpactor/sched/a2ws.hpp>
      23                 :             : #include <hpactor/sched/calendar_queue.hpp>
      24                 :             : #include <hpactor/sched/edf_queue.hpp>
      25                 :             : #include <hpactor/sched/timing_wheel.hpp>
      26                 :             : #include <hpactor/sched/work_queue.hpp>
      27                 :             : 
      28                 :             : #include <atomic>
      29                 :             : #include <condition_variable>
      30                 :             : #include <cstdint>
      31                 :             : #include <functional>
      32                 :             : #include <memory>
      33                 :             : #include <mutex>
      34                 :             : #include <thread>
      35                 :             : #include <unordered_map>
      36                 :             : #include <unordered_set>
      37                 :             : #include <variant>
      38                 :             : #include <vector>
      39                 :             : 
      40                 :             : namespace hpactor {
      41                 :             : 
      42                 :             : // Forward declare ActorSystem to avoid circular include
      43                 :             : class ActorSystem;
      44                 :             : 
      45                 :             : namespace log {
      46                 :             : class Logger;
      47                 :             : } // namespace log
      48                 :             : 
      49                 :             : } // namespace hpactor
      50                 :             : 
      51                 :             : namespace hpactor::sched {
      52                 :             : 
      53                 :             : class DedicatedThreadPool; // forward decl
      54                 :             : 
      55                 :             : // -----------------------------------------------------------------------------
      56                 :             : // TimerHandle and timer_callback types
      57                 :             : // -----------------------------------------------------------------------------
      58                 :             : using TimerHandle = Id<TimerTag>;
      59                 :             : 
      60                 :             : using timer_callback = std::function<void()>;
      61                 :             : 
      62                 :             : enum class TimerBackend : uint8_t { TimingWheel = 0, CalendarQueue = 1 };
      63                 :             : 
      64                 :             : struct SchedulerDrainResult {
      65                 :             :     size_t executed = 0;
      66                 :             :     bool idle = true;
      67                 :             : };
      68                 :             : 
      69                 :             : // -----------------------------------------------------------------------------
      70                 :             : // IScheduler: interface for actor schedulers
      71                 :             : // -----------------------------------------------------------------------------
      72                 :             : class IScheduler {
      73                 :             :   public:
      74                 :         120 :     virtual ~IScheduler() = default;
      75                 :             : 
      76                 :           0 :     virtual void set_metrics_ring_buffer(void* /*buf*/) {}
      77                 :           0 :     virtual void set_logger(void* /*logger*/) noexcept {}
      78                 :             : 
      79                 :             :     // Start the scheduler
      80                 :             :     virtual void start() = 0;
      81                 :             : 
      82                 :             :     // Stop the scheduler
      83                 :             :     virtual void stop() = 0;
      84                 :             : 
      85                 :             :     // Thread-safe; may be called from any thread including I/O threads
      86                 :             :     // Notify scheduler that an actor is ready to run at given priority
      87                 :             :     virtual void
      88                 :             :     notify_ready(ActorId actor, uint8_t priority, int64_t deadline_ns) = 0;
      89                 :             : 
      90                 :             :     // Notify scheduler that an actor has become idle (blocked on I/O, etc.)
      91                 :             :     virtual void notify_idle(ActorId actor) = 0;
      92                 :             : 
      93                 :             :     // Voluntarily yield — re-enqueue actor at same priority for cooperative
      94                 :             :     // multitasking
      95                 :             :     virtual void yield(ActorId actor, uint8_t priority) = 0;
      96                 :             : 
      97                 :             :     // Schedule a one-shot timer to fire after delay_ns
      98                 :             :     virtual TimerHandle schedule_after(timer_callback cb, int64_t delay_ns) = 0;
      99                 :             : 
     100                 :             :     // Schedule a recurring timer to fire every interval_ns
     101                 :             :     virtual TimerHandle schedule_every(timer_callback cb, int64_t interval_ns) = 0;
     102                 :             : 
     103                 :             :     // Cancel a previously scheduled timer
     104                 :             :     virtual void cancel_timer(TimerHandle handle) = 0;
     105                 :             : 
     106                 :             :     // Number of worker threads
     107                 :             :     virtual size_t worker_count() const = 0;
     108                 :             : 
     109                 :             :     // Check if scheduler is running
     110                 :             :     virtual bool is_running() const = 0;
     111                 :             : 
     112                 :             :     // Register an actor that needs a dedicated OS thread.
     113                 :             :     // cpu_affinity: -1 = no affinity, >=0 = pin to specific core.
     114                 :             :     virtual void register_dedicated_thread(ActorId actor, int cpu_affinity) = 0;
     115                 :             : 
     116                 :             :     // Register an actor that needs a dedicated worker pool.
     117                 :             :     // The scheduler creates or reuses a DedicatedThreadPool of the given size.
     118                 :             :     virtual void register_dedicated_pool(ActorId actor, uint32_t pool_size) = 0;
     119                 :             : 
     120                 :             :     // Shutdown a dedicated execution context for an actor.
     121                 :             :     virtual void unregister_dedicated(ActorId actor) = 0;
     122                 :             : 
     123                 :             :     // Worker control (intended for deterministic testing).
     124                 :           0 :     virtual void pause_workers() noexcept {}
     125                 :           0 :     virtual void resume_workers() noexcept {}
     126                 :           0 :     virtual bool workers_paused() const noexcept {
     127                 :           0 :         return false;
     128                 :             :     }
     129                 :           0 :     virtual bool run_one_ready() {
     130                 :           0 :         return false;
     131                 :             :     }
     132                 :           0 :     virtual SchedulerDrainResult drain_ready(size_t max_items) {
     133                 :           0 :         SchedulerDrainResult result;
     134                 :           0 :         for (size_t i = 0; i < max_items; ++i) {
     135                 :           0 :             if (!run_one_ready()) {
     136                 :           0 :                 result.idle = true;
     137                 :           0 :                 return result;
     138                 :             :             }
     139                 :           0 :             ++result.executed;
     140                 :             :         }
     141                 :           0 :         result.idle = false;
     142                 :           0 :         return result;
     143                 :             :     }
     144                 :             : };
     145                 :             : 
     146                 :             : // -----------------------------------------------------------------------------
     147                 :             : // HybridScheduler: work-stealing scheduler with priority queues
     148                 :             : // -----------------------------------------------------------------------------
     149                 :             : // Each worker has its own ChaseLev deque. Work-stealing is done by trying
     150                 :             : // to pop from the target worker's deque when local work is exhausted.
     151                 :             : // Priority levels 0-3 (0 = highest).
     152                 :             : //
     153                 :             : // Uses MultiPriorityWorkQueue for priority-based local enqueue.
     154                 :             : // Work-stealing is attempted in round-robin order across workers.
     155                 :             : // -----------------------------------------------------------------------------
     156                 :             : class HybridScheduler : public IScheduler {
     157                 :             :   public:
     158                 :             :     // num_priorities: number of priority levels (default 4, priorities 0..N-1)
     159                 :             :     // ActorSystem reference is held for processing actors
     160                 :             :     explicit HybridScheduler(ActorSystem& system, uint32_t num_workers,
     161                 :             :                              uint32_t num_priorities = 4,
     162                 :             :                              TimerBackend timer_backend = TimerBackend::TimingWheel,
     163                 :             :                              bool start_paused = false);
     164                 :             :     ~HybridScheduler() override;
     165                 :             : 
     166                 :             :     HybridScheduler(const HybridScheduler&) = delete;
     167                 :             :     HybridScheduler& operator=(const HybridScheduler&) = delete;
     168                 :             :     HybridScheduler(HybridScheduler&&) = delete;
     169                 :             :     HybridScheduler& operator=(HybridScheduler&&) = delete;
     170                 :             : 
     171                 :             :     void start() override;
     172                 :             :     void stop() override;
     173                 :             :     void notify_ready(ActorId actor, uint8_t priority, int64_t deadline_ns) override;
     174                 :             :     void notify_idle(ActorId actor) override;
     175                 :             :     void yield(ActorId actor, uint8_t priority) override;
     176                 :           2 :     bool is_running() const override {
     177                 :           2 :         return running_.load(std::memory_order_acquire);
     178                 :             :     }
     179                 :           2 :     size_t worker_count() const override {
     180                 :           2 :         return num_workers_;
     181                 :             :     }
     182                 :             :     TimerHandle schedule_after(timer_callback cb, int64_t delay_ns) override;
     183                 :             :     TimerHandle schedule_every(timer_callback cb, int64_t interval_ns) override;
     184                 :             :     void cancel_timer(TimerHandle handle) override;
     185                 :             : 
     186                 :             :     void register_dedicated_thread(ActorId actor, int cpu_affinity) override;
     187                 :             :     void register_dedicated_pool(ActorId actor, uint32_t pool_size) override;
     188                 :             :     void unregister_dedicated(ActorId actor) override;
     189                 :             : 
     190                 :             :     // Worker control (deterministic testing)
     191                 :             :     void pause_workers() noexcept override;
     192                 :             :     void resume_workers() noexcept override;
     193                 :             :     bool workers_paused() const noexcept override;
     194                 :             :     bool run_one_ready() override;
     195                 :             :     SchedulerDrainResult drain_ready(size_t max_items) override;
     196                 :             : 
     197                 :             :     // Try to steal work from another worker (called when local queue is empty)
     198                 :             :     bool try_steal(WorkItem& out);
     199                 :             : 
     200                 :             :     // Process one actor (called by worker loop)
     201                 :             :     void process_actor(ActorId actor);
     202                 :             : 
     203                 :             :     // Execute an actor (handles coroutine resumption when available)
     204                 :             :     // TODO(Task 4.2): Integrate coroutine resumption when get_coroutine() is
     205                 :             :     // available
     206                 :             :     void execute_actor(const WorkItem& item);
     207                 :             : 
     208                 :             :     // Timing wheel integration
     209                 :             :     // Schedule a timer to fire after delay_ns (in nanoseconds)
     210                 :             :     // Returns timer ID that can be used to cancel
     211                 :             :     uint64_t schedule_timer(int64_t delay_ns, timer_callback callback);
     212                 :             : 
     213                 :             :     // Advance time - processes expired timers
     214                 :             :     void advance_time(int64_t now_ns);
     215                 :             : 
     216                 :             :   private:
     217                 :             :     struct alignas(64) WorkerState {
     218                 :             :         // Using unique_ptr array to avoid move semantics issues with
     219                 :             :         // ChaselevDeque
     220                 :             :         std::unique_ptr<ChaselevDeque<WorkItem>[]> queues;
     221                 :             :         uint32_t index;
     222                 :             :         EDFQueue edf_queue; // For deadline-ordered work
     223                 :             :     };
     224                 :             : 
     225                 :             :   public:
     226                 :             :     // A2WS access for WorkerThread
     227                 :           0 :     A2WS& a2ws() {
     228                 :           0 :         return a2ws_;
     229                 :             :     }
     230                 :           0 :     std::vector<WorkerState>& workers() {
     231                 :           0 :         return workers_;
     232                 :             :     }
     233                 :             : 
     234                 :             :     friend class WorkerThread;
     235                 :             : 
     236                 :             :     void wait_if_paused(uint32_t worker_id);
     237                 :             :     bool pop_any_ready(WorkItem& out);
     238                 :             :     void mark_dispatch_begin() noexcept;
     239                 :             :     void mark_dispatch_end() noexcept;
     240                 :             : 
     241                 :             :     void worker_loop(uint32_t worker_id);
     242                 :             :     bool pop_local(WorkItem& out, uint32_t worker_id);
     243                 :             :     bool pop_edf(WorkItem& out, uint32_t worker_id);
     244                 :             : 
     245                 :             :     // Thread-local worker identification
     246                 :             :     uint32_t current_worker_id() const;
     247                 :             : 
     248                 :             :     // Exponential backoff when no work available
     249                 :             :     void backoff();
     250                 :             : 
     251                 :             :     ActorSystem& system_;
     252                 :             :     uint32_t num_workers_;
     253                 :             :     uint32_t num_priorities_;
     254                 :             :     std::atomic<bool> running_{false};
     255                 :             :     std::vector<WorkerState> workers_;
     256                 :             :     std::vector<std::thread> worker_threads_;
     257                 :             : 
     258                 :             :     // Adaptive two-level work stealing
     259                 :             :     A2WS a2ws_;
     260                 :             : 
     261                 :             :     // Timer backend (TimingWheel or CalendarQueue via variant dispatch)
     262                 :             :     std::variant<TimingWheel, CalendarQueue> timer_backend_;
     263                 :             : 
     264                 :         106 :     void set_metrics_ring_buffer(void* buf) noexcept override {
     265                 :         106 :         metrics_ring_buffer_ =
     266                 :             :             static_cast<metrics::MpscRingBuffer<metrics::MetricEvent>*>(buf);
     267                 :         106 :     }
     268                 :             : 
     269                 :         106 :     void set_logger(void* logger) noexcept override {
     270                 :         106 :         logger_ = static_cast<log::Logger*>(logger);
     271                 :         106 :     }
     272                 :             : 
     273                 :             :     // For recurring timer cancellation: maps timer ID to cancellation flag
     274                 :             :     std::unordered_map<uint64_t, std::shared_ptr<std::atomic<bool>>> recurring_cancellations_;
     275                 :             :     std::mutex cancellation_mutex_;
     276                 :             : 
     277                 :             :     metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr};
     278                 :             : 
     279                 :             :     log::Logger* logger_{nullptr};
     280                 :             : 
     281                 :             :     // Worker control
     282                 :             :     std::atomic<bool> workers_paused_{false};
     283                 :             :     std::atomic<uint32_t> active_worker_dispatches_{0};
     284                 :             :     std::atomic<uint32_t> parked_worker_count_{0};
     285                 :             :     std::mutex worker_control_mutex_;
     286                 :             :     std::condition_variable worker_control_cv_;
     287                 :             : 
     288                 :             :     // Timer advancement thread
     289                 :             :     std::thread timer_thread_;
     290                 :             : 
     291                 :             :     // Dedicated execution storage (PIMPL to avoid incomplete-type-in-container
     292                 :             :     // issue with libc++'s noexcept default constructors)
     293                 :             :     struct DedicatedStorage;
     294                 :             :     std::unique_ptr<DedicatedStorage> dedicated_;
     295                 :             : };
     296                 :             : 
     297                 :             : } // namespace hpactor::sched
        

Generated by: LCOV version 2.0-1