timber_rust/logger/
cloudwatch.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "aws")]
5#![cfg_attr(docsrs, doc(cfg(feature = "aws")))]
6
7use crate::service::CloudWatch as CloudWatchService;
8use crate::service::aws::{MessageFormatter, SimpleCloudWatch};
9use crate::service::{CloudWatchConfig, CloudWatchMessage, ServiceError};
10use crate::{LoggerImpl, LoggerStatus, Message};
11use std::any::Any;
12use std::sync::Arc;
13use std::sync::mpsc::Sender;
14use std::thread;
15use std::thread::JoinHandle;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18/// A high-level, thread-safe logger implementation for AWS CloudWatch.
19///
20/// This structure acts as a bridge between the application's logging calls and the
21/// asynchronous AWS SDK. It implements a **producer-consumer pattern** using a
22/// dedicated background worker thread to ensure that logging operations do not
23/// block the main application execution.
24///
25/// # Architecture
26/// 1. **Producer ([`log`][Self::log])**: Receives [`Message`] objects, timestamps them,
27///    and pushes them into an internal MPSC (Multi-Producer, Single-Consumer) channel.
28/// 2. **Consumer (`worker`)**: A dedicated background thread that
29///    drains the channel using a **greedy-drain** strategy, batching logs to
30///    optimize network throughput to AWS.
31/// 3. **Service Layer**: Handles the actual communication with the CloudWatch API
32///    via the internal Tokio runtime.
33///
34/// # Performance & Thread Safety
35/// - **Non-blocking**: The `log` method is essentially non-blocking, as it only
36///   performs a channel send operation.
37/// - **Graceful Shutdown**: Implements [`Drop`] to ensure that the channel is closed
38///   and all pending logs are flushed to AWS before the thread joins and the
39///   application terminates.
40///
41/// # Alternative: CloudWatch via Standard Output (Cout)
42///
43/// While this [`CloudWatch`] implementation provides a robust, SDK-based integration
44/// with the AWS API, many modern AWS environments are optimized for "Log Driver"
45/// ingestion.
46///
47/// If your application runs in **AWS Lambda**, **ECS** (with the `awslogs` driver),
48/// or **Fargate**, you may prefer using the [`CloudWatchCout`][`crate::service::CloudWatchCout`] service.
49///
50/// ### Why use the `Cout` Alternative?
51/// * **Performance**: Writing to `stdout` is significantly faster than performing
52///   HTTPS requests, even with background workers. It avoids the CPU and memory
53///   overhead of the AWS SDK and TLS stack.
54/// * **Security**: You do not need to provide AWS IAM credentials (like Access Keys)
55///   to your application code. The execution environment (e.g., the Lambda Role)
56///   automatically handles the permissions for the captured stream.
57/// * **Resilience**: If the network is unstable, logs are buffered by the container
58///   runtime or orchestrator rather than occupying your application's heap memory.
59///
60/// ### Comparison
61/// | Feature          | SDK Logger ([`CloudWatch`][Self]) | Stdout Logger ([`CloudWatchCout`][crate::service::aws::CloudWatchCout]) |
62/// | :--------------- | :------------------------------- | :-------------------------------------------------------------------- |
63/// | **Transport** | HTTPS (AWS SDK)                  | Standard Output (`stdout`)                                            |
64/// | **Binary Size** | Larger (SDK + TLS)               | Minimal (No AWS dependencies)                                         |
65/// | **IAM Config** | Handled in-app                   | Handled by Execution Environment                                      |
66/// | **Best For** | On-premise, EC2, Legacy          | Lambda, ECS, Fargate, Kubernetes                                      |
67///
68/// ### Usage
69/// You can quickly initialize the alternative via the factory:
70/// ```rust
71/// use timber_rust::LoggerFactory;
72/// use timber_rust::Concurrency;
73///
74/// // Direct (Sync) for Lambda
75/// let logger = LoggerFactory::cloudwatch().cout().build(Concurrency::Sync);
76///
77/// // Queued (Async) for ECS/Fargate
78/// let logger = LoggerFactory::cloudwatch().cout().build(Concurrency::Async);
79/// ```
80///
81/// See also: [`CloudWatchCoutMessageFormatter`][crate::service::CloudWatchCoutMessageFormatter]
82/// for the JSON schema used by the alternative.
83///
84/// # Example
85/// ```rust
86/// use timber_rust::Logger;
87/// use timber_rust::logger::CloudWatch;
88/// use timber_rust::service::aws::Config;
89///
90/// let config = Config::new("access", "secret", "my-group", "us-east-1");
91/// let logger = CloudWatch::new(config);
92/// let logger = Logger::new(logger);
93/// ```
94pub struct CloudWatch {
95    /// Handle to the background worker thread. Taken during [`Drop`].
96    worker: Option<JoinHandle<()>>,
97    /// Sending end of the log pipeline. Dropped during [`Drop`] to signal shutdown.
98    sender: Option<Sender<CloudWatchMessage>>,
99    /// Shared reference to the underlying CloudWatch service provider.
100    service: Arc<dyn CloudWatchService + Send + Sync>,
101}
102
103impl CloudWatch {
104    pub fn new(config: CloudWatchConfig) -> Box<CloudWatch> {
105        Self::with_service(SimpleCloudWatch::new(config))
106    }
107
108    pub fn new_formatted<F>(config: CloudWatchConfig, formatter: F) -> Box<CloudWatch>
109    where
110        F: MessageFormatter + Send + Sync + 'static,
111    {
112        Self::with_service(SimpleCloudWatch::new_formatted(config, formatter))
113    }
114
115    pub fn from_env<S>(log_group: S) -> Box<CloudWatch>
116    where
117        S: Into<String>,
118    {
119        Self::with_service(SimpleCloudWatch::from_env(log_group.into()))
120    }
121
122    pub fn from_env_formatted<S, F>(log_group: S, formatter: F) -> Box<CloudWatch>
123    where
124        S: Into<String>,
125        F: MessageFormatter + Send + Sync + 'static,
126    {
127        Self::with_service(SimpleCloudWatch::from_env_formatted(
128            log_group.into(),
129            formatter,
130        ))
131    }
132
133    pub fn with_service(service: Box<dyn CloudWatchService + Send + Sync>) -> Box<CloudWatch> {
134        let service: Arc<dyn CloudWatchService + Send + Sync> = Arc::from(service);
135        let work_service = service.clone();
136        let (sender, receiver) = std::sync::mpsc::channel::<CloudWatchMessage>();
137
138        Box::new(Self {
139            worker: Some(thread::spawn(move || work_service.work(receiver))),
140            sender: Some(sender),
141            service,
142        })
143    }
144}
145
146impl LoggerImpl for CloudWatch {
147    /// Returns the current operational status of the Loki service.
148    ///
149    /// This method performs a live health check by hitting the `/loki/status` endpoint.
150    /// It uses a functional pipeline to transform the network result into a [`LoggerStatus`].
151    ///
152    /// ### Performance Note:
153    /// This call is **blocking**. If the network is slow or the Loki server is hanging,
154    /// this method will block the calling thread until the default timeout is reached.
155    ///
156    /// # Returns
157    /// * [`LoggerStatus::Running`] - Server is reachable and returned a success code.
158    /// * [`LoggerStatus::Broken`] - Any failure occurred (DNS, Connection Refused, 404, 500, etc.).
159    fn status(&self) -> LoggerStatus {
160        self.service.status()
161    }
162
163    /// Enqueues a [`Message`] for asynchronous processing and delivery.
164    ///
165    /// This is the primary entry point for recording logs. It performs two critical tasks:
166    /// 1. **Timestamping**: Uses a timestamp generated at the log moment.
167    /// 2. **Asynchronous Dispatch**: Sends the message through an MPSC channel to the
168    ///    background worker.
169    ///
170    /// ### Thread Safety & Performance
171    /// This method is non-blocking (except for a very brief lock on the highwater
172    /// timestamp). If the background worker is overloaded or the channel is
173    /// disconnected, it triggers the [`Fallback`][`crate::service::Fallback`] immediately to avoid data loss.
174    ///
175    /// # Parameters
176    /// - `message`: The log entry containing level, target, and content.
177    fn log(&self, message: Message) {
178        // Timestamp
179        let timestamp = SystemTime::now()
180            .duration_since(UNIX_EPOCH)
181            .expect("Time went backwards")
182            .as_millis() as i64;
183
184        // Add to queue
185        if let Some(sender) = &self.sender {
186            if let Err(error) = sender.send(CloudWatchMessage { message, timestamp }) {
187                let message = error.0;
188                self.service
189                    .fallback(&ServiceError::LockPoisoned, &message.message);
190            }
191        } else {
192            self.service.fallback(&ServiceError::LockPoisoned, &message);
193        }
194    }
195
196    /// Returns a reference to the underlying type as [Any] for downcasting.
197    fn as_any(&self) -> &dyn Any {
198        self
199    }
200}
201
202/// Ensures a graceful shutdown of the logging pipeline.
203///
204/// When the [`CloudwatchLogger`][`CloudWatch`] goes out of scope, the following sequence occurs:
205/// 1. **Channel Closure**: The `sender` is dropped (`None`). This signals the background
206///    worker that no more messages will be sent.
207/// 2. **Worker Drain**: The worker's `receiver.recv()` will return an error once
208///    the channel is empty, allowing its loop to terminate naturally.
209/// 3. **Thread Join**: The main thread blocks until the worker thread has finished
210///    processing and sending the final batch of logs.
211///
212/// This mechanism prevents data loss during application shutdown or restarts.
213impl Drop for CloudWatch {
214    fn drop(&mut self) {
215        // Drop the sender first to close the MPSC channel
216        self.sender = None;
217
218        // Wait for the worker thread to finish its last batch
219        if let Some(worker) = self.worker.take() {
220            let _ = worker.join();
221        }
222    }
223}