Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <atomic>
18 : :
19 : : namespace hpactor::mailbox {
20 : :
21 : : // Intrusive Vyukov MPSC queue for actor mailboxes.
22 : : // T must provide: std::atomic<T*> mpsc_next
23 : : //
24 : : // Uses head/tail pointers with a dummy stub node. The dummy's mpsc_next
25 : : // always points to the oldest element (or nullptr when empty).
26 : : //
27 : : // Queue structure:
28 : : // head_ (dummy) -> oldest -> ... -> newest -> nullptr
29 : : // tail_ always points to the dummy stub
30 : : // count_ tracks number of elements (producer increments, consumer decrements)
31 : : //
32 : : // The dummy node is a T (inherited) so &stub_ is a valid T*.
33 : : //
34 : : template <typename T> class MPSCMailbox {
35 : : public:
36 : 134 : MPSCMailbox() {
37 : 134 : stub_.mpsc_next.store(nullptr, std::memory_order_relaxed);
38 : 134 : head_.store(&stub_, std::memory_order_relaxed);
39 : 134 : tail_.store(&stub_, std::memory_order_relaxed);
40 : 134 : count_.store(0, std::memory_order_relaxed);
41 : 134 : }
42 : :
43 : : // Producer: enqueue node at head (wait-free)
44 : : // Chain: head_ -> node -> old_head -> ... -> oldest -> nullptr
45 : 1455 : void enqueue(T* node) noexcept {
46 : 1455 : node->mpsc_next.store(nullptr, std::memory_order_relaxed);
47 : 1455 : T* prev = head_.exchange(node, std::memory_order_acq_rel);
48 : 1455 : prev->mpsc_next.store(node, std::memory_order_release);
49 : 1455 : count_.fetch_add(1, std::memory_order_release);
50 : 1455 : }
51 : :
52 : : // Consumer: dequeue oldest node, or nullptr if empty.
53 : 1217 : T* dequeue() noexcept {
54 : 0 : for (;;) {
55 : : // Quick return path for empty queue.
56 : 2434 : if (count_.load(std::memory_order_acquire) == 0)
57 : 1217 : return nullptr;
58 : :
59 : 1171 : T* tail = tail_.load(std::memory_order_acquire);
60 : 1171 : T* next = tail->mpsc_next.load(std::memory_order_acquire);
61 : 1171 : if (next == nullptr) {
62 : : // Count says non-empty but stub->next is null:
63 : : // elements are orphaned. Try to recover through
64 : : // the last-dequeued node's forward link.
65 : 0 : T* ld = last_dequeued_.load(std::memory_order_acquire);
66 : 0 : if (ld) {
67 : 0 : next = ld->mpsc_next.load(std::memory_order_acquire);
68 : 0 : if (next) {
69 : : // Restore the chain: stub -> recovered
70 : 0 : tail->mpsc_next.store(next,
71 : : std::memory_order_release);
72 : 0 : continue; // retry from the top
73 : : }
74 : : }
75 : 0 : return nullptr;
76 : : }
77 : :
78 : 1171 : T* next_next = next->mpsc_next.load(std::memory_order_relaxed);
79 : 1171 : if (next_next == nullptr) {
80 : 60 : T* head = head_.load(std::memory_order_acquire);
81 : 60 : if (head != next) {
82 : : do {
83 : 0 : next_next = next->mpsc_next.load(
84 : : std::memory_order_acquire);
85 : 0 : } while (next_next == nullptr);
86 : : }
87 : : }
88 : :
89 : 1171 : tail->mpsc_next.store(next_next, std::memory_order_release);
90 : :
91 : 1171 : if (next_next == nullptr) {
92 : 60 : T* head = head_.load(std::memory_order_acquire);
93 : 60 : if (head != next) {
94 : 0 : next_next = next->mpsc_next.load(
95 : : std::memory_order_acquire);
96 : 0 : if (next_next) {
97 : 0 : tail->mpsc_next.store(next_next,
98 : : std::memory_order_release);
99 : : }
100 : : }
101 : : }
102 : :
103 : 1171 : if (next_next == tail) {
104 : 0 : tail->mpsc_next.store(nullptr, std::memory_order_release);
105 : : }
106 : :
107 : : // Save last dequeued for orphan recovery.
108 : 1171 : last_dequeued_.store(next, std::memory_order_release);
109 : :
110 : : // Fix dangling head_ pointer: if head_ still points to the
111 : : // node we are about to free, reset it to the stub so that
112 : : // the next producer's head_.exchange() does not write to
113 : : // freed memory (mpsc_next.store).
114 : : {
115 : 1171 : T* h = head_.load(std::memory_order_acquire);
116 : 1171 : if (h == next) {
117 : 60 : head_.compare_exchange_strong(
118 : : next, static_cast<T*>(&stub_),
119 : : std::memory_order_release, std::memory_order_relaxed);
120 : : }
121 : : }
122 : :
123 : 1171 : count_.fetch_sub(1, std::memory_order_release);
124 : 1171 : return next;
125 : : }
126 : : }
127 : :
128 : : // Consumer: try dequeue once (non-blocking) — same as dequeue
129 : : T* try_dequeue() noexcept {
130 : : return dequeue();
131 : : }
132 : :
133 : 2970 : bool empty() const noexcept {
134 : 5940 : return count_.load(std::memory_order_acquire) == 0;
135 : : }
136 : :
137 : 126258 : int64_t count() const noexcept {
138 : 252516 : return count_.load(std::memory_order_acquire);
139 : : }
140 : :
141 : : private:
142 : : struct Stub : public T {
143 : 134 : Stub() : T() {}
144 : : };
145 : :
146 : : alignas(64) std::atomic<T*> head_; // newest element
147 : : alignas(64) std::atomic<T*> tail_; // always stub (dummy)
148 : : alignas(64) Stub stub_; // dummy anchor
149 : : std::atomic<int64_t> count_; // element count
150 : : std::atomic<T*> last_dequeued_{nullptr}; // for orphan recovery
151 : : };
152 : :
153 : : } // namespace hpactor::mailbox
|