Branch data Line data Source code
1 : : #include <hpactor/actor/typed_message.hpp>
2 : : #include <hpactor/actor_context.hpp>
3 : : #include <hpactor/tracing/json_exporter.hpp>
4 : : #include <hpactor/tracing/memory_exporter.hpp>
5 : : #include <hpactor/tracing/otlp_exporter.hpp>
6 : : #include <hpactor/tracing/trace_manager.hpp>
7 : :
8 : : #include <chrono>
9 : :
10 : : namespace hpactor::tracing {
11 : :
12 : 5 : TraceManager::TraceManager(TraceConfig config, ActorSystem* system,
13 : 5 : std::unique_ptr<SpanExporter> exporter)
14 : 10 : : config_(std::move(config)), system_(system), sampler_(make_sampler()),
15 : 10 : exporter_(std::move(exporter)) {
16 : 5 : if (!exporter_) {
17 : 2 : switch (config_.exporter) {
18 : 2 : case TraceExporterKind::kNoop:
19 : : case TraceExporterKind::kMemory:
20 : 2 : exporter_ = std::make_unique<MemoryExporter>();
21 : 2 : break;
22 : 0 : case TraceExporterKind::kJsonFile:
23 : : exporter_ =
24 : 0 : std::make_unique<JsonFileExporter>(config_.json_file_path);
25 : 0 : break;
26 : 0 : case TraceExporterKind::kOtlpHttp:
27 : : exporter_ =
28 : 0 : std::make_unique<OtlpHttpExporter>(config_.otlp_endpoint);
29 : 0 : break;
30 : : }
31 : : }
32 : 5 : }
33 : :
34 : 5 : TraceManager::~TraceManager() {
35 : 5 : stop();
36 : 5 : }
37 : :
38 : 5 : std::unique_ptr<Sampler> TraceManager::make_sampler() const {
39 : 5 : switch (config_.sampler) {
40 : 0 : case SamplerKind::kAlwaysOff:
41 : 0 : return std::make_unique<AlwaysOffSampler>();
42 : 4 : case SamplerKind::kAlwaysOn:
43 : 4 : return std::make_unique<AlwaysOnSampler>();
44 : 0 : case SamplerKind::kTraceIdRatio:
45 : 0 : return std::make_unique<TraceIdRatioSampler>(config_.sample_ratio);
46 : 1 : case SamplerKind::kParentBasedTraceIdRatio:
47 : 1 : return std::make_unique<ParentBasedSampler>(config_.sample_ratio);
48 : : }
49 : 0 : return std::make_unique<AlwaysOffSampler>();
50 : : }
51 : :
52 : 4 : uint64_t TraceManager::now_ns() noexcept {
53 : 4 : auto now = std::chrono::steady_clock::now().time_since_epoch();
54 : : return static_cast<uint64_t>(
55 : 4 : std::chrono::duration_cast<std::chrono::nanoseconds>(now).count());
56 : : }
57 : :
58 : 4 : void TraceManager::start() {
59 : 4 : if (!config_.enabled || running_.exchange(true)) {
60 : 0 : return;
61 : : }
62 : 8 : drain_thread_ = std::thread([this]() {
63 : 7 : while (running_.load(std::memory_order_acquire)) {
64 : 3 : std::this_thread::sleep_for(config_.export_interval);
65 : 3 : drain_once();
66 : : }
67 : 4 : });
68 : : }
69 : :
70 : 9 : void TraceManager::stop() {
71 : 9 : if (!running_.exchange(false)) {
72 : 5 : return;
73 : : }
74 : 4 : if (drain_thread_.joinable()) {
75 : 4 : drain_thread_.join();
76 : : }
77 : 4 : drain_once();
78 : 4 : if (exporter_) {
79 : 4 : exporter_->shutdown();
80 : : }
81 : : }
82 : :
83 : 2 : void TraceManager::force_flush() {
84 : 2 : drain_once();
85 : 2 : }
86 : :
87 : 3 : TraceContext TraceManager::create_root_context(std::string_view /*operation*/) {
88 : 3 : TraceContext ctx;
89 : 3 : ctx.trace_id = ids_.next_trace_id();
90 : 3 : ctx.span_id = ids_.next_span_id();
91 : 3 : SamplingParameters params;
92 : 3 : params.trace_id = ctx.trace_id;
93 : 3 : auto decision = sampler_->should_sample(params);
94 : 3 : ctx.flags.set_sampled(decision.sampled);
95 : 3 : return ctx;
96 : : }
97 : :
98 : 1 : TraceContext TraceManager::child_context(const TraceContext& parent) {
99 : 1 : TraceContext child = parent;
100 : 1 : child.span_id = ids_.next_span_id();
101 : 1 : return child;
102 : : }
103 : :
104 : 3 : SpanHandle TraceManager::start_span(const SpanStart& start) {
105 : 3 : SpanHandle handle;
106 : 3 : if (!config_.enabled) {
107 : 1 : return handle;
108 : : }
109 : :
110 : 2 : TraceContext ctx;
111 : 2 : if (start.has_parent && start.parent.valid()) {
112 : 1 : ctx = child_context(start.parent);
113 : 1 : SamplingParameters params;
114 : 1 : params.trace_id = ctx.trace_id;
115 : 1 : params.has_parent = true;
116 : 1 : params.parent_sampled = start.parent.sampled();
117 : 1 : ctx.flags.set_sampled(sampler_->should_sample(params).sampled);
118 : 1 : handle.parent_span_id = start.parent.span_id;
119 : : } else {
120 : 1 : ctx = create_root_context(start.name);
121 : : }
122 : :
123 : 2 : handle.context = ctx;
124 : 2 : handle.start_ns = now_ns();
125 : 2 : handle.kind = start.kind;
126 : 2 : handle.actor_id = start.actor_id;
127 : 2 : handle.sender_actor_id = start.sender_actor_id;
128 : 2 : handle.type_tag = start.type_tag;
129 : 2 : handle.message_id = start.message_id;
130 : 2 : handle.payload_size = start.payload_size;
131 : 2 : handle.recording = ctx.sampled();
132 : 2 : return handle;
133 : : }
134 : :
135 : 2 : void TraceManager::finish_span(SpanHandle& span, SpanStatus status) noexcept {
136 : 2 : if (!span.recording) {
137 : 0 : return;
138 : : }
139 : 2 : SpanRecord record;
140 : 2 : record.trace_id = span.context.trace_id;
141 : 2 : record.span_id = span.context.span_id;
142 : 2 : record.parent_span_id = span.parent_span_id;
143 : 2 : record.actor_id = span.actor_id;
144 : 2 : record.sender_actor_id = span.sender_actor_id;
145 : 2 : record.type_tag = static_cast<uint32_t>(span.type_tag);
146 : 2 : record.message_id = span.message_id.value();
147 : 2 : record.start_ns = span.start_ns;
148 : 2 : record.end_ns = now_ns();
149 : 2 : record.payload_size = span.payload_size;
150 : 2 : record.kind = span.kind;
151 : 2 : record.status = status;
152 : 2 : if (!ring_.try_push(record)) {
153 : 0 : spans_dropped_.fetch_add(1, std::memory_order_relaxed);
154 : : }
155 : 2 : span.recording = false;
156 : : }
157 : :
158 : 1 : void TraceManager::inject_message_context(TypedMessage& msg, const ActorContext* ctx,
159 : : bool allow_root) {
160 : 1 : if (!config_.enabled || msg.has_trace_context()) {
161 : 0 : return;
162 : : }
163 : 1 : if (ctx != nullptr && ctx->has_current_trace_context()) {
164 : 0 : msg.set_trace_context(ctx->current_trace_context());
165 : 0 : return;
166 : : }
167 : 1 : if (allow_root) {
168 : 1 : msg.set_trace_context(create_root_context("hpactor.message"));
169 : : }
170 : : }
171 : :
172 : 9 : void TraceManager::drain_once() {
173 : 9 : if (!exporter_) {
174 : 0 : return;
175 : : }
176 : 9 : std::vector<SpanRecord> batch;
177 : 9 : batch.reserve(config_.max_export_batch_size);
178 : 9 : ring_.drain([&](const SpanRecord& record) {
179 : 2 : batch.push_back(record);
180 : 2 : if (batch.size() >= config_.max_export_batch_size) {
181 : 0 : (void)exporter_->export_batch(
182 : : std::span<const SpanRecord>(batch.data(), batch.size()));
183 : 0 : batch.clear();
184 : : }
185 : 2 : });
186 : 9 : if (!batch.empty()) {
187 : 2 : (void)exporter_->export_batch(
188 : : std::span<const SpanRecord>(batch.data(), batch.size()));
189 : : }
190 : 9 : }
191 : :
192 : : } // namespace hpactor::tracing
|