Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/actor_fwd.hpp>
18 : : #include <hpactor/sched/coroutine_frame_pool.hpp>
19 : : #include <hpactor/sched/work_queue.hpp>
20 : :
21 : : #include <atomic>
22 : : #include <cstdint>
23 : : #include <thread>
24 : : #include <vector>
25 : :
26 : : namespace hpactor::mem { class ThreadLocalAllocator; }
27 : : namespace hpactor::sched {
28 : :
29 : : // Forward declaration
30 : : class HybridScheduler;
31 : :
32 : : // -----------------------------------------------------------------------------
33 : : // WorkerThread: per-thread worker for work-stealing scheduler
34 : : // -----------------------------------------------------------------------------
35 : : // Each worker has:
36 : : // - A local MultiPriorityWorkQueue for actor messages
37 : : // - Thread ID for identification
38 : : // - State flags for scheduling decisions
39 : : //
40 : : // Work-stealing strategy: Adaptive Work Stealing (AWS)
41 : : // - Each worker maintains a "donation" counter
42 : : // - When donation threshold exceeded, worker becomes a thief
43 : : // - Victims selected via per-worker round-robin pointer
44 : : // -----------------------------------------------------------------------------
45 : : class WorkerThread {
46 : : public:
47 : : struct Config {
48 : : uint32_t worker_index = 0;
49 : : uint32_t priority_levels = 4;
50 : : uint32_t steal_threshold = 10; // attempts before becoming active thief
51 : : uint32_t victim_scan_limit = 4; // max victims to scan per steal attempt
52 : : };
53 : :
54 : : explicit WorkerThread(const Config& config);
55 : : ~WorkerThread();
56 : :
57 : : WorkerThread(const WorkerThread&) = delete;
58 : : WorkerThread& operator=(const WorkerThread&) = delete;
59 : : WorkerThread(WorkerThread&&) = delete;
60 : : WorkerThread& operator=(WorkerThread&&) = delete;
61 : :
62 : : // Start the worker thread
63 : : void start();
64 : :
65 : : // Stop the worker thread
66 : : void stop();
67 : :
68 : : // Enqueue work to this worker (push_bottom - owner operation)
69 : : void push(uint8_t priority, WorkItem item);
70 : :
71 : : // Try to pop from local queue (pop_bottom - owner operation)
72 : : bool pop(WorkItem& out);
73 : :
74 : : // Try to steal from this worker (steal_top - thief operation)
75 : : bool steal(WorkItem& out);
76 : :
77 : : // Process a single work item
78 : : void process(const WorkItem& item);
79 : :
80 : : // Worker index
81 : 1 : uint32_t index() const {
82 : 1 : return config_.worker_index;
83 : : }
84 : :
85 : : // Approximate queue depth
86 : : size_t depth() const;
87 : :
88 : : // Check if running
89 : 5 : bool is_running() const {
90 : 5 : return running_.load(std::memory_order_acquire);
91 : : }
92 : :
93 : : // For scheduling coordination: donation count for work-stealing decisions
94 : 3 : uint64_t donation_count() const {
95 : 6 : return donation_count_.load(std::memory_order_relaxed);
96 : : }
97 : 3 : void increment_donations() {
98 : 3 : donation_count_.fetch_add(1, std::memory_order_relaxed);
99 : 3 : }
100 : :
101 : : // Coroutine frame pool integration
102 : : // Acquire a coroutine frame from the pool (for blocking operations)
103 : : CoroutineFramePool::Frame* acquire_frame();
104 : : void release_frame(CoroutineFramePool::Frame* frame);
105 : :
106 : : // Set the frame pool (called by scheduler)
107 : 1 : void set_frame_pool(CoroutineFramePool* pool) {
108 : 1 : frame_pool_ = pool;
109 : 1 : }
110 : :
111 : : // Set the owner scheduler (for A2WS-based stealing)
112 : : void set_owner(HybridScheduler* owner) {
113 : : owner_ = owner;
114 : : }
115 : :
116 : : // Per-thread memory allocator accessor
117 : : mem::ThreadLocalAllocator* allocator() { return allocator_; }
118 : :
119 : : // Try to steal work using A2WS victim selection
120 : : bool try_steal(WorkItem& out);
121 : :
122 : : private:
123 : : void thread_loop();
124 : :
125 : : Config config_;
126 : : std::thread thread_;
127 : : std::atomic<bool> running_{false};
128 : : std::atomic<bool> stop_requested_{false};
129 : :
130 : : // Owner scheduler (for A2WS access)
131 : : HybridScheduler* owner_{nullptr};
132 : :
133 : : // Local priority queue for this worker
134 : : MultiPriorityWorkQueue local_queue_;
135 : :
136 : : // Donation counter for adaptive stealing
137 : : std::atomic<uint64_t> donation_count_{0};
138 : :
139 : : // Per-thread memory allocator
140 : : mem::ThreadLocalAllocator* allocator_{nullptr};
141 : :
142 : : // Coroutine frame pool
143 : : CoroutineFramePool* frame_pool_{nullptr};
144 : : };
145 : :
146 : : } // namespace hpactor::sched
|