Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/mailbox/dead_letter_queue.hpp>
16 : :
17 : : namespace hpactor::mailbox {
18 : :
19 : 114 : DeadLetterQueue::DeadLetterQueue(DeadLetterConfig config) : config_(config) {
20 : 114 : if (config_.capacity == 0) {
21 : 0 : config_.capacity = 4096;
22 : : }
23 : 114 : }
24 : :
25 : 37 : void DeadLetterQueue::trim_payload(DeadLetterRecord& record) const {
26 : 37 : record.payload_size = static_cast<uint32_t>(record.payload_sample.size());
27 : 37 : if (!config_.store_payload) {
28 : 0 : record.payload_sample.clear();
29 : 0 : return;
30 : : }
31 : 37 : if (record.payload_sample.size() > config_.max_payload_sample_bytes) {
32 : 1 : record.payload_sample.resize(config_.max_payload_sample_bytes);
33 : : }
34 : : }
35 : :
36 : 37 : bool DeadLetterQueue::try_push(DeadLetterRecord&& record) noexcept {
37 : 37 : if (!config_.enabled) {
38 : 0 : return false;
39 : : }
40 : :
41 : 37 : trim_payload(record);
42 : :
43 : 37 : std::lock_guard<std::mutex> lock(mutex_);
44 : 37 : if (records_.size() >= config_.capacity) {
45 : 1 : switch (config_.overflow_policy) {
46 : 1 : case DeadLetterOverflowPolicy::DropOldestRecord:
47 : 1 : records_.pop_front();
48 : 1 : total_lost_++;
49 : 1 : break;
50 : 0 : case DeadLetterOverflowPolicy::DropNewestRecord:
51 : 0 : total_lost_++;
52 : 0 : return false;
53 : 0 : case DeadLetterOverflowPolicy::MetadataOnly:
54 : 0 : record.payload_sample.clear();
55 : 0 : if (records_.size() >= config_.capacity) {
56 : 0 : records_.pop_front();
57 : 0 : total_lost_++;
58 : : }
59 : 0 : break;
60 : : }
61 : : }
62 : :
63 : 37 : records_.push_back(std::move(record));
64 : 37 : total_pushed_++;
65 : 37 : return true;
66 : 37 : }
67 : :
68 : 26 : bool DeadLetterQueue::try_pop(DeadLetterRecord& out) noexcept {
69 : 26 : std::lock_guard<std::mutex> lock(mutex_);
70 : 26 : if (records_.empty()) {
71 : 5 : return false;
72 : : }
73 : 21 : out = std::move(records_.front());
74 : 21 : records_.pop_front();
75 : 21 : total_popped_++;
76 : 21 : return true;
77 : 26 : }
78 : :
79 : 6 : DeadLetterQueueSnapshot DeadLetterQueue::snapshot() const noexcept {
80 : 6 : std::lock_guard<std::mutex> lock(mutex_);
81 : 6 : DeadLetterQueueSnapshot s;
82 : 6 : s.depth = static_cast<uint32_t>(records_.size());
83 : 6 : s.capacity = config_.capacity;
84 : 6 : s.total_pushed = total_pushed_;
85 : 6 : s.total_popped = total_popped_;
86 : 6 : s.total_lost = total_lost_;
87 : 6 : return s;
88 : 6 : }
89 : :
90 : : } // namespace hpactor::mailbox
|