Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : // Licensed under the Apache License, Version 2.0
3 : : #include <hpactor/sched/calendar_queue.hpp>
4 : :
5 : : #include <algorithm>
6 : : #include <cstdlib>
7 : :
8 : : namespace hpactor::sched {
9 : :
10 : 1030 : void CalendarQueue::BucketList::push_back(Timer* t) {
11 : 1030 : t->next = nullptr;
12 : 1030 : t->prev = tail;
13 : 1030 : if (tail) {
14 : 1008 : tail->next = t;
15 : : } else {
16 : 22 : head = t;
17 : : }
18 : 1030 : tail = t;
19 : 1030 : count++;
20 : 1030 : }
21 : :
22 : 1029 : void CalendarQueue::BucketList::unlink(Timer* t) {
23 : 1029 : if (t->prev) {
24 : 0 : t->prev->next = t->next;
25 : : } else {
26 : 1029 : head = t->next;
27 : : }
28 : 1029 : if (t->next) {
29 : 1008 : t->next->prev = t->prev;
30 : : } else {
31 : 21 : tail = t->prev;
32 : : }
33 : 1029 : t->next = nullptr;
34 : 1029 : t->prev = nullptr;
35 : 1029 : count--;
36 : 1029 : }
37 : :
38 : 14 : CalendarQueue::CalendarQueue(const CalendarQueueConfig& cfg)
39 : 14 : : fine_bucket_ns_(cfg.fine_bucket_ns)
40 : 14 : , coarse_bucket_ns_(cfg.fine_bucket_ns * cfg.fine_buckets)
41 : 14 : , remote_bucket_ns_(coarse_bucket_ns_ * cfg.coarse_buckets)
42 : 14 : , max_advance_buckets_(cfg.max_advance_buckets)
43 : : {
44 : 14 : if ((cfg.fine_buckets & (cfg.fine_buckets - 1)) != 0 ||
45 : 14 : (cfg.coarse_buckets & (cfg.coarse_buckets - 1)) != 0 ||
46 : 14 : (cfg.remote_buckets & (cfg.remote_buckets - 1)) != 0) {
47 : 0 : std::abort();
48 : : }
49 : 14 : fine_mask_ = cfg.fine_buckets - 1;
50 : 14 : coarse_mask_ = cfg.coarse_buckets - 1;
51 : 14 : remote_mask_ = cfg.remote_buckets - 1;
52 : 14 : fine_wheel_.resize(cfg.fine_buckets);
53 : 14 : coarse_wheel_.resize(cfg.coarse_buckets);
54 : 14 : remote_wheel_.resize(cfg.remote_buckets);
55 : 14 : }
56 : :
57 : 14 : CalendarQueue::~CalendarQueue() {
58 : 15 : for (auto& [id, timer] : timer_map_) {
59 : 1 : delete timer;
60 : : }
61 : 14 : }
62 : :
63 : 1023 : uint64_t CalendarQueue::schedule(int64_t delay_ns, TimerCallback cb) {
64 : 1023 : int64_t now = last_advance_ns_;
65 : 1023 : int64_t expire_ns = now + delay_ns;
66 : 1023 : if (delay_ns <= 0) {
67 : 1 : expire_ns = now + fine_bucket_ns_;
68 : : }
69 : 1023 : return schedule_at(expire_ns, std::move(cb));
70 : : }
71 : :
72 : 1023 : uint64_t CalendarQueue::schedule_at(int64_t expire_ns, TimerCallback cb) {
73 : 1023 : std::lock_guard<std::recursive_mutex> lock(mutex_);
74 : 1023 : auto* timer = new Timer;
75 : 1023 : timer->expire_ns = expire_ns;
76 : 1023 : timer->id = next_id_.fetch_add(1, std::memory_order_relaxed);
77 : 1023 : timer->callback = std::move(cb);
78 : 1023 : timer_map_[timer->id] = timer;
79 : 1023 : int64_t now = last_advance_ns_;
80 : 1023 : insert_timer(timer, now);
81 : 2046 : return timer->id;
82 : 1023 : }
83 : :
84 : 1030 : void CalendarQueue::insert_timer(Timer* timer, int64_t now_ns) {
85 : 1030 : int64_t expire = std::max(timer->expire_ns, now_ns + fine_bucket_ns_);
86 : :
87 : 1030 : if (expire < now_ns + fine_bucket_ns_ * static_cast<int64_t>(fine_wheel_.size())) {
88 : 1023 : uint32_t b = static_cast<uint32_t>(expire / fine_bucket_ns_) & fine_mask_;
89 : 1023 : timer->bucket_idx = b;
90 : 1023 : timer->wheel_level = 0;
91 : 1023 : fine_wheel_[b].push_back(timer);
92 : 7 : } else if (expire < now_ns + coarse_bucket_ns_ * static_cast<int64_t>(coarse_wheel_.size())) {
93 : 6 : uint32_t b = static_cast<uint32_t>(expire / coarse_bucket_ns_) & coarse_mask_;
94 : 6 : timer->bucket_idx = b;
95 : 6 : timer->wheel_level = 1;
96 : 6 : coarse_wheel_[b].push_back(timer);
97 : : } else {
98 : 1 : uint32_t b = static_cast<uint32_t>(expire / remote_bucket_ns_) & remote_mask_;
99 : 1 : timer->bucket_idx = b;
100 : 1 : timer->wheel_level = 2;
101 : 1 : remote_wheel_[b].push_back(timer);
102 : : }
103 : 1030 : }
104 : :
105 : 6 : bool CalendarQueue::cancel(uint64_t timer_id) {
106 : 6 : std::lock_guard<std::recursive_mutex> lock(mutex_);
107 : 6 : auto it = timer_map_.find(timer_id);
108 : 6 : if (it == timer_map_.end()) return false;
109 : 3 : Timer* timer = it->second;
110 : 3 : timer_map_.erase(it);
111 : 3 : switch (timer->wheel_level) {
112 : 3 : case 0: fine_wheel_[timer->bucket_idx].unlink(timer); break;
113 : 0 : case 1: coarse_wheel_[timer->bucket_idx].unlink(timer); break;
114 : 0 : case 2: remote_wheel_[timer->bucket_idx].unlink(timer); break;
115 : 0 : default: break;
116 : : }
117 : 3 : delete timer;
118 : 3 : return true;
119 : 6 : }
120 : :
121 : 12 : void CalendarQueue::cascade_coarse(int64_t now_ns) {
122 : 12 : auto& bucket = coarse_wheel_[current_coarse_];
123 : 12 : Timer* t = bucket.head;
124 : 18 : while (t) {
125 : 6 : Timer* next = t->next;
126 : 6 : bucket.unlink(t);
127 : 6 : insert_timer(t, now_ns);
128 : 6 : t = next;
129 : : }
130 : 12 : }
131 : :
132 : 2 : void CalendarQueue::cascade_remote(int64_t now_ns) {
133 : 2 : auto& bucket = remote_wheel_[current_remote_];
134 : 2 : Timer* t = bucket.head;
135 : 3 : while (t) {
136 : 1 : Timer* next = t->next;
137 : 1 : bucket.unlink(t);
138 : 1 : insert_timer(t, now_ns);
139 : 1 : t = next;
140 : : }
141 : 2 : }
142 : :
143 : 100 : uint32_t CalendarQueue::advance(int64_t now_ns) {
144 : 100 : std::lock_guard<std::recursive_mutex> lock(mutex_);
145 : 100 : if (now_ns <= last_advance_ns_) return 0;
146 : 99 : if (last_advance_ns_ == 0) {
147 : 12 : last_advance_ns_ = now_ns;
148 : 12 : return 0;
149 : : }
150 : :
151 : 87 : uint32_t fired = 0;
152 : 87 : uint32_t buckets_processed = 0;
153 : :
154 : 183 : while (last_advance_ns_ + fine_bucket_ns_ <= now_ns &&
155 : 96 : buckets_processed < max_advance_buckets_) {
156 : :
157 : 96 : auto& bucket = fine_wheel_[current_fine_];
158 : 96 : Timer* t = bucket.head;
159 : 1115 : while (t) {
160 : 1019 : Timer* next = t->next;
161 : 1019 : bucket.unlink(t);
162 : 1019 : timer_map_.erase(t->id);
163 : 1019 : if (t->expire_ns <= now_ns) {
164 : 1019 : t->callback();
165 : 1019 : fired++;
166 : : }
167 : 1019 : delete t;
168 : 1019 : t = next;
169 : : }
170 : :
171 : 96 : last_advance_ns_ += fine_bucket_ns_;
172 : 96 : current_fine_ = (current_fine_ + 1) & fine_mask_;
173 : 96 : buckets_processed++;
174 : :
175 : 96 : if (current_fine_ == 0) {
176 : 12 : cascade_coarse(now_ns);
177 : 12 : current_coarse_ = (current_coarse_ + 1) & coarse_mask_;
178 : 12 : if (current_coarse_ == 0) {
179 : 2 : cascade_remote(now_ns);
180 : 2 : current_remote_ = (current_remote_ + 1) & remote_mask_;
181 : : }
182 : : }
183 : : }
184 : 87 : return fired;
185 : 100 : }
186 : :
187 : 3 : bool CalendarQueue::empty() const {
188 : 3 : std::lock_guard<std::recursive_mutex> lock(mutex_);
189 : 3 : return timer_map_.empty();
190 : 3 : }
191 : :
192 : : } // namespace hpactor::sched
|