LCOV - code coverage report
Current view: top level - include/hpactor/sched - work_queue.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 96.5 % 86 83
Test Date: 2026-05-20 02:24:49 Functions: 96.2 % 26 25
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <algorithm>
      18                 :             : #include <atomic>
      19                 :             : #include <cstddef>
      20                 :             : #include <cstdint>
      21                 :             : #include <vector>
      22                 :             : 
      23                 :             : #include <hpactor/types/types.hpp>
      24                 :             : 
      25                 :             : namespace hpactor::sched {
      26                 :             : 
      27                 :             : struct WorkItem {
      28                 :             :     ActorId actor;
      29                 :             :     int64_t deadline_ns;
      30                 :             :     uint64_t sequence;
      31                 :             : };
      32                 :             : 
      33                 :             : template <typename T> class ChaselevDeque {
      34                 :             :   public:
      35                 :             :     explicit ChaselevDeque(size_t initial_capacity = 256);
      36                 :             :     ~ChaselevDeque();
      37                 :             : 
      38                 :             :     ChaselevDeque(const ChaselevDeque&) = delete;
      39                 :             :     ChaselevDeque& operator=(const ChaselevDeque&) = delete;
      40                 :             :     ChaselevDeque(ChaselevDeque&&) = delete;
      41                 :             :     ChaselevDeque& operator=(ChaselevDeque&&) = delete;
      42                 :             : 
      43                 :             :     void push_bottom(T item);
      44                 :             :     bool pop_bottom(T& out);
      45                 :             :     bool steal_top(T& out);
      46                 :             :     size_t size_approx() const;
      47                 :             : 
      48                 :             :   private:
      49                 :             :     struct CircularArray {
      50                 :             :         std::vector<std::atomic<T>> buf;
      51                 :             :         size_t mask;
      52                 :             :         explicit CircularArray(size_t cap);
      53                 :             : 
      54                 :        1871 :         T get(int64_t i) const {
      55                 :        1871 :             return buf[static_cast<size_t>(i) & mask].load(std::memory_order_relaxed);
      56                 :             :         }
      57                 :        1872 :         void put(int64_t i, T v) {
      58                 :        1872 :             buf[static_cast<size_t>(i) & mask].store(v, std::memory_order_relaxed);
      59                 :        1872 :         }
      60                 :             :         CircularArray* grow(int64_t bottom, int64_t top) const;
      61                 :             :     };
      62                 :             : 
      63                 :             :     std::atomic<int64_t> top_{0};
      64                 :             :     std::atomic<int64_t> bottom_{0};
      65                 :             :     std::atomic<CircularArray*> array_;
      66                 :             :     std::vector<CircularArray*> garbage_;
      67                 :             : };
      68                 :             : 
      69                 :             : template <typename T>
      70                 :         457 : ChaselevDeque<T>::CircularArray::CircularArray(size_t cap)
      71                 :         914 :     : buf(cap), mask(cap - 1) {
      72                 :      116421 :     for (auto& slot : buf) {
      73                 :      115964 :         slot.store(T{}, std::memory_order_relaxed);
      74                 :             :     }
      75                 :         457 : }
      76                 :             : 
      77                 :             : template <typename T>
      78                 :             : typename ChaselevDeque<T>::CircularArray*
      79                 :           6 : ChaselevDeque<T>::CircularArray::grow(int64_t bottom, int64_t top) const {
      80                 :           6 :     size_t new_cap = buf.size() * 2;
      81                 :           6 :     auto* new_arr = new CircularArray(new_cap);
      82                 :         386 :     for (int64_t i = top; i < bottom; ++i) {
      83                 :         380 :         new_arr->put(i, get(i));
      84                 :             :     }
      85                 :           6 :     return new_arr;
      86                 :             : }
      87                 :             : 
      88                 :             : template <typename T>
      89                 :         451 : ChaselevDeque<T>::ChaselevDeque(size_t initial_capacity)
      90                 :         451 :     : array_(new CircularArray(initial_capacity)) {}
      91                 :             : 
      92                 :         451 : template <typename T> ChaselevDeque<T>::~ChaselevDeque() {
      93                 :         451 :     delete array_.load(std::memory_order_acquire);
      94                 :         457 :     for (auto* arr : garbage_) {
      95                 :           6 :         delete arr;
      96                 :             :     }
      97                 :         451 : }
      98                 :             : 
      99                 :        1492 : template <typename T> void ChaselevDeque<T>::push_bottom(T item) {
     100                 :        1492 :     int64_t b = bottom_.load(std::memory_order_relaxed);
     101                 :        1492 :     int64_t t = top_.load(std::memory_order_acquire);
     102                 :        1492 :     auto* arr = array_.load(std::memory_order_acquire);
     103                 :             : 
     104                 :        1492 :     if (b - t > static_cast<int64_t>(arr->mask)) {
     105                 :           6 :         auto* new_arr = arr->grow(b, t);
     106                 :           6 :         garbage_.push_back(arr);
     107                 :           6 :         array_.store(new_arr, std::memory_order_release);
     108                 :           6 :         arr = new_arr;
     109                 :             :     }
     110                 :             : 
     111                 :        1492 :     arr->put(b, std::move(item));
     112                 :        1492 :     bottom_.store(b + 1, std::memory_order_release);
     113                 :        1492 : }
     114                 :             : 
     115                 :      102798 : template <typename T> bool ChaselevDeque<T>::pop_bottom(T& out) {
     116                 :      102798 :     int64_t b = bottom_.fetch_sub(1, std::memory_order_acq_rel) - 1;
     117                 :      102798 :     int64_t t = top_.load(std::memory_order_acquire);
     118                 :             : 
     119                 :      102798 :     auto* arr = array_.load(std::memory_order_acquire);
     120                 :      102798 :     if (b - t < 0) {
     121                 :      102615 :         bottom_.store(b + 1, std::memory_order_release);
     122                 :      102615 :         return false;
     123                 :             :     }
     124                 :             : 
     125                 :         183 :     out = arr->get(b);
     126                 :             : 
     127                 :         183 :     if (b == t) {
     128                 :          42 :         if (!top_.compare_exchange_strong(t, t + 1, std::memory_order_acq_rel,
     129                 :             :                                           std::memory_order_acquire)) {
     130                 :           0 :             bottom_.store(b + 1, std::memory_order_release);
     131                 :           0 :             return false;
     132                 :             :         }
     133                 :             :         // CAS succeeded: we removed the last item (at b == t). Update bottom_
     134                 :             :         // to keep the deque consistent. If we don't, bottom_ < top_ causes push
     135                 :             :         // to overwrite the stolen slot when it sees b < t and thinks the deque
     136                 :             :         // is empty.
     137                 :          21 :         bottom_.store(b + 1, std::memory_order_release);
     138                 :          21 :         return true;
     139                 :             :     }
     140                 :             : 
     141                 :         162 :     return true;
     142                 :             : }
     143                 :             : 
     144                 :      254878 : template <typename T> bool ChaselevDeque<T>::steal_top(T& out) {
     145                 :      254878 :     int64_t t = top_.load(std::memory_order_acquire);
     146                 :      254878 :     int64_t b = bottom_.load(std::memory_order_acquire);
     147                 :             : 
     148                 :      254878 :     if (b - t <= 0) {
     149                 :      253570 :         return false;
     150                 :             :     }
     151                 :             : 
     152                 :        1308 :     auto* arr = array_.load(std::memory_order_acquire);
     153                 :        1308 :     out = arr->get(t);
     154                 :             : 
     155                 :        2616 :     if (!top_.compare_exchange_strong(t, t + 1, std::memory_order_acq_rel,
     156                 :             :                                       std::memory_order_acquire)) {
     157                 :           0 :         return false;
     158                 :             :     }
     159                 :             : 
     160                 :        1308 :     return true;
     161                 :             : }
     162                 :             : 
     163                 :          17 : template <typename T> size_t ChaselevDeque<T>::size_approx() const {
     164                 :          17 :     int64_t b = bottom_.load(std::memory_order_relaxed);
     165                 :          17 :     int64_t t = top_.load(std::memory_order_relaxed);
     166                 :          17 :     return static_cast<size_t>(b >= t ? b - t : 0);
     167                 :             : }
     168                 :             : 
     169                 :             : class MultiPriorityWorkQueue {
     170                 :             :   public:
     171                 :          12 :     explicit MultiPriorityWorkQueue(uint32_t priority_levels = 4)
     172                 :          24 :         : levels_(priority_levels) {}
     173                 :             : 
     174                 :           8 :     void push(uint8_t priority, WorkItem item) {
     175                 :           8 :         levels_[priority].push_bottom(item);
     176                 :           8 :     }
     177                 :             : 
     178                 :           8 :     bool pop(WorkItem& out) {
     179                 :          22 :         for (uint32_t i = 0; i < levels_.size(); ++i) {
     180                 :          20 :             if (levels_[i].pop_bottom(out)) {
     181                 :           6 :                 return true;
     182                 :             :             }
     183                 :             :         }
     184                 :           2 :         return false;
     185                 :             :     }
     186                 :             : 
     187                 :             :     // Steal from the highest priority queue first
     188                 :           5 :     bool steal(WorkItem& out) {
     189                 :          21 :         for (uint32_t i = 0; i < levels_.size(); ++i) {
     190                 :          17 :             if (levels_[i].steal_top(out)) {
     191                 :           1 :                 return true;
     192                 :             :             }
     193                 :             :         }
     194                 :           4 :         return false;
     195                 :             :     }
     196                 :             : 
     197                 :           3 :     size_t depth_approx() const {
     198                 :           3 :         size_t total = 0;
     199                 :          15 :         for (const auto& level : levels_) {
     200                 :          12 :             total += level.size_approx();
     201                 :             :         }
     202                 :           3 :         return total;
     203                 :             :     }
     204                 :             : 
     205                 :           6 :     uint32_t num_levels() const {
     206                 :           6 :         return static_cast<uint32_t>(levels_.size());
     207                 :             :     }
     208                 :             : 
     209                 :             :   private:
     210                 :             :     std::vector<ChaselevDeque<WorkItem>> levels_;
     211                 :             : };
     212                 :             : 
     213                 :             : } // namespace hpactor::sched
        

Generated by: LCOV version 2.0-1