Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/mem/std_allocator.hpp>
18 : :
19 : : #include <atomic>
20 : : #include <cstdint>
21 : : #include <functional>
22 : : #include <mutex>
23 : : #include <unordered_map>
24 : : #include <vector>
25 : :
26 : : namespace hpactor::sched {
27 : :
28 : : struct CalendarQueueConfig {
29 : : int64_t fine_bucket_ns = 1'000'000; // 1ms
30 : : uint32_t fine_buckets = 256; // power of 2
31 : : uint32_t coarse_buckets = 256; // power of 2
32 : : uint32_t remote_buckets = 256; // power of 2
33 : : uint32_t max_advance_buckets = 4096; // ~4s cap per advance()
34 : : };
35 : :
36 : : class CalendarQueue {
37 : : public:
38 : : using TimerCallback = std::function<void()>;
39 : :
40 : : explicit CalendarQueue(const CalendarQueueConfig& cfg = {});
41 : : ~CalendarQueue();
42 : :
43 : : CalendarQueue(const CalendarQueue&) = delete;
44 : : CalendarQueue& operator=(const CalendarQueue&) = delete;
45 : : CalendarQueue(CalendarQueue&&) = delete;
46 : : CalendarQueue& operator=(CalendarQueue&&) = delete;
47 : :
48 : : [[nodiscard]] uint64_t schedule(int64_t delay_ns, TimerCallback cb);
49 : : [[nodiscard]] uint64_t schedule_at(int64_t expire_ns, TimerCallback cb);
50 : : bool cancel(uint64_t timer_id);
51 : : uint32_t advance(int64_t now_ns);
52 : :
53 : : bool empty() const;
54 : 11 : size_t size() const { return timer_map_.size(); }
55 : :
56 : : private:
57 : : struct Timer : mem::SlabAllocated<Timer> {
58 : : int64_t expire_ns;
59 : : uint64_t id;
60 : : TimerCallback callback;
61 : : Timer* next = nullptr;
62 : : Timer* prev = nullptr;
63 : : uint32_t bucket_idx = 0;
64 : : uint8_t wheel_level = 0; // 0=fine, 1=coarse, 2=remote
65 : : };
66 : :
67 : : struct BucketList {
68 : : Timer* head = nullptr;
69 : : Timer* tail = nullptr;
70 : : uint32_t count = 0;
71 : :
72 : : void push_back(Timer* t);
73 : : void unlink(Timer* t);
74 : : };
75 : :
76 : : // Re-insert a timer at the appropriate wheel level given current time
77 : : void insert_timer(Timer* timer, int64_t now_ns);
78 : :
79 : : // Cascade one bucket from coarse wheel into fine wheel
80 : : void cascade_coarse(int64_t now_ns);
81 : :
82 : : // Cascade one bucket from remote wheel into coarse wheel
83 : : void cascade_remote(int64_t now_ns);
84 : :
85 : : std::vector<BucketList> fine_wheel_;
86 : : std::vector<BucketList> coarse_wheel_;
87 : : std::vector<BucketList> remote_wheel_;
88 : : std::unordered_map<uint64_t, Timer*> timer_map_;
89 : :
90 : : int64_t fine_bucket_ns_;
91 : : int64_t coarse_bucket_ns_;
92 : : int64_t remote_bucket_ns_;
93 : : uint32_t fine_mask_;
94 : : uint32_t coarse_mask_;
95 : : uint32_t remote_mask_;
96 : : uint32_t max_advance_buckets_;
97 : :
98 : : uint32_t current_fine_ = 0;
99 : : uint32_t current_coarse_ = 0;
100 : : uint32_t current_remote_ = 0;
101 : : int64_t last_advance_ns_ = 0;
102 : :
103 : : std::atomic<uint64_t> next_id_{1};
104 : : mutable std::recursive_mutex mutex_;
105 : : };
106 : :
107 : : } // namespace hpactor::sched
|