LCOV - code coverage report
Current view: top level - include/hpactor/sched - coroutine_awaiters.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 94.7 % 19 18
Test Date: 2026-05-20 02:24:49 Functions: 100.0 % 4 4
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/actor/typed_message.hpp>
      18                 :             : #include <hpactor/hpactor_config.hpp>
      19                 :             : #include <hpactor/mailbox/mpsc_actor_mailbox.hpp>
      20                 :             : #include <hpactor/sched/coroutine_task.hpp>
      21                 :             : #include <hpactor/sched/scheduler.hpp>
      22                 :             : 
      23                 :             : #include <atomic>
      24                 :             : #include <cstdint>
      25                 :             : 
      26                 :             : #if HPACTOR_SUPPORT_COROUTINES
      27                 :             : 
      28                 :             : #    include <coroutine>
      29                 :             : 
      30                 :             : namespace hpactor::sched {
      31                 :             : 
       32                 :             : // MailboxAwaiter: awaitable for co_await actor.receive(). await_ready() and await_suspend() both consult mailbox_->was_empty(); await_resume() dequeues one message and returns it by value.
       33                 :             : // Suspends only while the mailbox reports empty AND the actor-state CAS kRunning -> kIdle succeeds; otherwise the coroutine continues straight into await_resume().
       34                 :             : // T is the message type (e.g., Message<MessageVariant>). await_resume() needs T{} and construction of T from an rvalue T to be well-formed.
       35                 :             : template <typename T> class MailboxAwaiter {
       36                 :             :   public:
       37                 :          22 :     explicit MailboxAwaiter(CoroutinePromise& promise, // held by reference; mailbox is held as a raw pointer and dereferenced without a null check by every await_* method
       38                 :             :                             mailbox::MPSCActorMailbox<T>* mailbox) noexcept
       39                 :          22 :         : promise_(promise), mailbox_(mailbox) {}
       40                 :             : 
       41                 :             :     // Return true if message already available (don't suspend)
       42                 :          20 :     bool await_ready() const noexcept {
       43                 :             :         // Fast path: was_empty() == false means await_suspend() is skipped entirely
       44                 :          20 :         return !mailbox_->was_empty();
       45                 :             :     }
       46                 :             : 
       47                 :             :     // Called when suspending. Returns true to stay suspended, false to resume the coroutine immediately.
       48                 :          10 :     bool await_suspend(std::coroutine_handle<> continuation) noexcept {
       49                 :             :         // Check emptiness at this moment — a message may have arrived since
       50                 :             :         // await_ready(). If a message arrived while we were deciding, the
       51                 :             :         // sender already claimed the wakeup via CAS(true, false) on was_empty —
       52                 :             :         // don't suspend (returning false resumes the coroutine right away).
       53                 :          10 :         bool was_empty = mailbox_->was_empty();
       54                 :          10 :         if (!was_empty) {
       55                 :           1 :             return false;
       56                 :             :         }
       57                 :             : 
       58                 :             :         // Mailbox is still empty — safely reset edge-trigger so the next
       59                 :             :         // enqueue (after we suspend) can claim the wakeup.
       60                 :           9 :         mailbox_->set_was_empty(true); // NOTE(review): only reached when was_empty() already returned true; whether this write changes anything depends on MPSCActorMailbox internals not visible here
       61                 :             : 
       62                 :             :         // Transition: Running → Idle
       63                 :           9 :         uint32_t expected = ActorState::kRunning;
       64                 :           9 :         if (promise_.actor_state->cas(expected, ActorState::kIdle)) {
       65                 :           8 :             promise_.continuation = continuation; // NOTE(review): stored after the state is already kIdle — confirm no waker can read promise_.continuation before this store
       66                 :           8 :             return true; // successfully suspended
       67                 :             :         }
       68                 :             :         // State was not Running — actor may have already terminated
       69                 :           1 :         return false; // don't suspend; await_resume() then runs against a mailbox that may still be empty
       70                 :             :     }
       71                 :             : 
       72                 :             :     // Called on resumption, and also directly when await_ready() returned true or await_suspend() returned false
       73                 :          12 :     T await_resume() noexcept {
       74                 :             :         // Dequeue and return the message
       75                 :          12 :         auto* msg = mailbox_->dequeue();
       76                 :          12 :         if (msg) {
       77                 :             :             // Return by moving the Message out. NOTE(review): nothing here releases msg after the move — confirm dequeue()'s ownership contract
       78                 :          12 :             return std::move(*msg); // NOTE(review): std::move needs <utility>, which this header does not include directly
       79                 :             :         }
       80                 :             :         // dequeue() returned null: hand back a value-initialized T (this line has 0 hits in this coverage run)
       81                 :           0 :         return T{};
       82                 :             :     }
       83                 :             : 
       84                 :             :   private:
       85                 :             :     CoroutinePromise& promise_;
       86                 :             :     mailbox::MPSCActorMailbox<T>* mailbox_;
       87                 :             : };
      88                 :             : 
       89                 :             : // TimerAwaiter: awaitable for co_await scheduler.schedule_after(delay)
       90                 :             : // Wires to HybridScheduler::schedule_timer() for real timer integration. Always suspends (await_ready() is false); the timer callback calls scheduler_.notify_ready(actor_id_, priority_, INT64_MAX).
       91                 :             : class TimerAwaiter {
       92                 :             :   public:
       93                 :             :     TimerAwaiter(int64_t delay_ns, HybridScheduler& scheduler, ActorId actor_id, // delay_ns is presumably nanoseconds (from the name) — TODO confirm against schedule_timer()
       94                 :             :                  uint8_t priority = 0) noexcept
       95                 :             :         : scheduler_(scheduler), actor_id_(actor_id), delay_ns_(delay_ns),
       96                 :             :           priority_(priority) {}
       97                 :             : 
       98                 :             :     bool await_ready() const noexcept {
       99                 :             :         return false; // never ready: every co_await goes through await_suspend()
      100                 :             :     }
      101                 :             : 
      102                 :             :     bool await_suspend(std::coroutine_handle<> continuation) noexcept {
      103                 :             :         continuation_ = continuation; // stored, but never read anywhere in this class
      104                 :             : 
      105                 :             :         // Set promise to IOWaiting. NOTE(review): from_address() assumes the awaiting coroutine's promise type is CoroutinePromise; any other promise type is undefined behaviour
      106                 :             :         auto& promise = std::coroutine_handle<CoroutinePromise>::from_address(
      107                 :             :                             continuation.address())
      108                 :             :                             .promise();
      109                 :             :         promise.set_io_waiting();
      110                 :             : 
      111                 :             :         // Schedule timer — on expiry, actor is re-woken via notify_ready. The callback captures `this`, so this awaiter must still be alive when the timer fires — NOTE(review): confirm the coroutine frame cannot be destroyed first without await_cancel()
      112                 :             :         timer_id_ = scheduler_.schedule_timer(delay_ns_, [this] {
      113                 :             :             scheduler_.notify_ready(actor_id_, priority_, INT64_MAX); // meaning of the INT64_MAX argument is not visible here — TODO confirm
      114                 :             :         });
      115                 :             : 
      116                 :             :         return true; // always stay suspended
      117                 :             :     }
      118                 :             : 
      119                 :             :     void await_resume() noexcept {
      120                 :             :         // Timer fired; actor has been re-woken. Nothing to return.
      121                 :             :     }
      122                 :             : 
      123                 :             :     void await_cancel() noexcept { // not one of the functions co_await calls (await_ready/await_suspend/await_resume); has to be invoked explicitly
      124                 :             :         scheduler_.cancel_timer(TimerHandle{timer_id_}); // timer_id_ is still its initial 0 if await_suspend() never ran
      125                 :             :     }
      126                 :             : 
      127                 :             :   private:
      128                 :             :     HybridScheduler& scheduler_;
      129                 :             :     ActorId actor_id_;
      130                 :             :     int64_t delay_ns_;
      131                 :             :     uint8_t priority_;
      132                 :             :     uint64_t timer_id_{0};
      133                 :             :     std::coroutine_handle<> continuation_;
      134                 :             : };
     135                 :             : 
      136                 :             : // BlockingMailboxAwaiter: for blocking receive with stackful coroutines
      137                 :             : // T is the message type (e.g., Message<MessageVariant>). Unlike MailboxAwaiter, await_resume() here returns void and does not dequeue.
      138                 :             : template <typename T> class BlockingMailboxAwaiter {
      139                 :             :   public:
      140                 :             :     BlockingMailboxAwaiter(CoroutinePromise& promise,
      141                 :             :                            mailbox::MPSCActorMailbox<T>* mailbox,
      142                 :             :                            std::coroutine_handle<> continuation) noexcept // stored in continuation_, which is never read in this class
      143                 :             :         : promise_(promise), mailbox_(mailbox), continuation_(continuation) {}
      144                 :             : 
      145                 :             :     bool await_ready() const noexcept {
      146                 :             :         return !mailbox_->was_empty(); // skip suspension when the mailbox does not report empty
      147                 :             :     }
      148                 :             : 
      149                 :             :     bool await_suspend(std::coroutine_handle<> continuation) noexcept {
      150                 :             :         // Check emptiness at this moment — a message may have arrived since
      151                 :             :         // await_ready()
      152                 :             :         bool was_empty = mailbox_->was_empty();
      153                 :             :         if (!was_empty)
      154                 :             :             return false; // message arrived between await_ready() and here
      155                 :             : 
      156                 :             :         // Only reset edge-trigger if mailbox was empty at entry.
      157                 :             :         // If a message arrived while we were deciding, the sender already
      158                 :             :         // claimed the wakeup via CAS(true, false) on was_empty.
      159                 :             :         if (was_empty) { // always true here: the !was_empty case returned above
      160                 :             :             mailbox_->set_was_empty(true);
      161                 :             :         }
      162                 :             : 
      163                 :             :         promise_.continuation = continuation; // uses the await_suspend argument, not the continuation_ member
      164                 :             :         promise_.set_idle(); // NOTE(review): unconditional state change (MailboxAwaiter::await_suspend guards its transition with a CAS from kRunning) — confirm intended
      165                 :             :         return true;
      166                 :             :     }
      167                 :             : 
      168                 :             :     void await_resume() noexcept {
      169                 :             :         // Empty body: returns void and leaves the mailbox untouched. NOTE(review): previously documented as "Returns the message" — confirm where the dequeue happens
      170                 :             :     }
      171                 :             : 
      172                 :             :   private:
      173                 :             :     CoroutinePromise& promise_;
      174                 :             :     mailbox::MPSCActorMailbox<T>* mailbox_;
      175                 :             :     std::coroutine_handle<> continuation_;
      176                 :             : };
     177                 :             : 
     178                 :             : } // namespace hpactor::sched
     179                 :             : 
     180                 :             : #endif // HPACTOR_SUPPORT_COROUTINES
        

Generated by: LCOV version 2.0-1