LCOV - code coverage report
Current view: top level - include/hpactor/mailbox - mpsc_actor_mailbox.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 80.8 % 261 211
Test Date: 2026-05-20 02:24:49 Functions: 100.0 % 26 26
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/actor/typed_message.hpp>
      18                 :             : #include <hpactor/cli/cli_types.hpp>
      19                 :             : #include <hpactor/log/logger.hpp>
      20                 :             : #include <hpactor/mailbox/mailbox_policy.hpp>
      21                 :             : #include <hpactor/mailbox/mpsc_mailbox.hpp>
      22                 :             : #include <hpactor/mem/memory_config.hpp>
      23                 :             : #include <hpactor/metrics/metrics_event.hpp>
      24                 :             : #include <hpactor/metrics/metrics_ring_buffer.hpp>
      25                 :             : #include <hpactor/sched/scheduler.hpp>
      26                 :             : 
      27                 :             : #include <atomic>
      28                 :             : #include <functional>
      29                 :             : #include <type_traits>
      30                 :             : 
      31                 :             : namespace hpactor::mailbox {
      32                 :             : 
      33                 :             : // Continuation callback type - called when actor should be resumed
      34                 :             : using ActorContinuationCallback = std::function<void()>;
      35                 :             : 
      36                 :             : template <typename T> class MPSCActorMailbox {
      37                 :             :   public:
      38                 :         134 :     MPSCActorMailbox(ActorId actor_id, sched::IScheduler* scheduler,
      39                 :             :                      MailboxConfig config = {}) noexcept
      40                 :         134 :         : actor_id_(actor_id), scheduler_(scheduler), config_(config) {
      41                 :         134 :         if (config_.capacity.max_messages == 0) {
      42                 :           0 :             config_.capacity.max_messages = 1024;
      43                 :             :         }
      44                 :         134 :     }
      45                 :             : 
      46                 :             :     // Set the continuation callback to resume the actor's coroutine
      47                 :           4 :     void set_continuation_callback(ActorContinuationCallback callback) {
      48                 :           4 :         continuation_callback_ = std::move(callback);
      49                 :           4 :     }
      50                 :             : 
      51                 :             :     // Runtime config updates.
      52                 :             :     // Not safe to call concurrently with try_push/enqueue.
      53                 :             :     void set_config(const MailboxConfig& cfg) noexcept {
      54                 :             :         config_ = cfg;
      55                 :             :         if (config_.capacity.max_messages == 0) {
      56                 :             :             config_.capacity.max_messages = 1024;
      57                 :             :         }
      58                 :             :     }
      59                 :           2 :     const MailboxConfig& config() const noexcept {
      60                 :           2 :         return config_;
      61                 :             :     }
      62                 :             : 
      63                 :             :     // Bounded admission: try to enqueue with full feedback.
      64                 :             :     //
      65                 :             :     // Reserves a slot via CAS, allocates and enqueues the node, then returns
      66                 :             :     // an EnqueueResult describing the outcome.  Returns Rejected when the
      67                 :             :     // mailbox is at hard capacity.
      68                 :      120639 :     EnqueueResult try_push(T&& msg, MailboxEnvelopeMeta meta = {}) noexcept {
      69                 :      120639 :         if (meta.estimated_bytes == 0) {
      70                 :      120638 :             meta.estimated_bytes = estimate_node_bytes(msg);
      71                 :             :         }
      72                 :             : 
      73                 :             :         // Primary reservation attempt through normal capacity pool.
      74                 :      120639 :         if (!try_reserve(meta.estimated_bytes)) {
      75                 :             :             // System messages get a second chance through the protected
      76                 :             :             // reserve.
      77                 :      119244 :             bool sys_reserved = false;
      78                 :      119244 :             if (is_system_message(meta.type_tag)) {
      79                 :           4 :                 sys_reserved = try_reserve_system(meta.estimated_bytes);
      80                 :             :             }
      81                 :             : 
      82                 :      119244 :             if (!sys_reserved) {
      83                 :             :                 // Apply overflow policy when both pools are exhausted.
      84                 :      119241 :                 switch (config_.overflow_policy) {
      85                 :       39747 :                     case OverflowPolicy::DropNewest:
      86                 :       39747 :                         total_dropped_.fetch_add(1, std::memory_order_relaxed);
      87                 :       39747 :                         if (metrics_ring_buffer_) [[unlikely]] {
      88                 :           0 :                             metrics::MetricEvent evt{};
      89                 :           0 :                             evt.actor_id = actor_id_;
      90                 :           0 :                             evt.event_type =
      91                 :             :                                 metrics::MetricEventType::kMailboxDropped;
      92                 :           0 :                             evt.value_hi = 1;
      93                 :           0 :                             metrics_ring_buffer_->try_push(evt);
      94                 :             :                         }
      95                 :       39747 :                         return make_result(EnqueueResultCode::DroppedNewest);
      96                 :             : 
      97                 :           1 :                     case OverflowPolicy::DropOldest:
      98                 :           1 :                         if (drop_one_oldest()) {
      99                 :             :                             // Freed a slot — retry normal reservation.
     100                 :           1 :                             if (!try_reserve(meta.estimated_bytes)) {
     101                 :           0 :                                 total_rejected_.fetch_add(
     102                 :             :                                     1, std::memory_order_relaxed);
     103                 :           0 :                                 if (metrics_ring_buffer_) [[unlikely]] {
     104                 :           0 :                                     metrics::MetricEvent evt{};
     105                 :           0 :                                     evt.actor_id = actor_id_;
     106                 :           0 :                                     evt.event_type =
     107                 :             :                                         metrics::MetricEventType::kMailboxRejected;
     108                 :           0 :                                     evt.value_hi = 1;
     109                 :           0 :                                     metrics_ring_buffer_->try_push(evt);
     110                 :             :                                 }
     111                 :           0 :                                 return make_result(EnqueueResultCode::Rejected);
     112                 :             :                             }
     113                 :             :                             // Fall through to enqueue below.
     114                 :           1 :                             break;
     115                 :             :                         }
     116                 :           0 :                         total_rejected_.fetch_add(1, std::memory_order_relaxed);
     117                 :           0 :                         if (metrics_ring_buffer_) [[unlikely]] {
     118                 :           0 :                             metrics::MetricEvent evt{};
     119                 :           0 :                             evt.actor_id = actor_id_;
     120                 :           0 :                             evt.event_type =
     121                 :             :                                 metrics::MetricEventType::kMailboxRejected;
     122                 :           0 :                             evt.value_hi = 1;
     123                 :           0 :                             metrics_ring_buffer_->try_push(evt);
     124                 :             :                         }
     125                 :           0 :                         return make_result(EnqueueResultCode::Rejected);
     126                 :             : 
     127                 :       39745 :                     case OverflowPolicy::DeadLetter:
     128                 :       39745 :                         total_dead_letters_.fetch_add(1, std::memory_order_relaxed);
     129                 :       39745 :                         if (metrics_ring_buffer_) [[unlikely]] {
     130                 :           0 :                             metrics::MetricEvent evt{};
     131                 :           0 :                             evt.actor_id = actor_id_;
     132                 :           0 :                             evt.event_type =
     133                 :             :                                 metrics::MetricEventType::kMailboxDeadLetter;
     134                 :           0 :                             evt.value_hi = 1;
     135                 :           0 :                             metrics_ring_buffer_->try_push(evt);
     136                 :             :                         }
     137                 :       39745 :                         return make_result(EnqueueResultCode::ReroutedToDeadLetter);
     138                 :             : 
     139                 :       39748 :                     default:
     140                 :             :                         // RejectNewest, DropLowestPriority,
     141                 :             :                         // SpillToOverflowQueue, SignalOnly,
     142                 :             :                         // BlockWhenAllowed
     143                 :       39748 :                         total_rejected_.fetch_add(1, std::memory_order_relaxed);
     144                 :       39748 :                         if (metrics_ring_buffer_) [[unlikely]] {
     145                 :           2 :                             metrics::MetricEvent evt{};
     146                 :           2 :                             evt.actor_id = actor_id_;
     147                 :           2 :                             evt.event_type =
     148                 :             :                                 metrics::MetricEventType::kMailboxRejected;
     149                 :           2 :                             evt.value_hi = 1;
     150                 :           2 :                             metrics_ring_buffer_->try_push(evt);
     151                 :             :                         }
     152                 :       39748 :                         return make_result(EnqueueResultCode::Rejected);
     153                 :             :                 }
     154                 :             :             }
     155                 :             :         }
     156                 :             : 
     157                 :        1399 :         void* raw = mem::allocate(mem::RegionType::kMessage, sizeof(T), actor_id_);
     158                 :        1399 :         auto* node = new (raw) T(std::move(msg));
     159                 :        1399 :         enqueue_reserved(node, meta);
     160                 :             : 
     161                 :        1399 :         return make_result(pressure_code_after_accept());
     162                 :             :     }
     163                 :             : 
     164                 :             :     // Convenience: enqueue from a Message<T> rvalue (heap-allocates via custom
     165                 :             :     // allocator).  Delegates to try_push so bounded admission is always
     166                 :             :     // applied; the result is discarded for callers that don't need feedback.
     167                 :           1 :     void push(T&& msg) noexcept {
     168                 :           1 :         (void)try_push(std::move(msg));
     169                 :           1 :     }
     170                 :             : 
     171                 :             :     // Low-level: enqueue an already-allocated node.
     172                 :             :     //
     173                 :             :     // Attempts reservation before enqueuing.  If the mailbox is at hard
     174                 :             :     // capacity the node is NOT enqueued (caller is responsible for the
     175                 :             :     // memory).  Prefer try_push() for new code.
     176                 :           5 :     void enqueue(T* node) noexcept {
     177                 :           5 :         uint64_t bytes = estimate_node_bytes(*node);
     178                 :           5 :         if (!try_reserve(bytes)) {
     179                 :           0 :             total_rejected_.fetch_add(1, std::memory_order_relaxed);
     180                 :           0 :             if (metrics_ring_buffer_) [[unlikely]] {
     181                 :           0 :                 metrics::MetricEvent evt{};
     182                 :           0 :                 evt.actor_id = actor_id_;
     183                 :           0 :                 evt.event_type = metrics::MetricEventType::kMailboxRejected;
     184                 :           0 :                 evt.value_hi = 1;
     185                 :           0 :                 metrics_ring_buffer_->try_push(evt);
     186                 :             :             }
     187                 :           0 :             return;
     188                 :             :         }
     189                 :           5 :         MailboxEnvelopeMeta meta;
     190                 :           5 :         meta.estimated_bytes = bytes;
     191                 :           5 :         enqueue_reserved(node, meta);
     192                 :             :     }
     193                 :             : 
     194                 :             :     // Enqueue an already-reserved node (reservation was done externally).
     195                 :        1404 :     void enqueue_reserved(T* node, const MailboxEnvelopeMeta& meta) noexcept {
     196                 :        1404 :         bool was_empty = empty();
     197                 :        1404 :         mailbox_.enqueue(node);
     198                 :        1404 :         total_enqueued_.fetch_add(1, std::memory_order_relaxed);
     199                 :        1404 :         queued_bytes_.fetch_add(meta.estimated_bytes, std::memory_order_relaxed);
     200                 :        1404 :         update_max_depth();
     201                 :        1404 :         update_pressure_state();
     202                 :             : 
     203                 :        1404 :         int64_t depth = mailbox_.count();
     204                 :        1404 :         if (depth > 1024) [[unlikely]] {
     205                 :           0 :             HPACTOR_LOG_WARNING(
     206                 :             :                 log::LogCategory::kMailbox, actor_id_,
     207                 :             :                 static_cast<uint32_t>(log::LogEventId::kMailboxDepthHigh),
     208                 :             :                 "mailbox depth high",
     209                 :             :                 log::field("depth", static_cast<uint64_t>(depth)));
     210                 :             :         }
     211                 :             : 
     212                 :        1404 :         if (metrics_ring_buffer_) [[unlikely]] {
     213                 :         352 :             metrics::MetricEvent evt{};
     214                 :         352 :             evt.actor_id = actor_id_;
     215                 :         352 :             evt.event_type = metrics::MetricEventType::kMailboxEnqueue;
     216                 :         352 :             evt.value_hi = 1;
     217                 :         352 :             metrics_ring_buffer_->try_push(evt);
     218                 :             :         }
     219                 :        1404 :         if (was_empty) {
     220                 :          64 :             bool expected = true;
     221                 :          64 :             if (mailbox_was_empty_.compare_exchange_strong(
     222                 :             :                     expected, false, std::memory_order_acq_rel,
     223                 :             :                     std::memory_order_acquire)) {
     224                 :             :                 // Directly resume the actor's continuation if available
     225                 :             :                 // This avoids the latency of queuing and later pickup
     226                 :          63 :                 if (continuation_callback_) {
     227                 :           1 :                     continuation_callback_();
     228                 :             :                 }
     229                 :             :                 // Also notify scheduler for bookkeeping and potential requeue
     230                 :          63 :                 scheduler_->notify_ready(actor_id_, meta.priority, meta.deadline_ns);
     231                 :             :             }
     232                 :             :         }
     233                 :        1404 :     }
     234                 :             : 
     235                 :             :     // Consumer: dequeue one message
     236                 :        1216 :     T* dequeue() noexcept {
     237                 :        1216 :         T* node = mailbox_.dequeue();
     238                 :        1216 :         if (node != nullptr) {
     239                 :             :             // Release from the correct reservation pool.
     240                 :             :             if constexpr (std::is_same_v<T, TypedMessage>) {
     241                 :        1189 :                 if (is_system_message(node->type_id()) &&
     242                 :          38 :                     reserved_system_messages_.load(std::memory_order_acquire) > 0) {
     243                 :           1 :                     release_system_reservation(estimate_node_bytes(*node));
     244                 :             :                 } else {
     245                 :        1169 :                     release_reservation(estimate_node_bytes(*node));
     246                 :             :                 }
     247                 :             :             } else {
     248                 :             :                 release_reservation(estimate_node_bytes(*node));
     249                 :             :             }
     250                 :        1170 :             total_dequeued_.fetch_add(1, std::memory_order_relaxed);
     251                 :        1170 :             if (empty()) {
     252                 :          59 :                 mailbox_was_empty_.store(true, std::memory_order_release);
     253                 :             :             }
     254                 :             :         }
     255                 :        1216 :         if (metrics_ring_buffer_) [[unlikely]] {
     256                 :         430 :             metrics::MetricEvent evt{};
     257                 :         430 :             evt.actor_id = actor_id_;
     258                 :         430 :             evt.event_type = metrics::MetricEventType::kMailboxDequeue;
     259                 :         430 :             evt.value_hi = 1;
     260                 :         430 :             metrics_ring_buffer_->try_push(evt);
     261                 :             :         }
     262                 :        1216 :         return node;
     263                 :             :     }
     264                 :             : 
     265                 :             :     // Non-blocking pop matching ActorMailbox interface
     266                 :        1200 :     bool try_pop(T& out) noexcept {
     267                 :        1200 :         T* node = dequeue();
     268                 :        1200 :         if (!node)
     269                 :          46 :             return false;
     270                 :        1154 :         out = std::move(*node);
     271                 :        1154 :         node->~T();
     272                 :        1154 :         mem::deallocate(node);
     273                 :        1154 :         return true;
     274                 :             :     }
     275                 :             : 
     276                 :        2970 :     bool empty() const noexcept {
     277                 :        2970 :         return mailbox_.empty();
     278                 :             :     }
     279                 :             : 
     280                 :             :     // For MailboxAwaiter: was_empty before suspension?
     281                 :          31 :     bool was_empty() const noexcept {
     282                 :          31 :         return mailbox_was_empty_.load(std::memory_order_acquire);
     283                 :             :     }
     284                 :             : 
     285                 :             :     // Reset edge-trigger (called when actor suspends via await_suspend)
     286                 :           9 :     void set_was_empty(bool val) noexcept {
     287                 :           9 :         mailbox_was_empty_.store(val, std::memory_order_release);
     288                 :           9 :     }
     289                 :             : 
     290                 :             :     void
     291                 :         108 :     set_metrics_ring_buffer(metrics::MpscRingBuffer<metrics::MetricEvent>* buf) noexcept {
     292                 :         108 :         metrics_ring_buffer_ = buf;
     293                 :         108 :     }
     294                 :             : 
     295                 :         108 :     void set_logger(log::Logger* logger) noexcept {
     296                 :         108 :         logger_ = logger;
     297                 :         108 :     }
     298                 :             : 
     299                 :             :     // Inject a message for testing (bypasses scheduler notify_ready).
     300                 :             :     // Must update reservation counters to keep dequeue accounting consistent.
     301                 :          51 :     void inject_for_test(T* node) noexcept {
     302                 :          51 :         reserved_messages_.fetch_add(1, std::memory_order_relaxed);
     303                 :          51 :         total_enqueued_.fetch_add(1, std::memory_order_relaxed);
     304                 :          51 :         mailbox_.enqueue(node);
     305                 :          51 :         mailbox_was_empty_.store(false, std::memory_order_release);
     306                 :          51 :     }
     307                 :             : 
     308                 :             :     // Return a snapshot of current mailbox stats.
     309                 :           8 :     cli::MboxSnapshot snapshot() const {
     310                 :           8 :         cli::MboxSnapshot s;
     311                 :           8 :         s.depth = static_cast<uint32_t>(mailbox_.count());
     312                 :           8 :         s.capacity = config_.capacity.max_messages;
     313                 :           8 :         s.queued_bytes = queued_bytes_.load(std::memory_order_acquire);
     314                 :           8 :         s.byte_capacity = config_.capacity.max_bytes;
     315                 :             : 
     316                 :           8 :         if (s.capacity > 0) {
     317                 :           8 :             double ratio =
     318                 :           8 :                 static_cast<double>(s.depth) / static_cast<double>(s.capacity);
     319                 :           8 :             s.pressure_ratio_ppm = static_cast<uint32_t>(ratio * 1'000'000.0);
     320                 :             :         }
     321                 :             : 
     322                 :           8 :         s.total_enqueued = total_enqueued_.load(std::memory_order_acquire);
     323                 :           8 :         s.total_dequeued = total_dequeued_.load(std::memory_order_acquire);
     324                 :           8 :         s.total_rejected = total_rejected_.load(std::memory_order_acquire);
     325                 :           8 :         s.total_dropped = total_dropped_.load(std::memory_order_acquire);
     326                 :           8 :         s.total_dead_letters = total_dead_letters_.load(std::memory_order_acquire);
     327                 :           8 :         s.max_depth = max_depth_.load(std::memory_order_acquire);
     328                 :           8 :         s.high_priority_depth = 0;
     329                 :           8 :         s.pressure_state =
     330                 :             :             to_string(pressure_state_.load(std::memory_order_acquire));
     331                 :           8 :         s.overflow_policy = to_string(config_.overflow_policy);
     332                 :           8 :         return s;
     333                 :             :     }
     334                 :             : 
     335                 :             :   private:
     336                 :             :     // Try to reserve one message slot via CAS.
     337                 :             :     // Byte budget tracking is deferred; bytes param reserved for future use.
     338                 :             :     // Returns false when at hard capacity.
     339                 :      120645 :     bool try_reserve(uint64_t /*bytes*/) noexcept {
     340                 :      120645 :         uint32_t cap = config_.capacity.max_messages;
     341                 :      120645 :         if (cap == 0)
     342                 :           0 :             return true; // unlimited
     343                 :             : 
     344                 :      241290 :         uint32_t current = reserved_messages_.load(std::memory_order_acquire);
     345                 :             :         while (true) {
     346                 :      120667 :             if (current >= cap) {
     347                 :      119244 :                 return false;
     348                 :             :             }
     349                 :        2846 :             if (reserved_messages_.compare_exchange_weak(
     350                 :             :                     current, current + 1, std::memory_order_acq_rel,
     351                 :             :                     std::memory_order_acquire)) {
     352                 :        1401 :                 return true;
     353                 :             :             }
     354                 :             :         }
     355                 :             :     }
     356                 :             : 
     357                 :             :     // Release a previously reserved slot + bytes.
     358                 :        1170 :     void release_reservation(uint64_t bytes) noexcept {
     359                 :        1170 :         reserved_messages_.fetch_sub(1, std::memory_order_release);
     360                 :        1170 :         if (bytes > 0) {
     361                 :        1170 :             queued_bytes_.fetch_sub(bytes, std::memory_order_release);
     362                 :             :         }
     363                 :        1170 :     }
     364                 :             : 
     365                 :             :     // Try to reserve a system message slot via CAS on the protected reserve.
     366                 :             :     // Only used when the normal capacity pool is exhausted.
     367                 :           4 :     bool try_reserve_system(uint64_t /*bytes*/) noexcept {
     368                 :           4 :         uint32_t limit = config_.protected_system_messages;
     369                 :           4 :         if (limit == 0)
     370                 :           0 :             return false;
     371                 :             : 
     372                 :           4 :         uint32_t current =
     373                 :           8 :             reserved_system_messages_.load(std::memory_order_acquire);
     374                 :             :         while (true) {
     375                 :           4 :             if (current >= limit) {
     376                 :           1 :                 return false;
     377                 :             :             }
     378                 :           6 :             if (reserved_system_messages_.compare_exchange_weak(
     379                 :             :                     current, current + 1, std::memory_order_acq_rel,
     380                 :             :                     std::memory_order_acquire)) {
     381                 :           3 :                 return true;
     382                 :             :             }
     383                 :             :         }
     384                 :             :     }
     385                 :             : 
     386                 :             :     // Release a previously reserved system slot + bytes.
     387                 :           1 :     void release_system_reservation(uint64_t bytes) noexcept {
     388                 :           1 :         reserved_system_messages_.fetch_sub(1, std::memory_order_release);
     389                 :           1 :         if (bytes > 0 && config_.capacity.max_bytes > 0) {
     390                 :           0 :             queued_bytes_.fetch_sub(bytes, std::memory_order_release);
     391                 :             :         }
     392                 :           1 :     }
     393                 :             : 
     394                 :             :     // Drop the oldest message from the mailbox to free a slot.
     395                 :             :     // Returns true if a message was successfully dropped.
     396                 :           1 :     bool drop_one_oldest() noexcept {
     397                 :           1 :         T* node = mailbox_.dequeue();
     398                 :           1 :         if (!node)
     399                 :           0 :             return false;
     400                 :             : 
     401                 :             :         // Release from the correct pool and update drop counter.
     402                 :             :         if constexpr (std::is_same_v<T, TypedMessage>) {
     403                 :           1 :             if (is_system_message(node->type_id()) &&
     404                 :           0 :                 reserved_system_messages_.load(std::memory_order_acquire) > 0) {
     405                 :           0 :                 release_system_reservation(estimate_node_bytes(*node));
     406                 :             :             } else {
     407                 :           1 :                 release_reservation(estimate_node_bytes(*node));
     408                 :             :             }
     409                 :             :         } else {
     410                 :             :             release_reservation(estimate_node_bytes(*node));
     411                 :             :         }
     412                 :             : 
     413                 :           1 :         total_dropped_.fetch_add(1, std::memory_order_relaxed);
     414                 :           1 :         if (metrics_ring_buffer_) [[unlikely]] {
     415                 :           0 :             metrics::MetricEvent evt{};
     416                 :           0 :             evt.actor_id = actor_id_;
     417                 :           0 :             evt.event_type = metrics::MetricEventType::kMailboxDropped;
     418                 :           0 :             evt.value_hi = 1;
     419                 :           0 :             metrics_ring_buffer_->try_push(evt);
     420                 :             :         }
     421                 :             : 
     422                 :           1 :         node->~T();
     423                 :           1 :         mem::deallocate(node);
     424                 :             : 
     425                 :           1 :         if (empty()) {
     426                 :           1 :             mailbox_was_empty_.store(true, std::memory_order_release);
     427                 :             :         }
     428                 :             : 
     429                 :           1 :         return true;
     430                 :             :     }
     431                 :             : 
     432                 :             :     // Determine the result code after accepting a message, based on current
     433                 :             :     // watermarks.
     434                 :        1399 :     EnqueueResultCode pressure_code_after_accept() const noexcept {
     435                 :        1399 :         uint32_t cap = config_.capacity.max_messages;
     436                 :        1399 :         if (cap == 0)
     437                 :           0 :             return EnqueueResultCode::Accepted;
     438                 :        1399 :         uint32_t depth = static_cast<uint32_t>(mailbox_.count());
     439                 :        1399 :         double ratio = static_cast<double>(depth) / static_cast<double>(cap);
     440                 :        1399 :         if (ratio >= config_.high_watermark) {
     441                 :         224 :             return EnqueueResultCode::AcceptedWithSoftPressure;
     442                 :             :         }
     443                 :        1175 :         return EnqueueResultCode::Accepted;
     444                 :             :     }
     445                 :             : 
     446                 :             :     // Fill an EnqueueResult from the given code and current state.
     447                 :      120639 :     EnqueueResult make_result(EnqueueResultCode code) const noexcept {
     448                 :      120639 :         EnqueueResult r;
     449                 :      120639 :         r.code = code;
     450                 :      120639 :         r.target = actor_id_;
     451                 :      120639 :         r.depth = static_cast<uint32_t>(mailbox_.count());
     452                 :      120639 :         r.capacity = config_.capacity.max_messages;
     453                 :      120639 :         if (r.capacity > 0) {
     454                 :      120639 :             r.pressure_ratio =
     455                 :      120639 :                 static_cast<double>(r.depth) / static_cast<double>(r.capacity);
     456                 :             :         }
     457                 :      120639 :         return r;
     458                 :             :     }
     459                 :             : 
     460                 :             :     // Update max_depth_ tracking via CAS.
     461                 :        1404 :     void update_max_depth() noexcept {
     462                 :        1404 :         uint64_t depth = static_cast<uint64_t>(mailbox_.count());
     463                 :        1404 :         uint64_t prev = max_depth_.load(std::memory_order_acquire);
     464                 :        1426 :         while (depth > prev) {
     465                 :        2774 :             if (max_depth_.compare_exchange_weak(prev, depth, std::memory_order_acq_rel,
     466                 :             :                                                  std::memory_order_acquire)) {
     467                 :        1365 :                 break;
     468                 :             :             }
     469                 :             :         }
     470                 :        1404 :     }
     471                 :             : 
     472                 :             :     // Update pressure state based on current depth vs watermarks.
     473                 :        1404 :     void update_pressure_state() noexcept {
     474                 :        1404 :         uint32_t cap = config_.capacity.max_messages;
     475                 :        1404 :         if (cap == 0) {
     476                 :           0 :             pressure_state_.store(MailboxPressureState::Normal,
     477                 :             :                                   std::memory_order_release);
     478                 :           0 :             return;
     479                 :             :         }
     480                 :        1404 :         uint32_t depth = static_cast<uint32_t>(mailbox_.count());
     481                 :        1404 :         double ratio = static_cast<double>(depth) / static_cast<double>(cap);
     482                 :        1404 :         if (ratio >= config_.high_watermark) {
     483                 :         224 :             pressure_state_.store(MailboxPressureState::SoftPressure,
     484                 :             :                                   std::memory_order_release);
     485                 :             :         } else {
     486                 :        1180 :             pressure_state_.store(MailboxPressureState::Normal,
     487                 :             :                                   std::memory_order_release);
     488                 :             :         }
     489                 :             :     }
     490                 :             : 
     491                 :             :     // Estimate bytes for a node.  Uses the TypedMessage-specific helper when T
     492                 :             :     // is TypedMessage, otherwise falls back to sizeof(T).
     493                 :      121814 :     static uint64_t estimate_node_bytes(const T& node) noexcept {
     494                 :             :         if constexpr (std::is_same_v<T, TypedMessage>) {
     495                 :      121814 :             return estimate_message_bytes(node);
     496                 :             :         } else {
     497                 :             :             return sizeof(T);
     498                 :             :         }
     499                 :             :     }
     500                 :             : 
     501                 :             :     ActorId actor_id_;
     502                 :             :     sched::IScheduler* scheduler_;
     503                 :             :     MPSCMailbox<T> mailbox_;
     504                 :             :     MailboxConfig config_;
     505                 :             :     std::atomic<bool> mailbox_was_empty_{true};
     506                 :             :     std::atomic<uint32_t> reserved_messages_{0};
     507                 :             :     std::atomic<uint32_t> reserved_system_messages_{0};
     508                 :             :     std::atomic<uint64_t> queued_bytes_{0};
     509                 :             :     std::atomic<uint64_t> total_enqueued_{0};
     510                 :             :     std::atomic<uint64_t> total_dequeued_{0};
     511                 :             :     std::atomic<uint64_t> total_rejected_{0};
     512                 :             :     std::atomic<uint64_t> total_dropped_{0};
     513                 :             :     std::atomic<uint64_t> total_dead_letters_{0};
     514                 :             :     std::atomic<uint64_t> max_depth_{0};
     515                 :             :     std::atomic<MailboxPressureState> pressure_state_{MailboxPressureState::Normal};
     516                 :             :     ActorContinuationCallback continuation_callback_;
     517                 :             :     metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr};
     518                 :             :     log::Logger* logger_ = nullptr;
     519                 :             : };
     520                 :             : 
     521                 :             : } // namespace hpactor::mailbox
        

Generated by: LCOV version 2.0-1