Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/typed_message.hpp>
18 : : #include <hpactor/cli/cli_types.hpp>
19 : : #include <hpactor/log/logger.hpp>
20 : : #include <hpactor/mailbox/mailbox_policy.hpp>
21 : : #include <hpactor/mailbox/mpsc_mailbox.hpp>
22 : : #include <hpactor/mem/memory_config.hpp>
23 : : #include <hpactor/metrics/metrics_event.hpp>
24 : : #include <hpactor/metrics/metrics_ring_buffer.hpp>
25 : : #include <hpactor/sched/scheduler.hpp>
26 : :
27 : : #include <atomic>
28 : : #include <functional>
29 : : #include <type_traits>
30 : :
31 : : namespace hpactor::mailbox {
32 : :
33 : : // Continuation callback type - called when actor should be resumed
34 : : using ActorContinuationCallback = std::function<void()>;
35 : :
36 : : template <typename T> class MPSCActorMailbox {
37 : : public:
38 : 134 : MPSCActorMailbox(ActorId actor_id, sched::IScheduler* scheduler,
39 : : MailboxConfig config = {}) noexcept
40 : 134 : : actor_id_(actor_id), scheduler_(scheduler), config_(config) {
41 : 134 : if (config_.capacity.max_messages == 0) {
42 : 0 : config_.capacity.max_messages = 1024;
43 : : }
44 : 134 : }
45 : :
46 : : // Set the continuation callback to resume the actor's coroutine
47 : 4 : void set_continuation_callback(ActorContinuationCallback callback) {
48 : 4 : continuation_callback_ = std::move(callback);
49 : 4 : }
50 : :
51 : : // Runtime config updates.
52 : : // Not safe to call concurrently with try_push/enqueue.
53 : : void set_config(const MailboxConfig& cfg) noexcept {
54 : : config_ = cfg;
55 : : if (config_.capacity.max_messages == 0) {
56 : : config_.capacity.max_messages = 1024;
57 : : }
58 : : }
59 : 2 : const MailboxConfig& config() const noexcept {
60 : 2 : return config_;
61 : : }
62 : :
63 : : // Bounded admission: try to enqueue with full feedback.
64 : : //
65 : : // Reserves a slot via CAS, allocates and enqueues the node, then returns
66 : : // an EnqueueResult describing the outcome. Returns Rejected when the
67 : : // mailbox is at hard capacity.
68 : 120639 : EnqueueResult try_push(T&& msg, MailboxEnvelopeMeta meta = {}) noexcept {
69 : 120639 : if (meta.estimated_bytes == 0) {
70 : 120638 : meta.estimated_bytes = estimate_node_bytes(msg);
71 : : }
72 : :
73 : : // Primary reservation attempt through normal capacity pool.
74 : 120639 : if (!try_reserve(meta.estimated_bytes)) {
75 : : // System messages get a second chance through the protected
76 : : // reserve.
77 : 119244 : bool sys_reserved = false;
78 : 119244 : if (is_system_message(meta.type_tag)) {
79 : 4 : sys_reserved = try_reserve_system(meta.estimated_bytes);
80 : : }
81 : :
82 : 119244 : if (!sys_reserved) {
83 : : // Apply overflow policy when both pools are exhausted.
84 : 119241 : switch (config_.overflow_policy) {
85 : 39747 : case OverflowPolicy::DropNewest:
86 : 39747 : total_dropped_.fetch_add(1, std::memory_order_relaxed);
87 : 39747 : if (metrics_ring_buffer_) [[unlikely]] {
88 : 0 : metrics::MetricEvent evt{};
89 : 0 : evt.actor_id = actor_id_;
90 : 0 : evt.event_type =
91 : : metrics::MetricEventType::kMailboxDropped;
92 : 0 : evt.value_hi = 1;
93 : 0 : metrics_ring_buffer_->try_push(evt);
94 : : }
95 : 39747 : return make_result(EnqueueResultCode::DroppedNewest);
96 : :
97 : 1 : case OverflowPolicy::DropOldest:
98 : 1 : if (drop_one_oldest()) {
99 : : // Freed a slot — retry normal reservation.
100 : 1 : if (!try_reserve(meta.estimated_bytes)) {
101 : 0 : total_rejected_.fetch_add(
102 : : 1, std::memory_order_relaxed);
103 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
104 : 0 : metrics::MetricEvent evt{};
105 : 0 : evt.actor_id = actor_id_;
106 : 0 : evt.event_type =
107 : : metrics::MetricEventType::kMailboxRejected;
108 : 0 : evt.value_hi = 1;
109 : 0 : metrics_ring_buffer_->try_push(evt);
110 : : }
111 : 0 : return make_result(EnqueueResultCode::Rejected);
112 : : }
113 : : // Fall through to enqueue below.
114 : 1 : break;
115 : : }
116 : 0 : total_rejected_.fetch_add(1, std::memory_order_relaxed);
117 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
118 : 0 : metrics::MetricEvent evt{};
119 : 0 : evt.actor_id = actor_id_;
120 : 0 : evt.event_type =
121 : : metrics::MetricEventType::kMailboxRejected;
122 : 0 : evt.value_hi = 1;
123 : 0 : metrics_ring_buffer_->try_push(evt);
124 : : }
125 : 0 : return make_result(EnqueueResultCode::Rejected);
126 : :
127 : 39745 : case OverflowPolicy::DeadLetter:
128 : 39745 : total_dead_letters_.fetch_add(1, std::memory_order_relaxed);
129 : 39745 : if (metrics_ring_buffer_) [[unlikely]] {
130 : 0 : metrics::MetricEvent evt{};
131 : 0 : evt.actor_id = actor_id_;
132 : 0 : evt.event_type =
133 : : metrics::MetricEventType::kMailboxDeadLetter;
134 : 0 : evt.value_hi = 1;
135 : 0 : metrics_ring_buffer_->try_push(evt);
136 : : }
137 : 39745 : return make_result(EnqueueResultCode::ReroutedToDeadLetter);
138 : :
139 : 39748 : default:
140 : : // RejectNewest, DropLowestPriority,
141 : : // SpillToOverflowQueue, SignalOnly,
142 : : // BlockWhenAllowed
143 : 39748 : total_rejected_.fetch_add(1, std::memory_order_relaxed);
144 : 39748 : if (metrics_ring_buffer_) [[unlikely]] {
145 : 2 : metrics::MetricEvent evt{};
146 : 2 : evt.actor_id = actor_id_;
147 : 2 : evt.event_type =
148 : : metrics::MetricEventType::kMailboxRejected;
149 : 2 : evt.value_hi = 1;
150 : 2 : metrics_ring_buffer_->try_push(evt);
151 : : }
152 : 39748 : return make_result(EnqueueResultCode::Rejected);
153 : : }
154 : : }
155 : : }
156 : :
157 : 1399 : void* raw = mem::allocate(mem::RegionType::kMessage, sizeof(T), actor_id_);
158 : 1399 : auto* node = new (raw) T(std::move(msg));
159 : 1399 : enqueue_reserved(node, meta);
160 : :
161 : 1399 : return make_result(pressure_code_after_accept());
162 : : }
163 : :
164 : : // Convenience: enqueue from a Message<T> rvalue (heap-allocates via custom
165 : : // allocator). Delegates to try_push so bounded admission is always
166 : : // applied; the result is discarded for callers that don't need feedback.
167 : 1 : void push(T&& msg) noexcept {
168 : 1 : (void)try_push(std::move(msg));
169 : 1 : }
170 : :
171 : : // Low-level: enqueue an already-allocated node.
172 : : //
173 : : // Attempts reservation before enqueuing. If the mailbox is at hard
174 : : // capacity the node is NOT enqueued (caller is responsible for the
175 : : // memory). Prefer try_push() for new code.
176 : 5 : void enqueue(T* node) noexcept {
177 : 5 : uint64_t bytes = estimate_node_bytes(*node);
178 : 5 : if (!try_reserve(bytes)) {
179 : 0 : total_rejected_.fetch_add(1, std::memory_order_relaxed);
180 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
181 : 0 : metrics::MetricEvent evt{};
182 : 0 : evt.actor_id = actor_id_;
183 : 0 : evt.event_type = metrics::MetricEventType::kMailboxRejected;
184 : 0 : evt.value_hi = 1;
185 : 0 : metrics_ring_buffer_->try_push(evt);
186 : : }
187 : 0 : return;
188 : : }
189 : 5 : MailboxEnvelopeMeta meta;
190 : 5 : meta.estimated_bytes = bytes;
191 : 5 : enqueue_reserved(node, meta);
192 : : }
193 : :
194 : : // Enqueue an already-reserved node (reservation was done externally).
195 : 1404 : void enqueue_reserved(T* node, const MailboxEnvelopeMeta& meta) noexcept {
196 : 1404 : bool was_empty = empty();
197 : 1404 : mailbox_.enqueue(node);
198 : 1404 : total_enqueued_.fetch_add(1, std::memory_order_relaxed);
199 : 1404 : queued_bytes_.fetch_add(meta.estimated_bytes, std::memory_order_relaxed);
200 : 1404 : update_max_depth();
201 : 1404 : update_pressure_state();
202 : :
203 : 1404 : int64_t depth = mailbox_.count();
204 : 1404 : if (depth > 1024) [[unlikely]] {
205 : 0 : HPACTOR_LOG_WARNING(
206 : : log::LogCategory::kMailbox, actor_id_,
207 : : static_cast<uint32_t>(log::LogEventId::kMailboxDepthHigh),
208 : : "mailbox depth high",
209 : : log::field("depth", static_cast<uint64_t>(depth)));
210 : : }
211 : :
212 : 1404 : if (metrics_ring_buffer_) [[unlikely]] {
213 : 352 : metrics::MetricEvent evt{};
214 : 352 : evt.actor_id = actor_id_;
215 : 352 : evt.event_type = metrics::MetricEventType::kMailboxEnqueue;
216 : 352 : evt.value_hi = 1;
217 : 352 : metrics_ring_buffer_->try_push(evt);
218 : : }
219 : 1404 : if (was_empty) {
220 : 64 : bool expected = true;
221 : 64 : if (mailbox_was_empty_.compare_exchange_strong(
222 : : expected, false, std::memory_order_acq_rel,
223 : : std::memory_order_acquire)) {
224 : : // Directly resume the actor's continuation if available
225 : : // This avoids the latency of queuing and later pickup
226 : 63 : if (continuation_callback_) {
227 : 1 : continuation_callback_();
228 : : }
229 : : // Also notify scheduler for bookkeeping and potential requeue
230 : 63 : scheduler_->notify_ready(actor_id_, meta.priority, meta.deadline_ns);
231 : : }
232 : : }
233 : 1404 : }
234 : :
235 : : // Consumer: dequeue one message
236 : 1216 : T* dequeue() noexcept {
237 : 1216 : T* node = mailbox_.dequeue();
238 : 1216 : if (node != nullptr) {
239 : : // Release from the correct reservation pool.
240 : : if constexpr (std::is_same_v<T, TypedMessage>) {
241 : 1189 : if (is_system_message(node->type_id()) &&
242 : 38 : reserved_system_messages_.load(std::memory_order_acquire) > 0) {
243 : 1 : release_system_reservation(estimate_node_bytes(*node));
244 : : } else {
245 : 1169 : release_reservation(estimate_node_bytes(*node));
246 : : }
247 : : } else {
248 : : release_reservation(estimate_node_bytes(*node));
249 : : }
250 : 1170 : total_dequeued_.fetch_add(1, std::memory_order_relaxed);
251 : 1170 : if (empty()) {
252 : 59 : mailbox_was_empty_.store(true, std::memory_order_release);
253 : : }
254 : : }
255 : 1216 : if (metrics_ring_buffer_) [[unlikely]] {
256 : 430 : metrics::MetricEvent evt{};
257 : 430 : evt.actor_id = actor_id_;
258 : 430 : evt.event_type = metrics::MetricEventType::kMailboxDequeue;
259 : 430 : evt.value_hi = 1;
260 : 430 : metrics_ring_buffer_->try_push(evt);
261 : : }
262 : 1216 : return node;
263 : : }
264 : :
265 : : // Non-blocking pop matching ActorMailbox interface
266 : 1200 : bool try_pop(T& out) noexcept {
267 : 1200 : T* node = dequeue();
268 : 1200 : if (!node)
269 : 46 : return false;
270 : 1154 : out = std::move(*node);
271 : 1154 : node->~T();
272 : 1154 : mem::deallocate(node);
273 : 1154 : return true;
274 : : }
275 : :
276 : 2970 : bool empty() const noexcept {
277 : 2970 : return mailbox_.empty();
278 : : }
279 : :
280 : : // For MailboxAwaiter: was_empty before suspension?
281 : 31 : bool was_empty() const noexcept {
282 : 31 : return mailbox_was_empty_.load(std::memory_order_acquire);
283 : : }
284 : :
285 : : // Reset edge-trigger (called when actor suspends via await_suspend)
286 : 9 : void set_was_empty(bool val) noexcept {
287 : 9 : mailbox_was_empty_.store(val, std::memory_order_release);
288 : 9 : }
289 : :
290 : : void
291 : 108 : set_metrics_ring_buffer(metrics::MpscRingBuffer<metrics::MetricEvent>* buf) noexcept {
292 : 108 : metrics_ring_buffer_ = buf;
293 : 108 : }
294 : :
295 : 108 : void set_logger(log::Logger* logger) noexcept {
296 : 108 : logger_ = logger;
297 : 108 : }
298 : :
299 : : // Inject a message for testing (bypasses scheduler notify_ready).
300 : : // Must update reservation counters to keep dequeue accounting consistent.
301 : 51 : void inject_for_test(T* node) noexcept {
302 : 51 : reserved_messages_.fetch_add(1, std::memory_order_relaxed);
303 : 51 : total_enqueued_.fetch_add(1, std::memory_order_relaxed);
304 : 51 : mailbox_.enqueue(node);
305 : 51 : mailbox_was_empty_.store(false, std::memory_order_release);
306 : 51 : }
307 : :
308 : : // Return a snapshot of current mailbox stats.
309 : 8 : cli::MboxSnapshot snapshot() const {
310 : 8 : cli::MboxSnapshot s;
311 : 8 : s.depth = static_cast<uint32_t>(mailbox_.count());
312 : 8 : s.capacity = config_.capacity.max_messages;
313 : 8 : s.queued_bytes = queued_bytes_.load(std::memory_order_acquire);
314 : 8 : s.byte_capacity = config_.capacity.max_bytes;
315 : :
316 : 8 : if (s.capacity > 0) {
317 : 8 : double ratio =
318 : 8 : static_cast<double>(s.depth) / static_cast<double>(s.capacity);
319 : 8 : s.pressure_ratio_ppm = static_cast<uint32_t>(ratio * 1'000'000.0);
320 : : }
321 : :
322 : 8 : s.total_enqueued = total_enqueued_.load(std::memory_order_acquire);
323 : 8 : s.total_dequeued = total_dequeued_.load(std::memory_order_acquire);
324 : 8 : s.total_rejected = total_rejected_.load(std::memory_order_acquire);
325 : 8 : s.total_dropped = total_dropped_.load(std::memory_order_acquire);
326 : 8 : s.total_dead_letters = total_dead_letters_.load(std::memory_order_acquire);
327 : 8 : s.max_depth = max_depth_.load(std::memory_order_acquire);
328 : 8 : s.high_priority_depth = 0;
329 : 8 : s.pressure_state =
330 : : to_string(pressure_state_.load(std::memory_order_acquire));
331 : 8 : s.overflow_policy = to_string(config_.overflow_policy);
332 : 8 : return s;
333 : : }
334 : :
335 : : private:
336 : : // Try to reserve one message slot via CAS.
337 : : // Byte budget tracking is deferred; bytes param reserved for future use.
338 : : // Returns false when at hard capacity.
339 : 120645 : bool try_reserve(uint64_t /*bytes*/) noexcept {
340 : 120645 : uint32_t cap = config_.capacity.max_messages;
341 : 120645 : if (cap == 0)
342 : 0 : return true; // unlimited
343 : :
344 : 241290 : uint32_t current = reserved_messages_.load(std::memory_order_acquire);
345 : : while (true) {
346 : 120667 : if (current >= cap) {
347 : 119244 : return false;
348 : : }
349 : 2846 : if (reserved_messages_.compare_exchange_weak(
350 : : current, current + 1, std::memory_order_acq_rel,
351 : : std::memory_order_acquire)) {
352 : 1401 : return true;
353 : : }
354 : : }
355 : : }
356 : :
357 : : // Release a previously reserved slot + bytes.
358 : 1170 : void release_reservation(uint64_t bytes) noexcept {
359 : 1170 : reserved_messages_.fetch_sub(1, std::memory_order_release);
360 : 1170 : if (bytes > 0) {
361 : 1170 : queued_bytes_.fetch_sub(bytes, std::memory_order_release);
362 : : }
363 : 1170 : }
364 : :
365 : : // Try to reserve a system message slot via CAS on the protected reserve.
366 : : // Only used when the normal capacity pool is exhausted.
367 : 4 : bool try_reserve_system(uint64_t /*bytes*/) noexcept {
368 : 4 : uint32_t limit = config_.protected_system_messages;
369 : 4 : if (limit == 0)
370 : 0 : return false;
371 : :
372 : 4 : uint32_t current =
373 : 8 : reserved_system_messages_.load(std::memory_order_acquire);
374 : : while (true) {
375 : 4 : if (current >= limit) {
376 : 1 : return false;
377 : : }
378 : 6 : if (reserved_system_messages_.compare_exchange_weak(
379 : : current, current + 1, std::memory_order_acq_rel,
380 : : std::memory_order_acquire)) {
381 : 3 : return true;
382 : : }
383 : : }
384 : : }
385 : :
386 : : // Release a previously reserved system slot + bytes.
387 : 1 : void release_system_reservation(uint64_t bytes) noexcept {
388 : 1 : reserved_system_messages_.fetch_sub(1, std::memory_order_release);
389 : 1 : if (bytes > 0 && config_.capacity.max_bytes > 0) {
390 : 0 : queued_bytes_.fetch_sub(bytes, std::memory_order_release);
391 : : }
392 : 1 : }
393 : :
394 : : // Drop the oldest message from the mailbox to free a slot.
395 : : // Returns true if a message was successfully dropped.
396 : 1 : bool drop_one_oldest() noexcept {
397 : 1 : T* node = mailbox_.dequeue();
398 : 1 : if (!node)
399 : 0 : return false;
400 : :
401 : : // Release from the correct pool and update drop counter.
402 : : if constexpr (std::is_same_v<T, TypedMessage>) {
403 : 1 : if (is_system_message(node->type_id()) &&
404 : 0 : reserved_system_messages_.load(std::memory_order_acquire) > 0) {
405 : 0 : release_system_reservation(estimate_node_bytes(*node));
406 : : } else {
407 : 1 : release_reservation(estimate_node_bytes(*node));
408 : : }
409 : : } else {
410 : : release_reservation(estimate_node_bytes(*node));
411 : : }
412 : :
413 : 1 : total_dropped_.fetch_add(1, std::memory_order_relaxed);
414 : 1 : if (metrics_ring_buffer_) [[unlikely]] {
415 : 0 : metrics::MetricEvent evt{};
416 : 0 : evt.actor_id = actor_id_;
417 : 0 : evt.event_type = metrics::MetricEventType::kMailboxDropped;
418 : 0 : evt.value_hi = 1;
419 : 0 : metrics_ring_buffer_->try_push(evt);
420 : : }
421 : :
422 : 1 : node->~T();
423 : 1 : mem::deallocate(node);
424 : :
425 : 1 : if (empty()) {
426 : 1 : mailbox_was_empty_.store(true, std::memory_order_release);
427 : : }
428 : :
429 : 1 : return true;
430 : : }
431 : :
432 : : // Determine the result code after accepting a message, based on current
433 : : // watermarks.
434 : 1399 : EnqueueResultCode pressure_code_after_accept() const noexcept {
435 : 1399 : uint32_t cap = config_.capacity.max_messages;
436 : 1399 : if (cap == 0)
437 : 0 : return EnqueueResultCode::Accepted;
438 : 1399 : uint32_t depth = static_cast<uint32_t>(mailbox_.count());
439 : 1399 : double ratio = static_cast<double>(depth) / static_cast<double>(cap);
440 : 1399 : if (ratio >= config_.high_watermark) {
441 : 224 : return EnqueueResultCode::AcceptedWithSoftPressure;
442 : : }
443 : 1175 : return EnqueueResultCode::Accepted;
444 : : }
445 : :
446 : : // Fill an EnqueueResult from the given code and current state.
447 : 120639 : EnqueueResult make_result(EnqueueResultCode code) const noexcept {
448 : 120639 : EnqueueResult r;
449 : 120639 : r.code = code;
450 : 120639 : r.target = actor_id_;
451 : 120639 : r.depth = static_cast<uint32_t>(mailbox_.count());
452 : 120639 : r.capacity = config_.capacity.max_messages;
453 : 120639 : if (r.capacity > 0) {
454 : 120639 : r.pressure_ratio =
455 : 120639 : static_cast<double>(r.depth) / static_cast<double>(r.capacity);
456 : : }
457 : 120639 : return r;
458 : : }
459 : :
460 : : // Update max_depth_ tracking via CAS.
461 : 1404 : void update_max_depth() noexcept {
462 : 1404 : uint64_t depth = static_cast<uint64_t>(mailbox_.count());
463 : 1404 : uint64_t prev = max_depth_.load(std::memory_order_acquire);
464 : 1426 : while (depth > prev) {
465 : 2774 : if (max_depth_.compare_exchange_weak(prev, depth, std::memory_order_acq_rel,
466 : : std::memory_order_acquire)) {
467 : 1365 : break;
468 : : }
469 : : }
470 : 1404 : }
471 : :
472 : : // Update pressure state based on current depth vs watermarks.
473 : 1404 : void update_pressure_state() noexcept {
474 : 1404 : uint32_t cap = config_.capacity.max_messages;
475 : 1404 : if (cap == 0) {
476 : 0 : pressure_state_.store(MailboxPressureState::Normal,
477 : : std::memory_order_release);
478 : 0 : return;
479 : : }
480 : 1404 : uint32_t depth = static_cast<uint32_t>(mailbox_.count());
481 : 1404 : double ratio = static_cast<double>(depth) / static_cast<double>(cap);
482 : 1404 : if (ratio >= config_.high_watermark) {
483 : 224 : pressure_state_.store(MailboxPressureState::SoftPressure,
484 : : std::memory_order_release);
485 : : } else {
486 : 1180 : pressure_state_.store(MailboxPressureState::Normal,
487 : : std::memory_order_release);
488 : : }
489 : : }
490 : :
491 : : // Estimate bytes for a node. Uses the TypedMessage-specific helper when T
492 : : // is TypedMessage, otherwise falls back to sizeof(T).
493 : 121814 : static uint64_t estimate_node_bytes(const T& node) noexcept {
494 : : if constexpr (std::is_same_v<T, TypedMessage>) {
495 : 121814 : return estimate_message_bytes(node);
496 : : } else {
497 : : return sizeof(T);
498 : : }
499 : : }
500 : :
501 : : ActorId actor_id_;
502 : : sched::IScheduler* scheduler_;
503 : : MPSCMailbox<T> mailbox_;
504 : : MailboxConfig config_;
505 : : std::atomic<bool> mailbox_was_empty_{true};
506 : : std::atomic<uint32_t> reserved_messages_{0};
507 : : std::atomic<uint32_t> reserved_system_messages_{0};
508 : : std::atomic<uint64_t> queued_bytes_{0};
509 : : std::atomic<uint64_t> total_enqueued_{0};
510 : : std::atomic<uint64_t> total_dequeued_{0};
511 : : std::atomic<uint64_t> total_rejected_{0};
512 : : std::atomic<uint64_t> total_dropped_{0};
513 : : std::atomic<uint64_t> total_dead_letters_{0};
514 : : std::atomic<uint64_t> max_depth_{0};
515 : : std::atomic<MailboxPressureState> pressure_state_{MailboxPressureState::Normal};
516 : : ActorContinuationCallback continuation_callback_;
517 : : metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr};
518 : : log::Logger* logger_ = nullptr;
519 : : };
520 : :
521 : : } // namespace hpactor::mailbox
|