timber_rust/service/aws/service.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Dante Doménech Martinez dante19031999@gmail.com
3
4#![cfg(feature = "aws")]
5#![cfg_attr(docsrs, doc(cfg(feature = "aws")))]
6
7use crate::logger::Status;
8use crate::service::aws::{Config, Data, StandardMessageFormatter, MessageFormatter};
9use crate::service::{CloudWatchMessage, ServiceError};
10use crate::{Fallback, Message};
11use aws_config::{BehaviorVersion, Region};
12use aws_sdk_cloudwatchlogs::types::InputLogEvent;
13use chrono::{SecondsFormat, Utc};
14use std::sync::mpsc::Receiver;
15use std::sync::Arc;
16use tokio::runtime::Builder;
17
18/// Trait defining the behavior for CloudWatch log providers.
19///
20/// This abstraction allows swapping log service providers (e.g., for testing or
21/// migrating to other cloud providers) without changing the core application logic.
22///
23/// It implements `Sync + Send` to ensure the service can be safely shared across
24/// multiple threads in a concurrent environment.
25pub trait CloudWatch: Fallback + Sync + Send {
26 fn status(&self) -> Status;
27
28 fn work(&self, receiver: Receiver<CloudWatchMessage>);
29}
30
31/// Standard implementation of the CloudWatch service using the AWS SDK.
32///
33/// # Design Considerations
34/// AWS SDK for Rust is natively asynchronous. This implementation wraps the
35/// asynchronous client and manages an internal **Tokio [`Runtime`][`tokio::runtime::Runtime`]** to provide
36/// a synchronous (blocking) API.
37///
38/// ### Performance Note
39/// Initializing this struct has a high overhead due to the creation of a
40/// multi-threaded Tokio [`Runtime`][`tokio::runtime::Runtime`]. It is recommended to initialize this service
41/// once and reuse the instance throughout the application's lifecycle.
42///
43/// ### Alternatives
44/// If high-performance logging is required without the overhead of an internal runtime, consider:
45/// - Logging to [`stdout`][`crate::service::CoutWrite`] and using the **CloudWatch Agent**.
46/// - Logging to `stdout` with a [`CloudWatchCout`][`crate::service::CloudWatchCout`] service and using the **CloudWatch Agent**.
47/// - Using **AWS Lambda** integrated logging.
48pub struct SimpleCloudWatch {
49 /// Service configuration
50 data: Arc<Data>,
51}
52
53impl SimpleCloudWatch {
54 /// Initializes a new `SimpleCloudWatch` service using the [`StandardMessageFormatter`].
55 ///
56 /// This is a convenience wrapper around [`Self::new_formatted`]. It creates a
57 /// multi-threaded Tokio runtime to manage the asynchronous AWS SDK
58 /// synchronously under the hood.
59 ///
60 /// # Arguments
61 /// * `config` - A [`Config`] object containing AWS credentials and log group.
62 ///
63 /// # Panics
64 /// Panics if the Tokio [`Runtime`][`tokio::runtime::Runtime`] fails to initialize or the AWS region is invalid.
65 ///
66 /// # Example
67 /// ```rust
68 /// # use timber_rust::service::aws::Config;
69 /// # use timber_rust::service::SimpleCloudWatch;
70 /// let config = Config::new("access_key", "secret", "my-group", "us-east-1");
71 /// let service = SimpleCloudWatch::new(config);
72 /// ```
73 pub fn new(config: Config) -> Box<dyn CloudWatch + Send + Sync> {
74 Self::new_formatted(config, StandardMessageFormatter {})
75 }
76
77 /// Initializes a new `SimpleCloudWatch` service with a custom [`MessageFormatter`].
78 ///
79 /// Use this method if you need to customize how logs are structured (e.g., adding
80 /// extra fields, changing JSON keys, or using a non-JSON format).
81 ///
82 /// # Arguments
83 /// * `config` - AWS configuration and credentials.
84 /// * `formatter` - An implementation of [`MessageFormatter`].
85 ///
86 /// # Type Parameters
87 /// * `F` - The specific type of the formatter, which must be `Send + Sync + 'static`.
88 pub fn new_formatted<F>(config: Config, formatter: F) -> Box<dyn CloudWatch + Send + Sync>
89 where
90 F: MessageFormatter + Send + Sync + 'static,
91 {
92 let rt = Builder::new_multi_thread()
93 .enable_all()
94 .build()
95 .expect("Failed to create Tokio runtime");
96
97 let access_key_id = config.get_access_key_id().to_string();
98 let access_key_secret = config.get_access_key_secret().to_string();
99 let session_token = config.get_session_token().map(|t| t.to_string());
100 let expires_in = config.get_expires_in();
101 let provider = config.get_provider();
102 let region = config.get_region().to_string();
103
104 let client = rt.block_on(async {
105 let creds = aws_sdk_cloudwatchlogs::config::Credentials::new(
106 access_key_id,
107 access_key_secret,
108 session_token,
109 expires_in,
110 provider,
111 );
112
113 let sdk_config = aws_config::defaults(BehaviorVersion::latest())
114 .region(Region::new(region))
115 .credentials_provider(creds)
116 .load()
117 .await; // Need tokio to fix this
118
119 aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
120 });
121
122 Box::new(SimpleCloudWatch {
123 data: Arc::new(Data {
124 client,
125 rt,
126 log_group: config.get_log_group().to_string(),
127 formatter: Box::new(formatter),
128 }),
129 })
130 }
131
132 /// Initializes the service using AWS standard environment variables.
133 ///
134 /// It looks for `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION`.
135 ///
136 /// # Arguments
137 /// * `log_group` - The name of the CloudWatch Log Group to use.
138 ///
139 /// # Panics
140 /// Panics if the internal Tokio [`Runtime`][`tokio::runtime::Runtime`] fails to initialize.
141 ///
142 /// # Example
143 /// ```rust
144 /// // Assumes AWS_REGION and credentials are set in the environment
145 /// # use timber_rust::service::SimpleCloudWatch;
146 /// let cw_service = SimpleCloudWatch::from_env("my-app-logs".to_string());
147 /// ```
148 pub fn from_env(log_group: String) -> Box<dyn CloudWatch + Send + Sync> {
149 Self::from_env_formatted(log_group, StandardMessageFormatter {})
150 }
151
152 pub fn from_env_formatted<F>(
153 log_group: String,
154 formatter: F,
155 ) -> Box<dyn CloudWatch + Send + Sync>
156 where
157 F: MessageFormatter + Send + Sync + 'static,
158 {
159 let rt = Builder::new_multi_thread()
160 .enable_all()
161 .build()
162 .expect("Failed to create Tokio runtime");
163
164 let client = rt.block_on(async {
165 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
166 aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
167 });
168
169 Box::new(SimpleCloudWatch {
170 data: Arc::new(Data {
171 client,
172 rt,
173 log_group,
174 formatter: Box::new(formatter),
175 }),
176 })
177 }
178
179 /// Returns the configured Log Group name.
180 pub fn log_group(&self) -> &str {
181 &self.data.log_group
182 }
183
184 /// Returns the internal data
185 pub fn get_data(&self) -> Arc<Data> {
186 self.data.clone()
187 }
188}
189
190impl CloudWatch for SimpleCloudWatch {
191 /// Checks if the service is operational.
192 ///
193 /// This performs a synchronous check against AWS to verify:
194 /// 1. Connectivity and Credentials.
195 /// 2. Existence of the target Log Group.
196 fn status(&self) -> Status {
197 let result = self.data.rt.block_on(async {
198 self.data
199 .client
200 .describe_log_groups()
201 .log_group_name_prefix(&self.data.log_group)
202 .send()
203 .await
204 });
205
206 match result {
207 Ok(output) => {
208 // Check if our specific log group is in the returned list
209 let exists = output
210 .log_groups()
211 .iter()
212 .any(|g| g.log_group_name() == Some(&self.data.log_group));
213
214 if exists {
215 Status::Running
216 } else {
217 // If the group doesn't exist, PutLogEvents will fail.
218 Status::Broken
219 }
220 }
221 Err(_) => {
222 // Network error, 403 Forbidden, or expired tokens.
223 Status::Broken
224 }
225 }
226 }
227
228 /// This method runs in a dedicated thread and implements a **greedy-drain** /// batching strategy to optimize network I/O:
229 ///
230 /// 1. **Idle Efficiency**: It uses a blocking `receiver.recv()` to wait for the
231 /// first message, ensuring the thread consumes zero CPU cycles when there is
232 /// no logging activity.
233 /// 2. **Batch Formation**: Once the first message is received, it performs
234 /// non-blocking `try_recv()` calls to "drain" all currently pending messages
235 /// in the channel into a single batch.
236 /// 3. **Synchronous Bridge**: It utilizes the internal Tokio [`Runtime`][`tokio::runtime::Runtime`] via
237 /// `block_on` to execute the asynchronous AWS SDK `PutLogEvents` call
238 /// synchronously within the worker thread.
239 ///
240 /// # Thread Safety
241 /// This loop is designed to run indefinitely until the `receiver` is disconnected
242 /// (usually when the application starts its shutdown sequence).
243 fn work(&self, receiver: Receiver<CloudWatchMessage>) {
244 let mut messages = Vec::with_capacity(128);
245
246 // Get first message
247 while let Ok(message) = receiver.recv() {
248 // Clear buffer
249 messages.clear();
250
251 // Deal with message
252 messages.push(message);
253
254 // Get batch
255 while let Ok(message) = receiver.try_recv() {
256 messages.push(message);
257 }
258
259 let data = &self.data;
260 data.rt.block_on(async {
261 let mut log_events = Vec::with_capacity(messages.len());
262
263 for msg in &messages {
264 let event = InputLogEvent::builder()
265 .message(data.formatter.format(&msg.message))
266 .timestamp(msg.timestamp) // El i64 que definimos antes
267 .build()
268 .expect("Failed to build log event");
269 log_events.push(event);
270 }
271
272 let _ = data
273 .client
274 .put_log_events()
275 .log_group_name(&data.log_group)
276 .log_stream_name("my-stream-name") // ¡OJO! Esto debe existir
277 .set_log_events(Some(log_events))
278 .send()
279 .await;
280 });
281 } // Receive message
282 }
283}
284
285impl Fallback for SimpleCloudWatch {
286 fn fallback(&self, _error: &ServiceError, msg: &Message) {
287 let now: chrono::DateTime<Utc> = msg.instant().into();
288 let now = now.to_rfc3339_opts(SecondsFormat::Nanos, true);
289 println!("{} [{}] | {}", now, msg.level(), msg.content());
290 }
291}