Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <atomic>
18 : : #include <cstddef>
19 : : #include <cstdint>
20 : : #include <vector>
21 : :
22 : : namespace hpactor::metrics {
23 : :
// Bounded lock-free ring buffer for multiple producers and a single consumer.
//
// Capacity must be a non-zero power of two so that a ticket can be mapped to
// a slot with a bit mask.
//
// Producer: try_push(value) claims a ticket with a CAS on the write cursor,
//   copies the value into the ticket's slot, and then publishes the slot by
//   release-storing the slot's commit stamp.
// Consumer: drain(callback) hands every published slot to the callback in
//   ticket order, stops at the first slot that has not been published yet,
//   and then advances the read cursor so producers may reuse the slots.
//
// T must be default-constructible and copy-assignable.
template <typename T, size_t Capacity = 65536>
class MpscRingBuffer {
  static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
                "Capacity must be a non-zero power of two");

public:
  static constexpr size_t kDefaultCapacity = Capacity;

  // Allocates all Capacity slots up front; every slot starts unpublished.
  MpscRingBuffer() : buffer_(Capacity) {}

  // Producer side. May be called concurrently from any number of threads.
  //
  // Returns true when the value was stored. Returns false, and increments
  // the lost-event counter, when the buffer is full.
  //
  // The function is noexcept, so a throwing copy assignment of T terminates
  // the program.
  bool try_push(const T& value) noexcept {
    uint64_t ticket = write_cursor_.load(std::memory_order_relaxed);
    for (;;) {
      // The acquire load pairs with the release store in drain(): once a
      // slot counts as consumed here, the consumer is done reading it.
      const uint64_t consumed = read_cursor_.load(std::memory_order_acquire);

      // Signed distance on purpose. `ticket` may be stale and already
      // behind `consumed`; an unsigned subtraction would wrap to a huge
      // value and report a buffer with free space as full. A stale ticket
      // simply fails the CAS below and is refreshed.
      const auto pending = static_cast<int64_t>(ticket - consumed);
      if (pending >= static_cast<int64_t>(Capacity)) {
        events_lost_.fetch_add(1, std::memory_order_relaxed);
        return false;
      }

      // Relaxed is sufficient: the CAS only hands out a unique ticket.
      // Visibility of the payload is established by the commit stamp.
      if (write_cursor_.compare_exchange_weak(ticket, ticket + 1,
                                              std::memory_order_relaxed,
                                              std::memory_order_relaxed)) {
        break;
      }
    }

    Slot& slot = buffer_[ticket & mask_];
    slot.value = value;
    // Publish only after the payload is written. The consumer acquire-loads
    // this stamp, so it can never observe a half-written slot.
    slot.committed.store(ticket + 1, std::memory_order_release);
    return true;
  }

  // Consumer side. Must only be called from one thread at a time.
  //
  // Invokes callback(T&) for each published element in ticket order and
  // returns how many elements were delivered. Draining stops at the first
  // slot whose producer has claimed it but not finished writing; that
  // element and everything after it are delivered by a later drain().
  //
  // If the callback throws, the read cursor is not advanced, so elements
  // already delivered in this call are delivered again by the next drain().
  template <typename Fn>
  size_t drain(Fn&& callback) {
    // Only the consumer writes read_cursor_, so a relaxed load is enough.
    uint64_t next = read_cursor_.load(std::memory_order_relaxed);
    size_t delivered = 0;
    for (;;) {
      Slot& slot = buffer_[next & mask_];
      // A slot belongs to ticket `next` only when its stamp is next + 1.
      // An older stamp (or the initial 0) means "not published yet".
      // Producers cannot run more than Capacity tickets ahead of the
      // stored read cursor, so this loop ends after at most Capacity
      // iterations.
      if (slot.committed.load(std::memory_order_acquire) != next + 1) {
        break;
      }
      callback(slot.value);
      ++next;
      ++delivered;
    }
    // Release so producers that observe the new cursor also observe that
    // the consumer has finished reading the freed slots.
    read_cursor_.store(next, std::memory_order_release);
    return delivered;
  }

  // Number of try_push() calls rejected because the buffer was full.
  uint64_t events_lost() const noexcept {
    return events_lost_.load(std::memory_order_relaxed);
  }

  // Snapshot of the number of claimed but not yet consumed slots. This
  // includes slots whose producer is still writing. Under concurrent use
  // the value is only an estimate.
  size_t size() const noexcept {
    // Load the read cursor first: the write cursor never trails it, so a
    // write cursor loaded afterwards cannot be smaller and the subtraction
    // cannot wrap.
    const uint64_t consumed = read_cursor_.load(std::memory_order_acquire);
    const uint64_t claimed = write_cursor_.load(std::memory_order_acquire);
    const uint64_t pending = claimed - consumed;
    // Both cursors may move between the two loads; never report more than
    // the buffer can hold.
    return pending > Capacity ? Capacity : static_cast<size_t>(pending);
  }

  // True when size() reports zero.
  bool empty() const noexcept { return size() == 0; }

private:
  // One ring entry: the payload plus the stamp that publishes it.
  struct Slot {
    // ticket + 1 of the most recent write published into this slot;
    // 0 means the slot has never been written.
    std::atomic<uint64_t> committed{0};
    T value{};
  };

  static constexpr size_t mask_ = Capacity - 1;

  // Each counter sits on its own 64-byte boundary to limit false sharing
  // between producers and the consumer.
  alignas(64) std::atomic<uint64_t> write_cursor_{0};  // next ticket to hand out
  alignas(64) std::atomic<uint64_t> read_cursor_{0};   // first ticket not yet consumed
  alignas(64) std::atomic<uint64_t> events_lost_{0};   // pushes rejected as full
  std::vector<Slot> buffer_;
};
89 : :
90 : : } // namespace hpactor::metrics
|