// timber_rust/logger/queued.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4use crate::{LoggerImpl, LoggerStatus, Message, Service};
5use crossbeam::channel::Sender;
6use std::any::Any;
7use std::sync::Arc;
8use std::thread;
9use std::thread::JoinHandle;
10
/// Wrapper for messages in flight.
///
/// This struct implements the "Traveling Sender" pattern, where the message
/// carries its own means of re-queuing itself. This decouples the channel's
/// lifetime from the workers and ties it to the completion of the work: the
/// channel stays open as long as any message still holds a [`Sender`] clone.
struct PerishableMessage {
    /// The actual log payload.
    pub message: Message,
    /// Number of retry attempts remaining. When this reaches 0, the next
    /// failure routes the message to the service's fallback instead of re-queuing.
    pub lives: usize,
    /// A clone of the producer-side channel handle used for retries.
    pub sender: Sender<PerishableMessage>,
}
24
/// An asynchronous, thread-pooled logger with guaranteed completion.
///
/// [`QueuedLogger`][`Queued`] acts as an orchestration layer that offloads logging
/// side-effects to a background pool of workers. It is designed for high-throughput
/// scenarios where log generation should not block the main application execution.
///
/// ### Architecture: The "Traveling Sender" Pattern
/// This logger implements a resilient dispatch system using [`crossbeam`] channels.
/// Each [`Message`] is wrapped in a `PerishableMessage` which carries its own
/// [`Sender`] clone. This ensures that:
/// 1. **Decoupled Retries**: Workers can re-queue failed tasks without needing
///    access to the parent logger's state.
/// 2. **Lifespan Tracking**: Messages have a finite number of `lives`, preventing
///    infinite loops in case of persistent service failure.
///
/// ### Graceful Shutdown & Persistence
/// The [`QueuedLogger`][`Queued`] guarantees that no log is lost during program
/// termination:
/// - **Drain Policy**: On `drop()`, the primary sender is destroyed. Workers will
///   continue processing until the internal buffer is empty and all re-queue
///   attempts (lives) are exhausted.
/// - **Thread Synchronization**: The logger waits (`join`) for all worker threads
///   to finish their tasks before allowing the process to exit.
///
/// ### Important Design Considerations
/// - **Ordering**: Because this logger uses multiple concurrent workers (MPMC),
///   **strict chronological ordering is not guaranteed**. While each `Message`
///   retains its original timestamp, the order in which they reach the final
///   destination may vary due to thread scheduling and retry logic.
/// - **Compatibility**: This pattern is ideal for I/O-bound services like local
///   files or standard streams. It is **not recommended** for services requiring
///   strict sequential consistency (e.g., Loki or Aws).
///
/// ### Example
/// ```rust
/// # use std::fs::File;
/// # use timber_rust::{LogLevel, Logger};
/// # use timber_rust::Message;
/// # use timber_rust::MessageFactory;
/// # use timber_rust::QueuedLogger;
/// # use timber_rust::service::StandardCoutWrite;
/// let service = StandardCoutWrite::new();
/// let logger = QueuedLogger::new(service, 3, 4); // 3 retries, 4 worker threads
/// let logger = Logger::new(logger);
/// logger.log((LogLevel::Info,"System started"));
/// ```
pub struct Queued {
    /// The underlying service used to perform the actual logging/work.
    service: Arc<dyn Service + Send + Sync>,
    /// Default number of retries for new messages (becomes each message's `lives`).
    max_retries: usize,
    /// Handles to the background worker threads, joined on drop.
    workers: Vec<JoinHandle<()>>,
    /// The primary entry point for the queue. Wrapped in an `Option` to allow
    /// the `Drop` implementation to signal shutdown by destroying the sender.
    sender: Option<Sender<PerishableMessage>>,
}
81
82impl Queued {
83    /// Creates a new [`QueuedLogger`][`Queued`] and initializes the worker pool.
84    ///
85    /// # Arguments
86    /// * `service` - The logging service implementation.
87    /// * `max_retries` - How many times a failed log should be re-queued.
88    /// * `worker_count` - The number of background threads to spawn.
89    pub fn new(
90        service: Box<dyn Service + Send + Sync>,
91        max_retries: usize,
92        worker_count: usize,
93    ) -> Box<Self> {
94        let service: Arc<dyn Service + Send + Sync> = Arc::from(service);
95        let (sender, receiver) = crossbeam::channel::unbounded::<PerishableMessage>();
96        let mut workers = Vec::with_capacity(worker_count);
97
98        for _ in 0..worker_count {
99            let worker_receiver = receiver.clone();
100            let worker_service = service.clone();
101
102            workers.push(thread::spawn(move || {
103                while let Ok( mut message) = worker_receiver.recv() {
104
105                    // Attempt to log
106                    let result = worker_service.work(&message.message);
107                    if result.is_ok() {
108                        continue;
109                    }
110
111                    // Treat error
112                    let error = result.unwrap_err();
113                    // Discard if lives reaches 0
114                    if message.lives == 0 {
115                        worker_service.fallback(&error, &message.message);
116                        continue;
117                    }
118
119                    // Retry
120                    message.lives -= 1;
121                    let sender = message.sender.clone();
122                    let result = sender.send(message);
123                    if let Err(err) = result {
124                        worker_service.fallback(&error, &err.0.message);
125                    }
126                }
127            }));
128        }
129
130        Box::new(Queued {
131            service,
132            max_retries,
133            workers,
134            sender: Some(sender),
135        })
136    }
137
138    /// Returns the base service
139    pub fn get_service(&self) -> &dyn Service {
140        self.service.as_ref()
141    }
142
143    /// Consumes the logger and returns the underlying service.
144    ///
145    /// This method will **block** the current thread until all background workers
146    /// have finished processing the current queue and all retries have been
147    /// exhausted. This ensures that the returned service is in a clean,
148    /// idle state.
149    pub fn take_service(self) -> Arc<dyn Service + Send + Sync> {
150        // self is dropped here, triggering the Drop impl
151        // which joins all worker threads.
152        self.service.clone()
153    }
154}
155
156impl LoggerImpl for Queued {
157    /// Checks the health status of the underlying service.
158    fn status(&self) -> LoggerStatus {
159        self.service.status()
160    }
161
162    /// Dispatches a message to the background worker pool.
163    ///
164    /// Each message is bundled with a clone of the sender to enable
165    /// self-contained retry logic within the worker threads.
166    fn log(&self, message: Message) {
167        // Safe to expect: Borrow checker prevents log() during drop().
168        let sender = self
169            .sender
170            .as_ref()
171            .expect("AsyncLogger integrity violation: log() called after drop() initialization.");
172
173        let _ = sender.send(PerishableMessage {
174            message,
175            lives: self.max_retries,
176            sender: sender.clone(),
177        });
178    }
179
180    /// Downcasts the logger to `Any` for reflection-like capabilities.
181    fn as_any(&self) -> &dyn Any {
182        self
183    }
184}
185
186impl Drop for Queued {
187    /// Orchestrates a graceful shutdown of the logger.
188    ///
189    /// 1. The primary [`Sender`] is dropped (set to `None`).
190    /// 2. This does not immediately close the channel if messages are in the queue,
191    ///    as those messages carry their own [`Sender`] clones.
192    /// 3. Once all messages are processed/exhausted, the last [`Sender`] dies.
193    /// 4. `worker_receiver.recv()` returns `Err`, and threads exit.
194    /// 5. [`join()`][std::thread::JoinHandle::join()] ensures all threads have finished before the program proceeds.
195    fn drop(&mut self) {
196        // Pull the plug on the primary entry point.
197        self.sender = None;
198
199        // Wait for workers to drain the existing queue and self-terminate.
200        for worker in self.workers.drain(..) {
201            let _ = worker.join();
202        }
203    }
204}