timber_rust/service/aws/
service.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "aws")]
5#![cfg_attr(docsrs, doc(cfg(feature = "aws")))]
6
7use crate::logger::Status;
8use crate::service::aws::{Config, Data, StandardMessageFormatter, MessageFormatter};
9use crate::service::{CloudWatchMessage, ServiceError};
10use crate::{Fallback, Message};
11use aws_config::{BehaviorVersion, Region};
12use aws_sdk_cloudwatchlogs::types::InputLogEvent;
13use chrono::{SecondsFormat, Utc};
14use std::sync::mpsc::Receiver;
15use std::sync::Arc;
16use tokio::runtime::Builder;
17
18/// Trait defining the behavior for CloudWatch log providers.
19///
20/// This abstraction allows swapping log service providers (e.g., for testing or
21/// migrating to other cloud providers) without changing the core application logic.
22///
23/// It implements `Sync + Send` to ensure the service can be safely shared across
24/// multiple threads in a concurrent environment.
25pub trait CloudWatch: Fallback + Sync + Send {
26    fn status(&self) -> Status;
27
28    fn work(&self, receiver: Receiver<CloudWatchMessage>);
29}
30
31/// Standard implementation of the CloudWatch service using the AWS SDK.
32///
33/// # Design Considerations
34/// AWS SDK for Rust is natively asynchronous. This implementation wraps the
35/// asynchronous client and manages an internal **Tokio [`Runtime`][`tokio::runtime::Runtime`]** to provide
36/// a synchronous (blocking) API.
37///
38/// ### Performance Note
39/// Initializing this struct has a high overhead due to the creation of a
40/// multi-threaded Tokio [`Runtime`][`tokio::runtime::Runtime`]. It is recommended to initialize this service
41/// once and reuse the instance throughout the application's lifecycle.
42///
43/// ### Alternatives
44/// If high-performance logging is required without the overhead of an internal runtime, consider:
45/// - Logging to [`stdout`][`crate::service::CoutWrite`] and using the **CloudWatch Agent**.
46/// - Logging to `stdout` with a [`CloudWatchCout`][`crate::service::CloudWatchCout`] service and using the **CloudWatch Agent**.
47/// - Using **AWS Lambda** integrated logging.
48pub struct SimpleCloudWatch {
49    /// Service configuration
50    data: Arc<Data>,
51}
52
53impl SimpleCloudWatch {
54    /// Initializes a new `SimpleCloudWatch` service using the [`StandardMessageFormatter`].
55    ///
56    /// This is a convenience wrapper around [`Self::new_formatted`]. It creates a
57    /// multi-threaded Tokio runtime to manage the asynchronous AWS SDK
58    /// synchronously under the hood.
59    ///
60    /// # Arguments
61    /// * `config` - A [`Config`] object containing AWS credentials and log group.
62    ///
63    /// # Panics
64    /// Panics if the Tokio [`Runtime`][`tokio::runtime::Runtime`] fails to initialize or the AWS region is invalid.
65    ///
66    /// # Example
67    /// ```rust
68    /// # use timber_rust::service::aws::Config;
69    /// # use timber_rust::service::SimpleCloudWatch;
70    /// let config = Config::new("access_key", "secret", "my-group", "us-east-1");
71    /// let service = SimpleCloudWatch::new(config);
72    /// ```
73    pub fn new(config: Config) -> Box<dyn CloudWatch + Send + Sync> {
74        Self::new_formatted(config, StandardMessageFormatter {})
75    }
76
77    /// Initializes a new `SimpleCloudWatch` service with a custom [`MessageFormatter`].
78    ///
79    /// Use this method if you need to customize how logs are structured (e.g., adding
80    /// extra fields, changing JSON keys, or using a non-JSON format).
81    ///
82    /// # Arguments
83    /// * `config` - AWS configuration and credentials.
84    /// * `formatter` - An implementation of [`MessageFormatter`].
85    ///
86    /// # Type Parameters
87    /// * `F` - The specific type of the formatter, which must be `Send + Sync + 'static`.
88    pub fn new_formatted<F>(config: Config, formatter: F) -> Box<dyn CloudWatch + Send + Sync>
89    where
90        F: MessageFormatter + Send + Sync + 'static,
91    {
92        let rt = Builder::new_multi_thread()
93            .enable_all()
94            .build()
95            .expect("Failed to create Tokio runtime");
96
97        let access_key_id = config.get_access_key_id().to_string();
98        let access_key_secret = config.get_access_key_secret().to_string();
99        let session_token = config.get_session_token().map(|t| t.to_string());
100        let expires_in = config.get_expires_in();
101        let provider = config.get_provider();
102        let region = config.get_region().to_string();
103
104        let client = rt.block_on(async {
105            let creds = aws_sdk_cloudwatchlogs::config::Credentials::new(
106                access_key_id,
107                access_key_secret,
108                session_token,
109                expires_in,
110                provider,
111            );
112
113            let sdk_config = aws_config::defaults(BehaviorVersion::latest())
114                .region(Region::new(region))
115                .credentials_provider(creds)
116                .load()
117                .await; // Need tokio to fix this
118
119            aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
120        });
121
122        Box::new(SimpleCloudWatch {
123            data: Arc::new(Data {
124                client,
125                rt,
126                log_group: config.get_log_group().to_string(),
127                formatter: Box::new(formatter),
128            }),
129        })
130    }
131
132    /// Initializes the service using AWS standard environment variables.
133    ///
134    /// It looks for `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION`.
135    ///
136    /// # Arguments
137    /// * `log_group` - The name of the CloudWatch Log Group to use.
138    ///
139    /// # Panics
140    /// Panics if the internal Tokio [`Runtime`][`tokio::runtime::Runtime`] fails to initialize.
141    ///
142    /// # Example
143    /// ```rust
144    /// // Assumes AWS_REGION and credentials are set in the environment
145    /// # use timber_rust::service::SimpleCloudWatch;
146    /// let cw_service = SimpleCloudWatch::from_env("my-app-logs".to_string());
147    /// ```
148    pub fn from_env(log_group: String) -> Box<dyn CloudWatch + Send + Sync> {
149        Self::from_env_formatted(log_group, StandardMessageFormatter {})
150    }
151
152    pub fn from_env_formatted<F>(
153        log_group: String,
154        formatter: F,
155    ) -> Box<dyn CloudWatch + Send + Sync>
156    where
157        F: MessageFormatter + Send + Sync + 'static,
158    {
159        let rt = Builder::new_multi_thread()
160            .enable_all()
161            .build()
162            .expect("Failed to create Tokio runtime");
163
164        let client = rt.block_on(async {
165            let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
166            aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
167        });
168
169        Box::new(SimpleCloudWatch {
170            data: Arc::new(Data {
171                client,
172                rt,
173                log_group,
174                formatter: Box::new(formatter),
175            }),
176        })
177    }
178
179    /// Returns the configured Log Group name.
180    pub fn log_group(&self) -> &str {
181        &self.data.log_group
182    }
183
184    /// Returns the internal data
185    pub fn get_data(&self) -> Arc<Data> {
186        self.data.clone()
187    }
188}
189
190impl CloudWatch for SimpleCloudWatch {
191    /// Checks if the service is operational.
192    ///
193    /// This performs a synchronous check against AWS to verify:
194    /// 1. Connectivity and Credentials.
195    /// 2. Existence of the target Log Group.
196    fn status(&self) -> Status {
197        let result = self.data.rt.block_on(async {
198            self.data
199                .client
200                .describe_log_groups()
201                .log_group_name_prefix(&self.data.log_group)
202                .send()
203                .await
204        });
205
206        match result {
207            Ok(output) => {
208                // Check if our specific log group is in the returned list
209                let exists = output
210                    .log_groups()
211                    .iter()
212                    .any(|g| g.log_group_name() == Some(&self.data.log_group));
213
214                if exists {
215                    Status::Running
216                } else {
217                    // If the group doesn't exist, PutLogEvents will fail.
218                    Status::Broken
219                }
220            }
221            Err(_) => {
222                // Network error, 403 Forbidden, or expired tokens.
223                Status::Broken
224            }
225        }
226    }
227
228    /// This method runs in a dedicated thread and implements a **greedy-drain** /// batching strategy to optimize network I/O:
229    ///
230    /// 1. **Idle Efficiency**: It uses a blocking `receiver.recv()` to wait for the
231    ///    first message, ensuring the thread consumes zero CPU cycles when there is
232    ///    no logging activity.
233    /// 2. **Batch Formation**: Once the first message is received, it performs
234    ///    non-blocking `try_recv()` calls to "drain" all currently pending messages
235    ///    in the channel into a single batch.
236    /// 3. **Synchronous Bridge**: It utilizes the internal Tokio [`Runtime`][`tokio::runtime::Runtime`] via
237    ///    `block_on` to execute the asynchronous AWS SDK `PutLogEvents` call
238    ///    synchronously within the worker thread.
239    ///
240    /// # Thread Safety
241    /// This loop is designed to run indefinitely until the `receiver` is disconnected
242    /// (usually when the application starts its shutdown sequence).
243    fn work(&self, receiver: Receiver<CloudWatchMessage>) {
244        let mut messages = Vec::with_capacity(128);
245
246        // Get first message
247        while let Ok(message) = receiver.recv() {
248            // Clear buffer
249            messages.clear();
250
251            // Deal with message
252            messages.push(message);
253
254            // Get batch
255            while let Ok(message) = receiver.try_recv() {
256                messages.push(message);
257            }
258
259            let data = &self.data;
260            data.rt.block_on(async {
261                let mut log_events = Vec::with_capacity(messages.len());
262
263                for msg in &messages {
264                    let event = InputLogEvent::builder()
265                        .message(data.formatter.format(&msg.message))
266                        .timestamp(msg.timestamp) // El i64 que definimos antes
267                        .build()
268                        .expect("Failed to build log event");
269                    log_events.push(event);
270                }
271
272                let _ = data
273                    .client
274                    .put_log_events()
275                    .log_group_name(&data.log_group)
276                    .log_stream_name("my-stream-name") // ¡OJO! Esto debe existir
277                    .set_log_events(Some(log_events))
278                    .send()
279                    .await;
280            });
281        } // Receive message
282    }
283}
284
285impl Fallback for SimpleCloudWatch {
286    fn fallback(&self, _error: &ServiceError, msg: &Message) {
287        let now: chrono::DateTime<Utc> = msg.instant().into();
288        let now = now.to_rfc3339_opts(SecondsFormat::Nanos, true);
289        println!("{} [{}] | {}", now, msg.level(), msg.content());
290    }
291}