timber_rust/service/loki/service.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "loki")]
5#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
6
7use crate::service::{HttpError, LokiData, LokiMessage, ServiceError};
8use crate::{Fallback, LokiLogger, Message};
9use chrono::{SecondsFormat, Utc};
10use reqwest::blocking::Response;
11use serde_json::json;
12use std::sync::Arc;
13use std::sync::mpsc::Receiver;
14use std::time::UNIX_EPOCH;
15
16/// The core logic provider for Loki interactions.
17///
18/// This trait defines how batches of logs are collected, formatted, and pushed.
19/// By implementing this trait, you can customize the batching strategy or
20/// adding custom logic (like GZIP compression) before sending.
21///
22/// ### Required Hierarchy
23/// Must implement [`LokiFallback`][`Fallback`] to handle delivery failures.
24pub trait Loki: Fallback {
25    /// Orchestrates the background worker loop.
26    ///
27    /// This method is the entry point for the worker thread. It implements a
28    /// **double-buffer/drain** strategy:
29    /// 1. It blocks on `receiver.recv()` until at least one message is available,
30    ///    ensuring zero CPU usage during idle periods.
31    /// 2. Once a message arrives, it performs a non-blocking `try_recv` loop to
32    ///    "drain" the channel and form a batch.
33    /// 3. It groups messages by their log level into a `BTreeMap` to maintain
34    ///    consistent stream ordering before calling [`work_batch`][`Self::work_batch()`].
35    ///
36    /// This design assumes that network I/O is the bottleneck, allowing the
37    /// in-memory collection of logs to be extremely fast.
38    fn work(&self, receiver: Receiver<LokiMessage>, data: Arc<LokiData>) {
39        let mut messages = std::collections::BTreeMap::<String, Vec<LokiMessage>>::new();
40
41        // Get first message
42        while let Ok(message) = receiver.recv() {
43            // Deal with message
44            let level = message.message.level().to_string();
45            messages.entry(level).or_insert_with(Vec::new).push(message);
46
47            // Get batch
48            while let Ok(message) = receiver.try_recv() {
49                let level = message.message.level().to_string();
50                messages.entry(level).or_insert_with(Vec::new).push(message);
51            }
52
53            // Process batch
54            for (level, batch) in &mut messages {
55                if !batch.is_empty() {
56                    let _ = self.work_batch(level, batch, &data);
57                }
58            } // Process batch
59        } // Receive message
60    }
61
62    /// Handles the transformation and transmission of a specific log batch.
63    ///
64    /// This method performs the following steps:
65    /// 1. **Serialization**: Converts the batch of [`LokiMessage`] into a
66    ///    JSON payload compatible with the Loki Push API.
67    /// 2. **Transmission**: Sends the payload via a POST request.
68    /// 3. **Retry Logic**: If the request fails or returns a non-success status,
69    ///    it enters a retry loop up to the `max_retries` limit defined in [`LokiConfig`][`crate::service::LokiConfig`].
70    /// 4. **Safety Net**: If all retries fail, it triggers the [`LokiFallback`][`Fallback`]
71    ///    mechanism for every message in the batch.
72    /// 5. **Cleanup**: Clears the batch vector to prepare it for the next iteration,
73    ///    reusing the allocated memory capacity.
74    ///
75    /// # Returns
76    ///
77    /// Returns the last [`Response`] received from the server, or a [`ServiceError`]
78    /// if the transmission was impossible.
79    fn work_batch(
80        &self,
81        level: &str,
82        batch: &mut Vec<LokiMessage>,
83        data: &Arc<LokiData>,
84    ) -> Result<Response, ServiceError> {
85        // Prepare messages for loki
86        let payload_batch = batch
87            .iter()
88            .map(|message| {
89                let timestamp = message
90                    .timestamp
91                    .duration_since(UNIX_EPOCH)
92                    .expect("Time went backwards")
93                    .as_nanos();
94                json!([timestamp.to_string(), message.message.content().to_string()])
95            })
96            .collect::<Vec<_>>();
97
98        // Payload
99        let payload = json!({
100            "streams": [{
101                "stream": {
102                    "app": data.config.get_app(),
103                    "job": data.config.get_job(),
104                    "env": data.config.get_env(),
105                    "level": level,
106                },
107                "values": payload_batch
108            }]
109        });
110
111        // First attempt
112        let txt_payload = payload.to_string();
113        let mut response = LokiLogger::request_post(txt_payload, &data);
114
115        // Check if retry
116        let needs_retry = match &response {
117            Ok(response) => !response.status().is_success(),
118            Err(_) => true,
119        };
120
121        // Attempt until end of lives
122        if needs_retry {
123            let mut i = 0usize;
124            let payload = payload.to_string();
125
126            response = loop {
127                let current_response = LokiLogger::request_post(payload.clone(), &data);
128
129                // Check if retry
130                let is_success = match &current_response {
131                    Ok(current_response) => current_response.status().is_success(),
132                    Err(_) => false,
133                };
134
135                // End if retry
136                if is_success {
137                    break current_response;
138                }
139
140                if i >= data.config.get_max_retries() {
141                    match current_response {
142                        Ok(current_response) => {
143                            break Err(ServiceError::Http(HttpError::new(
144                                current_response.status().as_u16(),
145                            )));
146                        }
147                        Err(_) => {
148                            break current_response;
149                        }
150                    }
151                }
152
153                i += 1;
154            } // Loop
155        } // If not first OK
156
157        // Last fallback
158        if let Err(error) = &response {
159            for message in &*batch {
160                self.fallback(error, &message.message);
161            }
162        }
163
164        // Clear message trail
165        batch.clear();
166
167        response
168    }
169}
170
171/// The standard implementation of the Loki transport logic.
172///
173/// [`StandardLokiService`][`StandardLoki`] provides the baseline behavior for the logging pipeline,
174/// using the standard batching and retry mechanisms defined in the [`LokiService`][`Loki`]
175/// default trait methods.
176///
177/// ### Behavior
178/// - **Transport**: Uses the `reqwest` blocking client to push JSON payloads.
179/// - **Fallback**: Inherits the default [`LokiFallback`][`Fallback`] behavior, which redirects
180///   failed logs to the system's standard error and output streams.
181///
182/// This struct is stateless, acting primarily as a marker to satisfy the trait
183/// requirements of the [`LokiLogger`].
184pub struct StandardLoki {}
185
186impl Loki for StandardLoki {}
187
188impl Fallback for StandardLoki {
189    /// Handles the ultimate delivery failure for a log message.
190    ///
191    /// This method is the "safety net" of the logging pipeline. It is invoked when
192    /// a message cannot be sent to Loki after exhausting all retry attempts or
193    /// encountering a non-recoverable error (like a 400 Bad Request).
194    ///
195    /// ### Default Behavior
196    /// The default implementation performs a **Best-Effort** recovery:
197    /// 1. Formats the failure reason (HTTP status or Network error) to `stderr`.
198    /// 2. Prints the original log content to `stdout` with its timestamp,
199    ///    level, and payload.
200    ///
201    /// This ensures that even in a total network collapse, the logs are
202    /// captured by the system's standard output streams (useful for Docker/K8s
203    /// logs collectors).
204    ///
205    /// ### Thread Safety
206    /// This is called from the background worker thread. Any custom implementation
207    /// must be non-blocking and thread-safe to avoid stalling the entire
208    /// logging pipeline.
209    fn fallback(&self, error: &ServiceError, msg: &Message) {
210        let now: chrono::DateTime<Utc> = msg.instant().into();
211        let now = now.to_rfc3339_opts(SecondsFormat::Nanos, true);
212
213        match error {
214            ServiceError::Http(e) => eprintln!("Loki rejected log: Status {}", e.status_code()),
215            ServiceError::Network(e) => eprintln!("Loki network error: {}", e),
216            _ => eprintln!("Loki service failure: {}", error),
217        }
218
219        println!("{} [{}] | {}", now, msg.level(), msg.content());
220    }
221}