Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/sched/scheduler.hpp>
16 : : #include <hpactor/sched/worker_thread.hpp>
17 : : #include <hpactor/mem/thread_local_allocator.hpp>
18 : : #include <hpactor/mem/memory_config.hpp>
19 : :
20 : : namespace hpactor::sched {
21 : :
22 : : // Thread-local pointer to the current worker's frame pool
23 : : thread_local CoroutineFramePool* tl_frame_pool = nullptr;
24 : :
25 : 11 : WorkerThread::WorkerThread(const Config& config)
26 : 11 : : config_(config), local_queue_(config.priority_levels) {
27 : 11 : allocator_ = new mem::ThreadLocalAllocator();
28 : 11 : }
29 : :
30 : 11 : WorkerThread::~WorkerThread() {
31 : 11 : stop();
32 : 11 : delete allocator_;
33 : 11 : }
34 : :
35 : 3 : void WorkerThread::start() {
36 : 3 : if (running_.load(std::memory_order_acquire)) {
37 : 1 : return;
38 : : }
39 : 2 : running_.store(true, std::memory_order_release);
40 : 2 : stop_requested_.store(false, std::memory_order_release);
41 : 4 : thread_ = std::thread([this] {
42 : 2 : if (frame_pool_) {
43 : 0 : tl_frame_pool = frame_pool_;
44 : : }
45 : 2 : if (allocator_) {
46 : 2 : mem::set_thread_allocator(allocator_);
47 : : }
48 : 2 : thread_loop();
49 : 2 : });
50 : : }
51 : :
52 : 13 : void WorkerThread::stop() {
53 : 13 : stop_requested_.store(true, std::memory_order_release);
54 : 13 : running_.store(false, std::memory_order_release);
55 : 13 : if (thread_.joinable()) {
56 : 2 : thread_.join();
57 : : }
58 : 13 : }
59 : :
60 : 4 : void WorkerThread::push(uint8_t priority, WorkItem item) {
61 : 4 : local_queue_.push(priority, item);
62 : 4 : }
63 : :
64 : 3 : bool WorkerThread::pop(WorkItem& out) {
65 : 3 : return local_queue_.pop(out);
66 : : }
67 : :
68 : 2 : bool WorkerThread::steal(WorkItem& out) {
69 : : // Steal from the highest priority queue first
70 : 6 : for (uint32_t i = 0; i < local_queue_.num_levels(); ++i) {
71 : 5 : if (local_queue_.steal(out)) {
72 : 1 : return true;
73 : : }
74 : : }
75 : 1 : return false;
76 : : }
77 : :
78 : 1 : void WorkerThread::process(const WorkItem& item) {
79 : : // Process the actor - actual implementation would call actor's receive
80 : : // This is a placeholder that would be wired to ActorSystem
81 : : (void)item;
82 : 1 : }
83 : :
84 : 1 : size_t WorkerThread::depth() const {
85 : 1 : return local_queue_.depth_approx();
86 : : }
87 : :
88 : 2 : CoroutineFramePool::Frame* WorkerThread::acquire_frame() {
89 : 2 : if (frame_pool_) {
90 : 1 : return frame_pool_->acquire();
91 : : }
92 : 1 : return nullptr;
93 : : }
94 : :
95 : 2 : void WorkerThread::release_frame(CoroutineFramePool::Frame* frame) {
96 : 2 : if (frame_pool_ && frame) {
97 : 1 : frame_pool_->release(frame);
98 : : }
99 : 2 : }
100 : :
101 : 0 : bool WorkerThread::try_steal(WorkItem& out) {
102 : 0 : if (!owner_) {
103 : 0 : return false;
104 : : }
105 : :
106 : 0 : uint32_t my_id = config_.worker_index;
107 : :
108 : : // Try up to victim_scan_limit victims
109 : 0 : for (uint32_t attempt = 0; attempt < config_.victim_scan_limit; ++attempt) {
110 : 0 : uint32_t victim_idx = owner_->a2ws().get_victim(my_id);
111 : :
112 : 0 : if (victim_idx >= owner_->workers().size()) {
113 : 0 : break;
114 : : }
115 : :
116 : 0 : auto& victim = owner_->workers()[victim_idx];
117 : :
118 : : // Try EDF queue first
119 : 0 : if (victim.edf_queue.pop(out)) {
120 : 0 : owner_->a2ws().record_steal(my_id, victim_idx);
121 : 0 : return true;
122 : : }
123 : :
124 : : // Try each priority level
125 : 0 : for (uint32_t p = 0; p < owner_->num_priorities_; ++p) {
126 : 0 : if (victim.queues[p].steal_top(out)) {
127 : 0 : owner_->a2ws().record_steal(my_id, victim_idx);
128 : 0 : return true;
129 : : }
130 : : }
131 : :
132 : 0 : owner_->a2ws().record_attempt(my_id, victim_idx, false);
133 : : }
134 : 0 : return false;
135 : : }
136 : :
137 : 2 : void WorkerThread::thread_loop() {
138 : 2 : while (!stop_requested_.load(std::memory_order_acquire) &&
139 : 0 : running_.load(std::memory_order_acquire)) {
140 : 0 : WorkItem item;
141 : :
142 : : // Try to pop from local queue first (owner pop - fast path)
143 : 0 : if (pop(item)) {
144 : 0 : process(item);
145 : 0 : continue;
146 : : }
147 : :
148 : : // Local queue empty - this worker is a donation candidate
149 : 0 : increment_donations();
150 : :
151 : : // TODO: Work-stealing would be implemented here
152 : : // - Select victim using round-robin
153 : : // - Try to steal from victim's queue
154 : : // - If steal succeeds, process the item
155 : :
156 : : // Backoff when no work available
157 : : // In a real implementation, this would use exponential backoff
158 : : // or yield/pause instructions
159 : : }
160 : 2 : }
161 : :
162 : : } // namespace hpactor::sched
|