timber_rust/service/loki/service.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "loki")]
5#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
6
7use crate::service::{HttpError, LokiData, LokiMessage, ServiceError};
8use crate::{Fallback, LokiLogger, Message};
9use chrono::{SecondsFormat, Utc};
10use reqwest::blocking::Response;
11use serde_json::json;
12use std::sync::Arc;
13use std::sync::mpsc::Receiver;
14use std::time::UNIX_EPOCH;
15
16/// The core logic provider for Loki interactions.
17///
18/// This trait defines how batches of logs are collected, formatted, and pushed.
19/// By implementing this trait, you can customize the batching strategy or
20/// adding custom logic (like GZIP compression) before sending.
21///
22/// ### Required Hierarchy
23/// Must implement [`LokiFallback`][`Fallback`] to handle delivery failures.
24pub trait Loki: Fallback {
25 /// Orchestrates the background worker loop.
26 ///
27 /// This method is the entry point for the worker thread. It implements a
28 /// **double-buffer/drain** strategy:
29 /// 1. It blocks on `receiver.recv()` until at least one message is available,
30 /// ensuring zero CPU usage during idle periods.
31 /// 2. Once a message arrives, it performs a non-blocking `try_recv` loop to
32 /// "drain" the channel and form a batch.
33 /// 3. It groups messages by their log level into a `BTreeMap` to maintain
34 /// consistent stream ordering before calling [`work_batch`][`Self::work_batch()`].
35 ///
36 /// This design assumes that network I/O is the bottleneck, allowing the
37 /// in-memory collection of logs to be extremely fast.
38 fn work(&self, receiver: Receiver<LokiMessage>, data: Arc<LokiData>) {
39 let mut messages = std::collections::BTreeMap::<String, Vec<LokiMessage>>::new();
40
41 // Get first message
42 while let Ok(message) = receiver.recv() {
43 // Deal with message
44 let level = message.message.level().to_string();
45 messages.entry(level).or_insert_with(Vec::new).push(message);
46
47 // Get batch
48 while let Ok(message) = receiver.try_recv() {
49 let level = message.message.level().to_string();
50 messages.entry(level).or_insert_with(Vec::new).push(message);
51 }
52
53 // Process batch
54 for (level, batch) in &mut messages {
55 if !batch.is_empty() {
56 let _ = self.work_batch(level, batch, &data);
57 }
58 } // Process batch
59 } // Receive message
60 }
61
62 /// Handles the transformation and transmission of a specific log batch.
63 ///
64 /// This method performs the following steps:
65 /// 1. **Serialization**: Converts the batch of [`LokiMessage`] into a
66 /// JSON payload compatible with the Loki Push API.
67 /// 2. **Transmission**: Sends the payload via a POST request.
68 /// 3. **Retry Logic**: If the request fails or returns a non-success status,
69 /// it enters a retry loop up to the `max_retries` limit defined in [`LokiConfig`][`crate::service::LokiConfig`].
70 /// 4. **Safety Net**: If all retries fail, it triggers the [`LokiFallback`][`Fallback`]
71 /// mechanism for every message in the batch.
72 /// 5. **Cleanup**: Clears the batch vector to prepare it for the next iteration,
73 /// reusing the allocated memory capacity.
74 ///
75 /// # Returns
76 ///
77 /// Returns the last [`Response`] received from the server, or a [`ServiceError`]
78 /// if the transmission was impossible.
79 fn work_batch(
80 &self,
81 level: &str,
82 batch: &mut Vec<LokiMessage>,
83 data: &Arc<LokiData>,
84 ) -> Result<Response, ServiceError> {
85 // Prepare messages for loki
86 let payload_batch = batch
87 .iter()
88 .map(|message| {
89 let timestamp = message
90 .timestamp
91 .duration_since(UNIX_EPOCH)
92 .expect("Time went backwards")
93 .as_nanos();
94 json!([timestamp.to_string(), message.message.content().to_string()])
95 })
96 .collect::<Vec<_>>();
97
98 // Payload
99 let payload = json!({
100 "streams": [{
101 "stream": {
102 "app": data.config.get_app(),
103 "job": data.config.get_job(),
104 "env": data.config.get_env(),
105 "level": level,
106 },
107 "values": payload_batch
108 }]
109 });
110
111 // First attempt
112 let txt_payload = payload.to_string();
113 let mut response = LokiLogger::request_post(txt_payload, &data);
114
115 // Check if retry
116 let needs_retry = match &response {
117 Ok(response) => !response.status().is_success(),
118 Err(_) => true,
119 };
120
121 // Attempt until end of lives
122 if needs_retry {
123 let mut i = 0usize;
124 let payload = payload.to_string();
125
126 response = loop {
127 let current_response = LokiLogger::request_post(payload.clone(), &data);
128
129 // Check if retry
130 let is_success = match ¤t_response {
131 Ok(current_response) => current_response.status().is_success(),
132 Err(_) => false,
133 };
134
135 // End if retry
136 if is_success {
137 break current_response;
138 }
139
140 if i >= data.config.get_max_retries() {
141 match current_response {
142 Ok(current_response) => {
143 break Err(ServiceError::Http(HttpError::new(
144 current_response.status().as_u16(),
145 )));
146 }
147 Err(_) => {
148 break current_response;
149 }
150 }
151 }
152
153 i += 1;
154 } // Loop
155 } // If not first OK
156
157 // Last fallback
158 if let Err(error) = &response {
159 for message in &*batch {
160 self.fallback(error, &message.message);
161 }
162 }
163
164 // Clear message trail
165 batch.clear();
166
167 response
168 }
169}
170
/// The standard implementation of the Loki transport logic.
///
/// [`StandardLokiService`][`StandardLoki`] provides the baseline behavior for the logging pipeline,
/// using the standard batching and retry mechanisms defined in the [`LokiService`][`Loki`]
/// default trait methods.
///
/// ### Behavior
/// - **Transport**: Relies on the default [`Loki`] methods, which push JSON payloads
///   through [`LokiLogger`] and receive a `reqwest` blocking `Response`.
/// - **Fallback**: Implements [`LokiFallback`][`Fallback`] by reporting the failure
///   reason on standard error and echoing the undelivered log on standard output.
///
/// This struct has no fields; it acts primarily as a marker to satisfy the trait
/// requirements of the [`LokiLogger`].
pub struct StandardLoki {}
185
// No overrides: `StandardLoki` uses the default `work` and `work_batch`
// implementations provided by the `Loki` trait.
impl Loki for StandardLoki {}
187
188impl Fallback for StandardLoki {
189 /// Handles the ultimate delivery failure for a log message.
190 ///
191 /// This method is the "safety net" of the logging pipeline. It is invoked when
192 /// a message cannot be sent to Loki after exhausting all retry attempts or
193 /// encountering a non-recoverable error (like a 400 Bad Request).
194 ///
195 /// ### Default Behavior
196 /// The default implementation performs a **Best-Effort** recovery:
197 /// 1. Formats the failure reason (HTTP status or Network error) to `stderr`.
198 /// 2. Prints the original log content to `stdout` with its timestamp,
199 /// level, and payload.
200 ///
201 /// This ensures that even in a total network collapse, the logs are
202 /// captured by the system's standard output streams (useful for Docker/K8s
203 /// logs collectors).
204 ///
205 /// ### Thread Safety
206 /// This is called from the background worker thread. Any custom implementation
207 /// must be non-blocking and thread-safe to avoid stalling the entire
208 /// logging pipeline.
209 fn fallback(&self, error: &ServiceError, msg: &Message) {
210 let now: chrono::DateTime<Utc> = msg.instant().into();
211 let now = now.to_rfc3339_opts(SecondsFormat::Nanos, true);
212
213 match error {
214 ServiceError::Http(e) => eprintln!("Loki rejected log: Status {}", e.status_code()),
215 ServiceError::Network(e) => eprintln!("Loki network error: {}", e),
216 _ => eprintln!("Loki service failure: {}", error),
217 }
218
219 println!("{} [{}] | {}", now, msg.level(), msg.content());
220 }
221}