Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include "hpactor/log/log_event.hpp"
18 : : #include <atomic>
19 : : #include <cstdint>
20 : : #include <cstdio>
21 : : #include <cstdlib>
22 : : #include <memory>
23 : :
24 : : namespace hpactor::log {
25 : :
26 : : class LogRingBuffer {
27 : : public:
28 : 116 : explicit LogRingBuffer(size_t capacity)
29 : 116 : : capacity_(capacity), mask_(capacity - 1) {
30 : 116 : if (capacity == 0 || (capacity & (capacity - 1)) != 0) {
31 : 0 : std::fprintf(stderr,
32 : : "LogRingBuffer capacity must be a power of two, got "
33 : : "%zu\n",
34 : : capacity);
35 : 0 : std::abort();
36 : : }
37 : 116 : buffer_ = std::make_unique<LogEvent[]>(capacity);
38 : 116 : }
39 : :
40 : 40571 : bool try_push(const LogEvent& value) noexcept {
41 : 40571 : uint64_t w = write_cursor_.load(std::memory_order_relaxed);
42 : 40571 : uint64_t r = read_cursor_.load(std::memory_order_acquire);
43 : :
44 : 40571 : if (w - r >= capacity_) {
45 : 38961 : events_lost_.fetch_add(1, std::memory_order_relaxed);
46 : 38961 : return false;
47 : : }
48 : :
49 : 4446 : while (!write_cursor_.compare_exchange_weak(
50 : : w, w + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
51 : 614 : r = read_cursor_.load(std::memory_order_acquire);
52 : 614 : if (w - r >= capacity_) {
53 : 1 : events_lost_.fetch_add(1, std::memory_order_relaxed);
54 : 1 : return false;
55 : : }
56 : : }
57 : :
58 : 1609 : buffer_[w & mask_] = value;
59 : : std::atomic_thread_fence(std::memory_order_release);
60 : 1609 : return true;
61 : : }
62 : :
63 : 335 : template <typename Fn> size_t drain(Fn&& callback) {
64 : 335 : uint64_t r = read_cursor_.load(std::memory_order_relaxed);
65 : 670 : uint64_t w = write_cursor_.load(std::memory_order_acquire);
66 : : std::atomic_thread_fence(std::memory_order_acquire);
67 : :
68 : 335 : size_t count = 0;
69 : 1940 : while (r < w) {
70 : 1605 : callback(buffer_[r & mask_]);
71 : 1605 : ++r;
72 : 1605 : ++count;
73 : : }
74 : :
75 : 335 : read_cursor_.store(r, std::memory_order_release);
76 : 335 : return count;
77 : : }
78 : :
79 : 5 : uint64_t events_lost() const noexcept {
80 : 10 : return events_lost_.load(std::memory_order_relaxed);
81 : : }
82 : :
83 : 6 : size_t size() const noexcept {
84 : 6 : uint64_t w = write_cursor_.load(std::memory_order_acquire);
85 : 6 : uint64_t r = read_cursor_.load(std::memory_order_acquire);
86 : 6 : return static_cast<size_t>(w - r);
87 : : }
88 : :
89 : 4 : bool empty() const noexcept {
90 : 4 : return size() == 0;
91 : : }
92 : :
93 : : private:
94 : : size_t capacity_;
95 : : size_t mask_;
96 : : std::unique_ptr<LogEvent[]> buffer_;
97 : :
98 : : alignas(64) std::atomic<uint64_t> write_cursor_{0};
99 : : alignas(64) std::atomic<uint64_t> read_cursor_{0};
100 : : alignas(64) std::atomic<uint64_t> events_lost_{0};
101 : : };
102 : :
103 : : } // namespace hpactor::log
|