timber_rust/logger/queued.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4use crate::{LoggerImpl, LoggerStatus, Message, Service};
5use crossbeam::channel::Sender;
6use std::any::Any;
7use std::sync::Arc;
8use std::thread;
9use std::thread::JoinHandle;
10
11/// Wrapper for messages in flight.
12///
13/// This struct implements the "Traveling Sender" pattern, where the message
14/// carries its own means of re-queuing itself. This decouples the channel's
15/// lifetime from the workers and ties it to the completion of the work.
16struct PerishableMessage {
17 /// The actual log payload.
18 pub message: Message,
19 /// Number of retry attempts remaining.
20 pub lives: usize,
21 /// A clone of the producer-side channel handle used for retries.
22 pub sender: Sender<PerishableMessage>,
23}
24
25/// An asynchronous, thread-pooled logger with guaranteed completion.
26///
27/// `AsyncLogger` acts as an orchestration layer that offloads logging side-effects
28/// to a background pool of workers. It is designed for high-throughput scenarios
29/// where log generation should not block the main application execution.
30///
31/// ### Architecture: The "Traveling Sender" Pattern
32/// This logger implements a resilient dispatch system using [`crossbeam`] channels.
33/// Each [`Message`] is wrapped in a `PerishableMessage` which carries its own
34/// [`Sender`] clone. This ensures that:
35/// 1. **Decoupled Retries**: Workers can re-queue failed tasks without needing
36/// access to the parent logger's state.
37/// 2. **Lifespan Tracking**: Messages have a finite number of `lives`, preventing
38/// infinite loops in case of persistent service failure.
39///
40/// ### Graceful Shutdown & Persistence
41/// The `AsyncLogger` guarantees that no log is lost during program termination:
42/// - **Drain Policy**: On `drop()`, the primary sender is destroyed. Workers will
43/// continue processing until the internal buffer is empty and all re-queue
44/// attempts (lives) are exhausted.
45/// - **Thread Synchronization**: The logger waits (`join`) for all worker threads
46/// to finish their tasks before allowing the process to exit.
47///
48/// ### Important Design Considerations
49/// - **Ordering**: Because this logger uses multiple concurrent workers (MPMC),
50/// **strict chronological ordering is not guaranteed**. While each `Message`
51/// retains its original timestamp, the order in which they reach the final
52/// destination may vary due to thread scheduling and retry logic.
53/// - **Compatibility**: This pattern is ideal for I/O-bound services like local
54/// files or standard streams. It is **not recommended** for services requiring
55/// strict sequential consistency (e.g., Loki or Aws).
56///
57/// ### Example
58/// ```rust
59/// # use std::fs::File;
60/// # use timber_rust::{LogLevel, Logger};
61/// # use timber_rust::Message;
62/// # use timber_rust::MessageFactory;
63/// # use timber_rust::QueuedLogger;
64/// # use timber_rust::service::StandardCoutWrite;
65/// let service = StandardCoutWrite::new();
66/// let logger = QueuedLogger::new(service, 3, 4); // 3 retries, 4 worker threads
67/// let logger = Logger::new(logger);
68/// logger.log((LogLevel::Info,"System started"));
69/// ```
70pub struct Queued {
71 /// The underlying service used to perform the actual logging/work.
72 service: Arc<dyn Service + Send + Sync>,
73 /// Default number of retries for new messages.
74 max_retries: usize,
75 /// Handles to the background worker threads.
76 workers: Vec<JoinHandle<()>>,
77 /// The primary entry point for the queue. Wrapped in an `Option` to allow
78 /// the `Drop` implementation to signal shutdown by destroying the sender.
79 sender: Option<Sender<PerishableMessage>>,
80}
81
82impl Queued {
83 /// Creates a new [`QueuedLogger`][`Queued`] and initializes the worker pool.
84 ///
85 /// # Arguments
86 /// * `service` - The logging service implementation.
87 /// * `max_retries` - How many times a failed log should be re-queued.
88 /// * `worker_count` - The number of background threads to spawn.
89 pub fn new(
90 service: Box<dyn Service + Send + Sync>,
91 max_retries: usize,
92 worker_count: usize,
93 ) -> Box<Self> {
94 let service: Arc<dyn Service + Send + Sync> = Arc::from(service);
95 let (sender, receiver) = crossbeam::channel::unbounded::<PerishableMessage>();
96 let mut workers = Vec::with_capacity(worker_count);
97
98 for _ in 0..worker_count {
99 let worker_receiver = receiver.clone();
100 let worker_service = service.clone();
101
102 workers.push(thread::spawn(move || {
103 while let Ok( mut message) = worker_receiver.recv() {
104
105 // Attempt to log
106 let result = worker_service.work(&message.message);
107 if result.is_ok() {
108 continue;
109 }
110
111 // Treat error
112 let error = result.unwrap_err();
113 // Discard if lives reaches 0
114 if message.lives == 0 {
115 worker_service.fallback(&error, &message.message);
116 continue;
117 }
118
119 // Retry
120 message.lives -= 1;
121 let sender = message.sender.clone();
122 let result = sender.send(message);
123 if let Err(err) = result {
124 worker_service.fallback(&error, &err.0.message);
125 }
126 }
127 }));
128 }
129
130 Box::new(Queued {
131 service,
132 max_retries,
133 workers,
134 sender: Some(sender),
135 })
136 }
137
138 /// Returns the base service
139 pub fn get_service(&self) -> &dyn Service {
140 self.service.as_ref()
141 }
142
143 /// Consumes the logger and returns the underlying service.
144 ///
145 /// This method will **block** the current thread until all background workers
146 /// have finished processing the current queue and all retries have been
147 /// exhausted. This ensures that the returned service is in a clean,
148 /// idle state.
149 pub fn take_service(self) -> Arc<dyn Service + Send + Sync> {
150 // self is dropped here, triggering the Drop impl
151 // which joins all worker threads.
152 self.service.clone()
153 }
154}
155
156impl LoggerImpl for Queued {
157 /// Checks the health status of the underlying service.
158 fn status(&self) -> LoggerStatus {
159 self.service.status()
160 }
161
162 /// Dispatches a message to the background worker pool.
163 ///
164 /// Each message is bundled with a clone of the sender to enable
165 /// self-contained retry logic within the worker threads.
166 fn log(&self, message: Message) {
167 // Safe to expect: Borrow checker prevents log() during drop().
168 let sender = self
169 .sender
170 .as_ref()
171 .expect("AsyncLogger integrity violation: log() called after drop() initialization.");
172
173 let _ = sender.send(PerishableMessage {
174 message,
175 lives: self.max_retries,
176 sender: sender.clone(),
177 });
178 }
179
180 /// Downcasts the logger to `Any` for reflection-like capabilities.
181 fn as_any(&self) -> &dyn Any {
182 self
183 }
184}
185
186impl Drop for Queued {
187 /// Orchestrates a graceful shutdown of the logger.
188 ///
189 /// 1. The primary [`Sender`] is dropped (set to `None`).
190 /// 2. This does not immediately close the channel if messages are in the queue,
191 /// as those messages carry their own [`Sender`] clones.
192 /// 3. Once all messages are processed/exhausted, the last [`Sender`] dies.
193 /// 4. `worker_receiver.recv()` returns `Err`, and threads exit.
194 /// 5. [`join()`][std::thread::JoinHandle::join()] ensures all threads have finished before the program proceeds.
195 fn drop(&mut self) {
196 // Pull the plug on the primary entry point.
197 self.sender = None;
198
199 // Wait for workers to drain the existing queue and self-terminate.
200 for worker in self.workers.drain(..) {
201 let _ = worker.join();
202 }
203 }
204}