LCOV - code coverage report
Current view: top level - src/sched - dedicated_thread_pool.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 86.3 % 51 44
Test Date: 2026-05-20 02:24:49 Functions: 85.7 % 7 6
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/sched/dedicated_thread_pool.hpp>
      16                 :             : 
      17                 :             : #include <thread>
      18                 :             : 
      19                 :             : namespace hpactor::sched {
      20                 :             : 
      21                 :           5 : DedicatedThreadPool::DedicatedThreadPool(uint32_t num_threads)
      22                 :           5 :     : num_threads_(num_threads == 0 ? 1 : num_threads) {
      23                 :          17 :     for (uint32_t i = 0; i < num_threads_; ++i) {
      24                 :          12 :         workers_.push_back(std::make_unique<WorkerState>());
      25                 :             :     }
      26                 :           5 : }
      27                 :             : 
      28                 :           5 : DedicatedThreadPool::~DedicatedThreadPool() {
      29                 :           5 :     stop();
      30                 :           5 : }
      31                 :             : 
      32                 :           5 : void DedicatedThreadPool::start() {
      33                 :           5 :     if (running_.load(std::memory_order_acquire)) return;
      34                 :           5 :     running_.store(true, std::memory_order_release);
      35                 :             : 
      36                 :          17 :     for (uint32_t i = 0; i < num_threads_; ++i) {
      37                 :          12 :         threads_.emplace_back(&DedicatedThreadPool::worker_loop, this, i);
      38                 :             :     }
      39                 :             : }
      40                 :             : 
      41                 :           8 : void DedicatedThreadPool::stop() {
      42                 :           8 :     if (!running_.load(std::memory_order_acquire)) return;
      43                 :           5 :     running_.store(false, std::memory_order_release);
      44                 :             : 
      45                 :             :     // Wake all workers by clearing their queues
      46                 :          17 :     for (auto& worker : workers_) {
      47                 :          12 :         std::lock_guard<std::mutex> lock(worker->mutex);
      48                 :          12 :         worker->queue.clear();
      49                 :          12 :     }
      50                 :             : 
      51                 :          17 :     for (auto& t : threads_) {
      52                 :          12 :         if (t.joinable()) t.join();
      53                 :             :     }
      54                 :           5 :     threads_.clear();
      55                 :             : }
      56                 :             : 
      57                 :          24 : void DedicatedThreadPool::enqueue(ActorId /*actor*/, WorkFn work) {
      58                 :          24 :     if (!work) return;
      59                 :             : 
      60                 :             :     // Round-robin distribution
      61                 :          24 :     uint32_t idx = next_worker_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
      62                 :          24 :     auto& worker = workers_[idx];
      63                 :             :     {
      64                 :          24 :         std::lock_guard<std::mutex> lock(worker->mutex);
      65                 :          24 :         worker->queue.push_back(std::move(work));
      66                 :          24 :     }
      67                 :             : }
      68                 :             : 
      69                 :           0 : size_t DedicatedThreadPool::pending() const {
      70                 :           0 :     size_t total = 0;
      71                 :           0 :     for (auto& worker : workers_) {
      72                 :           0 :         std::lock_guard<std::mutex> lock(worker->mutex);
      73                 :           0 :         total += worker->queue.size();
      74                 :           0 :     }
      75                 :           0 :     return total;
      76                 :             : }
      77                 :             : 
      78                 :          12 : void DedicatedThreadPool::worker_loop(uint32_t worker_id) {
      79                 :          12 :     auto& worker = workers_[worker_id];
      80                 :      155037 :     while (running_.load(std::memory_order_acquire)) {
      81                 :      155025 :         WorkFn work;
      82                 :             :         {
      83                 :      155025 :             std::lock_guard<std::mutex> lock(worker->mutex);
      84                 :      155025 :             if (!worker->queue.empty()) {
      85                 :          24 :                 work = std::move(worker->queue.front());
      86                 :          24 :                 worker->queue.erase(worker->queue.begin());
      87                 :             :             }
      88                 :      155025 :         }
      89                 :      155025 :         if (work) {
      90                 :          24 :             work();
      91                 :             :         } else {
      92                 :      155001 :             std::this_thread::yield();
      93                 :             :         }
      94                 :      155025 :     }
      95                 :          12 : }
      96                 :             : 
      97                 :             : } // namespace hpactor::sched
        

Generated by: LCOV version 2.0-1