Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <algorithm>
18 : : #include <atomic>
19 : : #include <cstddef>
20 : : #include <cstdint>
21 : : #include <vector>
22 : :
23 : : #include <hpactor/types/types.hpp>
24 : :
25 : : namespace hpactor::sched {
26 : :
27 : : struct WorkItem {
28 : : ActorId actor;
29 : : int64_t deadline_ns;
30 : : uint64_t sequence;
31 : : };
32 : :
33 : : template <typename T> class ChaselevDeque {
34 : : public:
35 : : explicit ChaselevDeque(size_t initial_capacity = 256);
36 : : ~ChaselevDeque();
37 : :
38 : : ChaselevDeque(const ChaselevDeque&) = delete;
39 : : ChaselevDeque& operator=(const ChaselevDeque&) = delete;
40 : : ChaselevDeque(ChaselevDeque&&) = delete;
41 : : ChaselevDeque& operator=(ChaselevDeque&&) = delete;
42 : :
43 : : void push_bottom(T item);
44 : : bool pop_bottom(T& out);
45 : : bool steal_top(T& out);
46 : : size_t size_approx() const;
47 : :
48 : : private:
49 : : struct CircularArray {
50 : : std::vector<std::atomic<T>> buf;
51 : : size_t mask;
52 : : explicit CircularArray(size_t cap);
53 : :
54 : 1871 : T get(int64_t i) const {
55 : 1871 : return buf[static_cast<size_t>(i) & mask].load(std::memory_order_relaxed);
56 : : }
57 : 1872 : void put(int64_t i, T v) {
58 : 1872 : buf[static_cast<size_t>(i) & mask].store(v, std::memory_order_relaxed);
59 : 1872 : }
60 : : CircularArray* grow(int64_t bottom, int64_t top) const;
61 : : };
62 : :
63 : : std::atomic<int64_t> top_{0};
64 : : std::atomic<int64_t> bottom_{0};
65 : : std::atomic<CircularArray*> array_;
66 : : std::vector<CircularArray*> garbage_;
67 : : };
68 : :
69 : : template <typename T>
70 : 457 : ChaselevDeque<T>::CircularArray::CircularArray(size_t cap)
71 : 914 : : buf(cap), mask(cap - 1) {
72 : 116421 : for (auto& slot : buf) {
73 : 115964 : slot.store(T{}, std::memory_order_relaxed);
74 : : }
75 : 457 : }
76 : :
77 : : template <typename T>
78 : : typename ChaselevDeque<T>::CircularArray*
79 : 6 : ChaselevDeque<T>::CircularArray::grow(int64_t bottom, int64_t top) const {
80 : 6 : size_t new_cap = buf.size() * 2;
81 : 6 : auto* new_arr = new CircularArray(new_cap);
82 : 386 : for (int64_t i = top; i < bottom; ++i) {
83 : 380 : new_arr->put(i, get(i));
84 : : }
85 : 6 : return new_arr;
86 : : }
87 : :
88 : : template <typename T>
89 : 451 : ChaselevDeque<T>::ChaselevDeque(size_t initial_capacity)
90 : 451 : : array_(new CircularArray(initial_capacity)) {}
91 : :
92 : 451 : template <typename T> ChaselevDeque<T>::~ChaselevDeque() {
93 : 451 : delete array_.load(std::memory_order_acquire);
94 : 457 : for (auto* arr : garbage_) {
95 : 6 : delete arr;
96 : : }
97 : 451 : }
98 : :
99 : 1492 : template <typename T> void ChaselevDeque<T>::push_bottom(T item) {
100 : 1492 : int64_t b = bottom_.load(std::memory_order_relaxed);
101 : 1492 : int64_t t = top_.load(std::memory_order_acquire);
102 : 1492 : auto* arr = array_.load(std::memory_order_acquire);
103 : :
104 : 1492 : if (b - t > static_cast<int64_t>(arr->mask)) {
105 : 6 : auto* new_arr = arr->grow(b, t);
106 : 6 : garbage_.push_back(arr);
107 : 6 : array_.store(new_arr, std::memory_order_release);
108 : 6 : arr = new_arr;
109 : : }
110 : :
111 : 1492 : arr->put(b, std::move(item));
112 : 1492 : bottom_.store(b + 1, std::memory_order_release);
113 : 1492 : }
114 : :
115 : 102798 : template <typename T> bool ChaselevDeque<T>::pop_bottom(T& out) {
116 : 102798 : int64_t b = bottom_.fetch_sub(1, std::memory_order_acq_rel) - 1;
117 : 102798 : int64_t t = top_.load(std::memory_order_acquire);
118 : :
119 : 102798 : auto* arr = array_.load(std::memory_order_acquire);
120 : 102798 : if (b - t < 0) {
121 : 102615 : bottom_.store(b + 1, std::memory_order_release);
122 : 102615 : return false;
123 : : }
124 : :
125 : 183 : out = arr->get(b);
126 : :
127 : 183 : if (b == t) {
128 : 42 : if (!top_.compare_exchange_strong(t, t + 1, std::memory_order_acq_rel,
129 : : std::memory_order_acquire)) {
130 : 0 : bottom_.store(b + 1, std::memory_order_release);
131 : 0 : return false;
132 : : }
133 : : // CAS succeeded: we removed the last item (at b == t). Update bottom_
134 : : // to keep the deque consistent. If we don't, bottom_ < top_ causes push
135 : : // to overwrite the stolen slot when it sees b < t and thinks the deque
136 : : // is empty.
137 : 21 : bottom_.store(b + 1, std::memory_order_release);
138 : 21 : return true;
139 : : }
140 : :
141 : 162 : return true;
142 : : }
143 : :
144 : 254878 : template <typename T> bool ChaselevDeque<T>::steal_top(T& out) {
145 : 254878 : int64_t t = top_.load(std::memory_order_acquire);
146 : 254878 : int64_t b = bottom_.load(std::memory_order_acquire);
147 : :
148 : 254878 : if (b - t <= 0) {
149 : 253570 : return false;
150 : : }
151 : :
152 : 1308 : auto* arr = array_.load(std::memory_order_acquire);
153 : 1308 : out = arr->get(t);
154 : :
155 : 2616 : if (!top_.compare_exchange_strong(t, t + 1, std::memory_order_acq_rel,
156 : : std::memory_order_acquire)) {
157 : 0 : return false;
158 : : }
159 : :
160 : 1308 : return true;
161 : : }
162 : :
163 : 17 : template <typename T> size_t ChaselevDeque<T>::size_approx() const {
164 : 17 : int64_t b = bottom_.load(std::memory_order_relaxed);
165 : 17 : int64_t t = top_.load(std::memory_order_relaxed);
166 : 17 : return static_cast<size_t>(b >= t ? b - t : 0);
167 : : }
168 : :
169 : : class MultiPriorityWorkQueue {
170 : : public:
171 : 12 : explicit MultiPriorityWorkQueue(uint32_t priority_levels = 4)
172 : 24 : : levels_(priority_levels) {}
173 : :
174 : 8 : void push(uint8_t priority, WorkItem item) {
175 : 8 : levels_[priority].push_bottom(item);
176 : 8 : }
177 : :
178 : 8 : bool pop(WorkItem& out) {
179 : 22 : for (uint32_t i = 0; i < levels_.size(); ++i) {
180 : 20 : if (levels_[i].pop_bottom(out)) {
181 : 6 : return true;
182 : : }
183 : : }
184 : 2 : return false;
185 : : }
186 : :
187 : : // Steal from the highest priority queue first
188 : 5 : bool steal(WorkItem& out) {
189 : 21 : for (uint32_t i = 0; i < levels_.size(); ++i) {
190 : 17 : if (levels_[i].steal_top(out)) {
191 : 1 : return true;
192 : : }
193 : : }
194 : 4 : return false;
195 : : }
196 : :
197 : 3 : size_t depth_approx() const {
198 : 3 : size_t total = 0;
199 : 15 : for (const auto& level : levels_) {
200 : 12 : total += level.size_approx();
201 : : }
202 : 3 : return total;
203 : : }
204 : :
205 : 6 : uint32_t num_levels() const {
206 : 6 : return static_cast<uint32_t>(levels_.size());
207 : : }
208 : :
209 : : private:
210 : : std::vector<ChaselevDeque<WorkItem>> levels_;
211 : : };
212 : :
213 : : } // namespace hpactor::sched
|