Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : #pragma once
3 : :
4 : : #include <hpactor/core/actor_system.hpp>
5 : : #include <hpactor/net/async_io_fwd.hpp>
6 : : #include <hpactor/net/reactor_backend.hpp>
7 : : #include <hpactor/types/types.hpp>
8 : :
9 : : #include <functional>
10 : : #include <unordered_map>
11 : :
12 : : namespace hpactor {
13 : :
14 : : namespace net {
15 : :
16 : : // ProactorDispatcher - dispatches proactor (async I/O) completion events
17 : : // to the appropriate actor mailbox or timer system.
18 : : //
19 : : // In proactor mode, async operations complete asynchronously and the
20 : : // dispatcher routes the completion to the correct destination:
21 : : // - TimerFired events -> timer handler callback
22 : : // - I/O completions (Send/Recv/Accept/Connect/RecvFrom/SendTo) -> ActorSystem
23 : : class ProactorDispatcher {
24 : : public:
25 : : // Timer handler callback - called when a TimerFired completion arrives.
26 : : // The user_data from the completion identifies which timer expired.
27 : : using timer_handler = std::function<void(uint64_t user_data)>;
28 : :
29 : 77 : void set_timer_handler(timer_handler handler) {
30 : 77 : timer_handler_ = std::move(handler);
31 : 77 : }
32 : :
33 : 0 : void set_actor_system(ActorSystem* system) {
34 : 0 : system_ = system;
35 : 0 : }
36 : :
37 : : // Test-only: capture completions for verification.
38 : : // When set, delivery goes to this callback instead of ActorSystem.
39 : : using completion_callback = std::function<void(OpCompletion)>;
40 : 8 : void set_completion_callback(completion_callback cb) {
41 : 8 : completion_callback_ = std::move(cb);
42 : 8 : }
43 : :
44 : : // Track an active I/O operation for an fd.
45 : : // Used for optional bookkeeping (e.g., to check if an fd has
46 : : // an in-flight operation).
47 : 2 : void register_io(int fd, ActorId actor, OpType type) {
48 : 2 : active_ops_[fd] = {actor, type};
49 : 2 : }
50 : :
51 : : // Remove tracking for a completed or cancelled operation.
52 : 1 : void unregister_io(int fd) {
53 : 1 : active_ops_.erase(fd);
54 : 1 : }
55 : :
56 : : // Check if an fd has a tracked operation.
57 : 4 : bool has_active_io(int fd) const {
58 : 4 : return active_ops_.find(fd) != active_ops_.end();
59 : : }
60 : :
61 : : // Called by the proactor backend when an async operation completes.
62 : : // Routes the completion to the appropriate destination.
63 : 48 : void on_completion(OpCompletion completion) {
64 : 48 : switch (completion.type) {
65 : 36 : case OpType::TimerFired:
66 : 36 : if (timer_handler_) {
67 : 35 : timer_handler_(completion.user_data);
68 : : }
69 : 36 : break;
70 : :
71 : 12 : case OpType::Send:
72 : : case OpType::Recv:
73 : : case OpType::Accept:
74 : : case OpType::Connect:
75 : : case OpType::RecvFrom:
76 : : case OpType::SendTo:
77 : 12 : deliver_to_actor(completion);
78 : 12 : break;
79 : : }
80 : 48 : }
81 : :
82 : : private:
83 : 12 : void deliver_to_actor(const OpCompletion& completion) {
84 : 12 : active_ops_.erase(completion.fd);
85 : 12 : if (completion_callback_) {
86 : 7 : completion_callback_(completion);
87 : 5 : } else if (system_) {
88 : 0 : system_->enqueue_completion(completion);
89 : : }
90 : 12 : }
91 : :
92 : : struct ActiveOp {
93 : : ActorId actor;
94 : : OpType type;
95 : : };
96 : :
97 : : ActorSystem* system_ = nullptr;
98 : : timer_handler timer_handler_;
99 : : completion_callback completion_callback_;
100 : : std::unordered_map<int, ActiveOp> active_ops_;
101 : : };
102 : :
103 : : } // namespace net
104 : : } // namespace hpactor
|