timber_rust/logger/
loki.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "loki")]
5#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
6
7use crate::service::{ LokiConfig, LokiData, LokiMessage,  ServiceError};
8use crate::{LoggerImpl, LoggerStatus, Message};
9use reqwest::blocking::{Client, RequestBuilder, Response};
10use std::any::Any;
11use std::ops::AddAssign;
12use std::sync::mpsc::Sender;
13use std::sync::{Arc, Mutex};
14use std::thread;
15use std::thread::JoinHandle;
16use std::time::{Duration, SystemTime};
17use crate::service::Loki as LokiService;
18use crate::service::StandardLoki as StandardLokiService;
19
20/// A persistent, asynchronous logger implementation for Grafana Loki.
21///
22/// `LokiLogger` manages a dedicated background worker thread that handles batching,
23/// retries, and HTTP transmission. It is designed to be high-performance, ensuring
24/// that the main application thread is never blocked by network latency.
25///
26/// ### Important Design Notes
27/// - **Monotonic Dating**: This logger uses an internal "highwater" marking system.
28///   The original date of the `Message` is ignored in favor of a strictly increasing
29///   timestamp generated at the moment of dispatch. This prevents Loki from rejecting
30///   out-of-order logs during high-frequency bursts.
31/// - **Worker Lifecycle**: A background thread is spawned on creation. Logs are
32///   transmitted via an MPSC channel. Dropping the logger will signal the worker
33///   to finish pending tasks before shutting down.
34/// - **Performance**: Internal operations (batching, grouping by level) are performed
35///   in-memory, assuming that network I/O is the primary bottleneck.
36pub struct Loki {
37    highwater: Mutex<SystemTime>,
38    worker: Option<JoinHandle<()>>,
39    sender: Option<Sender<LokiMessage>>,
40    data: Arc<LokiData>,
41    service: Arc<dyn LokiService + Send + Sync>,
42}
43
44impl Loki {
45    /// Creates a new [`LokiLogger`][`Loki`] using the [`StandardLokiService`].
46    ///
47    /// This is a convenience wrapper around [`Self::with_service`].
48    pub fn new(config: LokiConfig) -> Box<Loki> {
49        Self::with_service(config, Box::new(StandardLokiService {}))
50    }
51
52    /// Primary constructor that initializes the logger with a custom [`LokiService`].
53    ///
54    /// This method:
55    /// 1. Computes the full Loki Push API endpoint.
56    /// 2. Wraps the provided service in an [`Arc`] for shared access between the logger and the worker.
57    /// 3. Spawns a background worker thread to process the log queue.
58    ///
59    /// ### Dependency Injection
60    /// By providing a custom [`LokiService`], you can override how batches are processed,
61    /// enabling features like compression, custom filtering, or specialized error handling.
62    ///
63    /// # Threading
64    /// Spawns a dedicated OS thread. The worker handles all network I/O and will
65    /// gracefully shut down when the [`LokiLogger`][`Loki`] instance is dropped.
66    ///
67    /// # Panics
68    /// Panics if the internal [`Client`][reqwest::blocking::Client] cannot be initialized.
69    pub fn with_service(
70        config: LokiConfig,
71        service: Box<dyn LokiService + Send + Sync>,
72    ) -> Box<Loki> {
73        let mut post_url = config.url.clone();
74        post_url.push_str("loki/api/v1/push");
75
76        // Loki data
77        let data = Arc::new(LokiData {
78            client: Self::build_client(&config),
79            config,
80            post_url,
81        });
82
83        let work_data = data.clone();
84        let service: Arc<dyn LokiService + Send + Sync> = Arc::from(service);
85        let work_service = service.clone();
86        let (sender, receiver) = std::sync::mpsc::channel::<LokiMessage>();
87
88        Box::new(Loki {
89            highwater: Mutex::new(SystemTime::now()),
90            worker: Some(thread::spawn(move || {
91                work_service.work(receiver, work_data)
92            })),
93            sender: Some(sender),
94            data,
95            service,
96        })
97    }
98
99    /// Constructs a pre-configured HTTP client for the Loki service.
100    ///
101    /// This client is built with the specific connection and request timeouts
102    /// defined in the [`LokiConfig`]. It is intended to be used within an [`Arc`]
103    /// to share a connection pool across the worker's lifecycle.
104    ///
105    /// # Panics
106    /// - Panics if the TLS backend cannot be initialized or if the system
107    /// configuration prevents creating a secure socket.
108    pub fn build_client(config: &LokiConfig) -> Client {
109        Client::builder()
110            .connect_timeout(config.connection_timeout)
111            .timeout(config.request_timeout)
112            .build()
113            .expect("Failed to build reqwest client")
114    }
115
116    /// Performs a synchronous POST request to the Loki push API.
117    ///
118    /// This method handles the low-level HTTP transmission, including setting
119    /// the `Content-Type` header and attaching authentication credentials.
120    ///
121    /// # Errors
122    /// Returns a [`ServiceError::Network`] if the server is unreachable,
123    /// DNS resolution fails, or the connection times out.
124    pub fn request_post(payload: String, data: &Arc<LokiData>) -> Result<Response, ServiceError> {
125        let mut request = data
126            .client
127            .post(data.post_url.as_str())
128            .header("Content-Type", "application/json")
129            .body(payload); // payload is already a String, no need for .to_string()
130
131        request = Loki::request_auth(request, &data);
132
133        request.send().map_err(ServiceError::Network)
134    }
135
136    /// Decorates a [`RequestBuilder`] with the configured authentication method.
137    ///
138    /// Supports both **Basic Auth** (username/password) and **Bearer Token** /// authentication. If both are configured, they will both be applied to
139    /// the request (though Loki usually expects only one).
140    ///
141    /// # Parameters
142    /// - `request`: The initial request builder to decorate.
143    /// - `data`: The shared data containing authentication credentials.
144    pub fn request_auth(mut request: RequestBuilder, data: &Arc<LokiData>) -> RequestBuilder {
145        if let Some(auth) = &data.config.basic_auth {
146            request = request.basic_auth(&auth.username, auth.password.as_deref());
147        }
148        if let Some(token) = &data.config.bearer_auth {
149            request = request.bearer_auth(token);
150        }
151        request
152    }
153}
154
155impl LoggerImpl for Loki {
156    /// Returns the current operational status of the Loki service.
157    ///
158    /// This method performs a live health check by hitting the `/loki/status` endpoint.
159    /// It uses a functional pipeline to transform the network result into a [`LoggerStatus`].
160    ///
161    /// ### Performance Note:
162    /// This call is **blocking**. If the network is slow or the Loki server is hanging,
163    /// this method will block the calling thread until the default timeout is reached.
164    ///
165    /// # Returns
166    /// * [`LoggerStatus::Running`] - Server is reachable and returned a success code.
167    /// * [`LoggerStatus::Broken`] - Any failure occurred (DNS, Connection Refused, 404, 500, etc.).
168    fn status(&self) -> LoggerStatus {
169        // Fetch url
170        let mut url = self.data.config.url.clone();
171        url.push_str("ready");
172
173        // Build request
174        let mut request = self.data.client.get(&url);
175        request = Self::request_auth(request, &self.data);
176
177        // Perform request
178        request
179            .send()
180            .ok()
181            .map(|response| match response.status().is_success() {
182                true => LoggerStatus::Running,
183                false => LoggerStatus::Broken,
184            })
185            .unwrap_or(LoggerStatus::Broken)
186    }
187
188    /// Enqueues a [`Message`] for asynchronous processing and delivery.
189    ///
190    /// This is the primary entry point for recording logs. It performs two critical tasks:
191    /// 1. **Monotonic Timestamping**: Uses a "highwater" Mutex to ensure every message
192    ///    has a strictly increasing timestamp, preventing Loki out-of-order errors.
193    /// 2. **Asynchronous Dispatch**: Sends the message through an MPSC channel to the
194    ///    background worker.
195    ///
196    /// ### Thread Safety & Performance
197    /// This method is non-blocking (except for a very brief lock on the highwater
198    /// timestamp). If the background worker is overloaded or the channel is
199    /// disconnected, it triggers the [`Fallback`][`crate::service::Fallback`] immediately to avoid data loss.
200    ///
201    /// # Parameters
202    /// - `message`: The log entry containing level, target, and content.
203    fn log(&self, message: Message) {
204        // Process highwater
205        let mut timestamp = SystemTime::now();
206        if let Ok(mut highwater) = self.highwater.lock() {
207            if timestamp <= *highwater {
208                highwater.add_assign(Duration::new(0, 1));
209                timestamp = *highwater
210            }
211        }
212
213        // Add to queue
214        if let Some(sender) = &self.sender {
215            if let Err(error) = sender.send(LokiMessage { timestamp, message }) {
216                let message = error.0;
217                self.service
218                    .fallback(&ServiceError::LockPoisoned, &message.message);
219            }
220        } else {
221            self.service.fallback(&ServiceError::LockPoisoned, &message);
222        }
223    }
224
225    /// Returns a reference to the underlying type as [Any] for downcasting.
226    fn as_any(&self) -> &dyn Any {
227        self
228    }
229}
230
231/// Ensures a graceful shutdown of the logging pipeline.
232///
233/// When the `LokiLogger` goes out of scope, the following sequence occurs:
234/// 1. **Channel Closure**: The `sender` is dropped (`None`). This signals the background
235///    worker that no more messages will be sent.
236/// 2. **Worker Drain**: The worker's `receiver.recv()` will return an error once
237///    the channel is empty, allowing its loop to terminate naturally.
238/// 3. **Thread Join**: The main thread blocks until the worker thread has finished
239///    processing and sending the final batch of logs.
240///
241/// This mechanism prevents data loss during application shutdown or restarts.
242impl Drop for Loki {
243    fn drop(&mut self) {
244        // Drop the sender first to close the MPSC channel
245        self.sender = None;
246
247        // Wait for the worker thread to finish its last batch
248        if let Some(worker) = self.worker.take() {
249            let _ = worker.join();
250        }
251    }
252}