Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include "hpactor/log/log_drain.hpp"
16 : :
17 : : #include <chrono>
18 : : #include <thread>
19 : :
20 : : #include "hpactor/log/log_config.hpp"
21 : : #include "hpactor/log/log_event.hpp"
22 : : #include "hpactor/log/log_formatter.hpp"
23 : : #include "hpactor/log/log_level.hpp"
24 : : #include "hpactor/log/log_ring_buffer.hpp"
25 : : #include "hpactor/log/log_sink.hpp"
26 : :
27 : : namespace hpactor::log {
28 : :
29 : 112 : LogDrain::LogDrain(LogRingBuffer& buffer, ILogFormatter& formatter,
30 : : std::vector<std::unique_ptr<ILogSink>> sinks,
31 : 112 : const LogConfig& config) noexcept
32 : 112 : : buffer_(buffer), formatter_(formatter), sinks_(std::move(sinks)),
33 : 112 : config_(config) {}
34 : :
35 : 112 : LogDrain::~LogDrain() {
36 : 112 : stop();
37 : 112 : }
38 : :
39 : 111 : void LogDrain::start() {
40 : 111 : running_.store(true, std::memory_order_release);
41 : 111 : thread_ = std::thread(&LogDrain::run, this);
42 : 111 : }
43 : :
44 : 335 : void LogDrain::stop() noexcept {
45 : 335 : bool expected = true;
46 : 335 : if (!running_.compare_exchange_strong(expected, false,
47 : : std::memory_order_acq_rel)) {
48 : 224 : return; // Wasn't running — nothing to stop.
49 : : }
50 : :
51 : 111 : if (thread_.joinable()) {
52 : 111 : thread_.join();
53 : : }
54 : :
55 : : // Final drain of any remaining events that arrived during shutdown.
56 : 111 : std::string buf;
57 : 111 : buf.reserve(4096);
58 : 111 : buffer_.drain([&](const LogEvent& event) {
59 : 542 : buf.clear();
60 : 542 : formatter_.format(event, buf);
61 : 1084 : for (auto& sink : sinks_) {
62 : 542 : auto result = sink->write(buf);
63 : 542 : if (!result.has_value()) {
64 : 0 : sink_errors_.fetch_add(1, std::memory_order_relaxed);
65 : : }
66 : 542 : }
67 : 542 : });
68 : :
69 : 222 : for (auto& sink : sinks_) {
70 : 111 : sink->flush();
71 : : }
72 : 111 : }
73 : :
74 : 0 : void LogDrain::nudge() noexcept {
75 : : // No-op. The drain thread polls every 100 ms, so a wake-up signal is not
76 : : // needed for this simplified initial implementation.
77 : 0 : }
78 : :
79 : 111 : void LogDrain::run() {
80 : 111 : std::string buf;
81 : 111 : buf.reserve(4096);
82 : :
83 : 332 : while (running_.load(std::memory_order_relaxed)) {
84 : 221 : buffer_.drain([&](const LogEvent& event) {
85 : 30 : buf.clear();
86 : 30 : formatter_.format(event, buf);
87 : 60 : for (auto& sink : sinks_) {
88 : 30 : auto result = sink->write(buf);
89 : 30 : if (!result.has_value()) {
90 : 0 : sink_errors_.fetch_add(1, std::memory_order_relaxed);
91 : : }
92 : 30 : }
93 : 30 : if (static_cast<uint8_t>(event.level) <=
94 : 30 : static_cast<uint8_t>(config_.flush_on_level)) {
95 : 0 : for (auto& sink : sinks_) {
96 : 0 : sink->flush();
97 : : }
98 : : }
99 : 30 : });
100 : :
101 : 221 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
102 : : }
103 : 111 : }
104 : :
105 : : } // namespace hpactor::log
|