timber_rust/logger/loki.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "loki")]
5#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
6
7use crate::service::{ LokiConfig, LokiData, LokiMessage, ServiceError};
8use crate::{LoggerImpl, LoggerStatus, Message};
9use reqwest::blocking::{Client, RequestBuilder, Response};
10use std::any::Any;
11use std::ops::AddAssign;
12use std::sync::mpsc::Sender;
13use std::sync::{Arc, Mutex};
14use std::thread;
15use std::thread::JoinHandle;
16use std::time::{Duration, SystemTime};
17use crate::service::Loki as LokiService;
18use crate::service::StandardLoki as StandardLokiService;
19
/// A persistent, asynchronous logger implementation for Grafana Loki.
///
/// `LokiLogger` owns a dedicated background worker thread. Calls to `log` only
/// enqueue the message on an MPSC channel; the worker thread runs the injected
/// service's `work` routine, which receives the other end of that channel together
/// with the shared HTTP client and configuration.
///
/// ### Important Design Notes
/// - **Dispatch-time Dating**: The timestamp attached to each queued entry is
///   generated inside `log` at the moment of dispatch, not taken from the
///   `Message` itself. The `highwater` field is consulted so that a timestamp
///   which does not advance past it is bumped forward by one nanosecond.
/// - **Worker Lifecycle**: The background thread is spawned on creation. Dropping
///   the logger closes the sending half of the channel and then joins the worker
///   thread, so the drop blocks until the worker has returned.
/// - **Batching / Retries**: How queued messages are grouped and transmitted is
///   decided by the service's `work` implementation, which is not part of this
///   type (NOTE(review): the standard service presumably batches and retries —
///   confirm against `crate::service`).
pub struct Loki {
    /// Reference timestamp consulted by `log`; initialised to the creation time.
    highwater: Mutex<SystemTime>,
    /// Handle of the background worker thread; taken and joined on drop.
    worker: Option<JoinHandle<()>>,
    /// Sending half of the log queue; set to `None` on drop to close the channel.
    sender: Option<Sender<LokiMessage>>,
    /// Shared HTTP client, configuration and push URL (also handed to the worker).
    data: Arc<LokiData>,
    /// Service whose `work` runs on the worker thread and whose `fallback` is
    /// invoked by `log` when a message cannot be enqueued.
    service: Arc<dyn LokiService + Send + Sync>,
}
43
44impl Loki {
45 /// Creates a new [`LokiLogger`][`Loki`] using the [`StandardLokiService`].
46 ///
47 /// This is a convenience wrapper around [`Self::with_service`].
48 pub fn new(config: LokiConfig) -> Box<Loki> {
49 Self::with_service(config, Box::new(StandardLokiService {}))
50 }
51
52 /// Primary constructor that initializes the logger with a custom [`LokiService`].
53 ///
54 /// This method:
55 /// 1. Computes the full Loki Push API endpoint.
56 /// 2. Wraps the provided service in an [`Arc`] for shared access between the logger and the worker.
57 /// 3. Spawns a background worker thread to process the log queue.
58 ///
59 /// ### Dependency Injection
60 /// By providing a custom [`LokiService`], you can override how batches are processed,
61 /// enabling features like compression, custom filtering, or specialized error handling.
62 ///
63 /// # Threading
64 /// Spawns a dedicated OS thread. The worker handles all network I/O and will
65 /// gracefully shut down when the [`LokiLogger`][`Loki`] instance is dropped.
66 ///
67 /// # Panics
68 /// Panics if the internal [`Client`][reqwest::blocking::Client] cannot be initialized.
69 pub fn with_service(
70 config: LokiConfig,
71 service: Box<dyn LokiService + Send + Sync>,
72 ) -> Box<Loki> {
73 let mut post_url = config.url.clone();
74 post_url.push_str("loki/api/v1/push");
75
76 // Loki data
77 let data = Arc::new(LokiData {
78 client: Self::build_client(&config),
79 config,
80 post_url,
81 });
82
83 let work_data = data.clone();
84 let service: Arc<dyn LokiService + Send + Sync> = Arc::from(service);
85 let work_service = service.clone();
86 let (sender, receiver) = std::sync::mpsc::channel::<LokiMessage>();
87
88 Box::new(Loki {
89 highwater: Mutex::new(SystemTime::now()),
90 worker: Some(thread::spawn(move || {
91 work_service.work(receiver, work_data)
92 })),
93 sender: Some(sender),
94 data,
95 service,
96 })
97 }
98
99 /// Constructs a pre-configured HTTP client for the Loki service.
100 ///
101 /// This client is built with the specific connection and request timeouts
102 /// defined in the [`LokiConfig`]. It is intended to be used within an [`Arc`]
103 /// to share a connection pool across the worker's lifecycle.
104 ///
105 /// # Panics
106 /// - Panics if the TLS backend cannot be initialized or if the system
107 /// configuration prevents creating a secure socket.
108 pub fn build_client(config: &LokiConfig) -> Client {
109 Client::builder()
110 .connect_timeout(config.connection_timeout)
111 .timeout(config.request_timeout)
112 .build()
113 .expect("Failed to build reqwest client")
114 }
115
116 /// Performs a synchronous POST request to the Loki push API.
117 ///
118 /// This method handles the low-level HTTP transmission, including setting
119 /// the `Content-Type` header and attaching authentication credentials.
120 ///
121 /// # Errors
122 /// Returns a [`ServiceError::Network`] if the server is unreachable,
123 /// DNS resolution fails, or the connection times out.
124 pub fn request_post(payload: String, data: &Arc<LokiData>) -> Result<Response, ServiceError> {
125 let mut request = data
126 .client
127 .post(data.post_url.as_str())
128 .header("Content-Type", "application/json")
129 .body(payload); // payload is already a String, no need for .to_string()
130
131 request = Loki::request_auth(request, &data);
132
133 request.send().map_err(ServiceError::Network)
134 }
135
136 /// Decorates a [`RequestBuilder`] with the configured authentication method.
137 ///
138 /// Supports both **Basic Auth** (username/password) and **Bearer Token** /// authentication. If both are configured, they will both be applied to
139 /// the request (though Loki usually expects only one).
140 ///
141 /// # Parameters
142 /// - `request`: The initial request builder to decorate.
143 /// - `data`: The shared data containing authentication credentials.
144 pub fn request_auth(mut request: RequestBuilder, data: &Arc<LokiData>) -> RequestBuilder {
145 if let Some(auth) = &data.config.basic_auth {
146 request = request.basic_auth(&auth.username, auth.password.as_deref());
147 }
148 if let Some(token) = &data.config.bearer_auth {
149 request = request.bearer_auth(token);
150 }
151 request
152 }
153}
154
155impl LoggerImpl for Loki {
156 /// Returns the current operational status of the Loki service.
157 ///
158 /// This method performs a live health check by hitting the `/loki/status` endpoint.
159 /// It uses a functional pipeline to transform the network result into a [`LoggerStatus`].
160 ///
161 /// ### Performance Note:
162 /// This call is **blocking**. If the network is slow or the Loki server is hanging,
163 /// this method will block the calling thread until the default timeout is reached.
164 ///
165 /// # Returns
166 /// * [`LoggerStatus::Running`] - Server is reachable and returned a success code.
167 /// * [`LoggerStatus::Broken`] - Any failure occurred (DNS, Connection Refused, 404, 500, etc.).
168 fn status(&self) -> LoggerStatus {
169 // Fetch url
170 let mut url = self.data.config.url.clone();
171 url.push_str("ready");
172
173 // Build request
174 let mut request = self.data.client.get(&url);
175 request = Self::request_auth(request, &self.data);
176
177 // Perform request
178 request
179 .send()
180 .ok()
181 .map(|response| match response.status().is_success() {
182 true => LoggerStatus::Running,
183 false => LoggerStatus::Broken,
184 })
185 .unwrap_or(LoggerStatus::Broken)
186 }
187
188 /// Enqueues a [`Message`] for asynchronous processing and delivery.
189 ///
190 /// This is the primary entry point for recording logs. It performs two critical tasks:
191 /// 1. **Monotonic Timestamping**: Uses a "highwater" Mutex to ensure every message
192 /// has a strictly increasing timestamp, preventing Loki out-of-order errors.
193 /// 2. **Asynchronous Dispatch**: Sends the message through an MPSC channel to the
194 /// background worker.
195 ///
196 /// ### Thread Safety & Performance
197 /// This method is non-blocking (except for a very brief lock on the highwater
198 /// timestamp). If the background worker is overloaded or the channel is
199 /// disconnected, it triggers the [`Fallback`][`crate::service::Fallback`] immediately to avoid data loss.
200 ///
201 /// # Parameters
202 /// - `message`: The log entry containing level, target, and content.
203 fn log(&self, message: Message) {
204 // Process highwater
205 let mut timestamp = SystemTime::now();
206 if let Ok(mut highwater) = self.highwater.lock() {
207 if timestamp <= *highwater {
208 highwater.add_assign(Duration::new(0, 1));
209 timestamp = *highwater
210 }
211 }
212
213 // Add to queue
214 if let Some(sender) = &self.sender {
215 if let Err(error) = sender.send(LokiMessage { timestamp, message }) {
216 let message = error.0;
217 self.service
218 .fallback(&ServiceError::LockPoisoned, &message.message);
219 }
220 } else {
221 self.service.fallback(&ServiceError::LockPoisoned, &message);
222 }
223 }
224
225 /// Returns a reference to the underlying type as [Any] for downcasting.
226 fn as_any(&self) -> &dyn Any {
227 self
228 }
229}
230
231/// Ensures a graceful shutdown of the logging pipeline.
232///
233/// When the `LokiLogger` goes out of scope, the following sequence occurs:
234/// 1. **Channel Closure**: The `sender` is dropped (`None`). This signals the background
235/// worker that no more messages will be sent.
236/// 2. **Worker Drain**: The worker's `receiver.recv()` will return an error once
237/// the channel is empty, allowing its loop to terminate naturally.
238/// 3. **Thread Join**: The main thread blocks until the worker thread has finished
239/// processing and sending the final batch of logs.
240///
241/// This mechanism prevents data loss during application shutdown or restarts.
242impl Drop for Loki {
243 fn drop(&mut self) {
244 // Drop the sender first to close the MPSC channel
245 self.sender = None;
246
247 // Wait for the worker thread to finish its last batch
248 if let Some(worker) = self.worker.take() {
249 let _ = worker.join();
250 }
251 }
252}