1use std::collections::{HashSet, VecDeque};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use aws_config::{BehaviorVersion, Region};
6use aws_sdk_s3::primitives::ByteStream;
7use tokio::runtime::Runtime;
8
9use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
10
/// A single object downloaded from the ingress bucket, plus queue metadata
/// used for status reporting.
pub(crate) struct S3IngressObject {
    /// Full object key within the bucket.
    pub key: String,
    /// Raw object bytes, fully buffered in memory.
    pub payload: Vec<u8>,
    /// Length of `payload` in bytes (cached for status reporting).
    pub size_bytes: usize,
    /// ETag returned by S3 for this object, when the service provided one.
    pub e_tag: Option<String>,
    /// Number of fetched-but-unread objects still queued after this read.
    pub pending_after_read: usize,
}
18
/// Result of a successful S3 `PutObject` call.
pub(crate) struct S3PutResult {
    /// ETag reported by S3 for the stored object, if any.
    pub e_tag: Option<String>,
}
22
/// Mutable status snapshot exposed through `S3Ingress::status_fields`.
#[derive(Debug)]
struct S3IngressStatus {
    /// Coarse state label; the ingress adapter sets "idle", "polling",
    /// or "downloaded".
    transfer_state: String,
    /// Key of the most recently delivered object, if any.
    last_object_key: Option<String>,
    /// Size in bytes of the most recently delivered object, if any.
    last_object_size_bytes: Option<u64>,
    /// ETag of the most recently delivered object, if any.
    last_object_etag: Option<String>,
    /// Count of fetched objects still waiting to be read.
    pending_objects: u64,
}
31
32impl Default for S3IngressStatus {
33 fn default() -> Self {
34 Self {
35 transfer_state: "idle".to_string(),
36 last_object_key: None,
37 last_object_size_bytes: None,
38 last_object_etag: None,
39 pending_objects: 0,
40 }
41 }
42}
43
44impl S3IngressStatus {
45 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
46 let mut details = serde_json::Map::new();
47 details.insert(
48 "transfer_state".to_string(),
49 serde_json::Value::String(self.transfer_state.clone()),
50 );
51 details.insert(
52 "pending_objects".to_string(),
53 serde_json::Value::from(self.pending_objects),
54 );
55 if let Some(key) = &self.last_object_key {
56 details.insert(
57 "last_object_key".to_string(),
58 serde_json::Value::String(key.clone()),
59 );
60 }
61 if let Some(size_bytes) = self.last_object_size_bytes {
62 details.insert(
63 "last_object_size_bytes".to_string(),
64 serde_json::Value::from(size_bytes),
65 );
66 }
67 if let Some(e_tag) = &self.last_object_etag {
68 details.insert(
69 "last_object_etag".to_string(),
70 serde_json::Value::String(e_tag.clone()),
71 );
72 }
73 details
74 }
75}
76
/// Mutable status snapshot exposed through `S3Egress::status_fields`.
#[derive(Debug)]
struct S3EgressStatus {
    /// Coarse state label; the egress adapter sets "idle", "uploading",
    /// or "uploaded".
    transfer_state: String,
    /// Key of the most recently uploaded object, if any.
    last_object_key: Option<String>,
    /// Size in bytes of the most recently uploaded object, if any.
    last_object_size_bytes: Option<u64>,
    /// ETag of the most recently uploaded object, if any.
    last_object_etag: Option<String>,
}
84
85impl Default for S3EgressStatus {
86 fn default() -> Self {
87 Self {
88 transfer_state: "idle".to_string(),
89 last_object_key: None,
90 last_object_size_bytes: None,
91 last_object_etag: None,
92 }
93 }
94}
95
96impl S3EgressStatus {
97 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
98 let mut details = serde_json::Map::new();
99 details.insert(
100 "transfer_state".to_string(),
101 serde_json::Value::String(self.transfer_state.clone()),
102 );
103 if let Some(key) = &self.last_object_key {
104 details.insert(
105 "last_object_key".to_string(),
106 serde_json::Value::String(key.clone()),
107 );
108 }
109 if let Some(size_bytes) = self.last_object_size_bytes {
110 details.insert(
111 "last_object_size_bytes".to_string(),
112 serde_json::Value::from(size_bytes),
113 );
114 }
115 if let Some(e_tag) = &self.last_object_etag {
116 details.insert(
117 "last_object_etag".to_string(),
118 serde_json::Value::String(e_tag.clone()),
119 );
120 }
121 details
122 }
123}
124
/// Source of ingress objects; implemented by the real AWS-backed client and
/// by test doubles (see `S3Ingress::with_client`).
pub(crate) trait S3IngressClient: Send {
    /// Returns the next unseen object, or `None` when nothing new was found;
    /// implementations may block up to `timeout` while waiting.
    fn read_next(&mut self, timeout: Duration) -> Result<Option<S3IngressObject>, AdapterError>;
}
129
/// Sink for egress payloads; implemented by the real AWS-backed client and
/// by test doubles (see `S3Egress::with_client`).
pub(crate) trait S3EgressClient: Send {
    /// Stores `payload` under `key`, optionally tagging it with a MIME
    /// `content_type`; returns the resulting ETag (if any) on success.
    fn put_object(
        &mut self,
        key: &str,
        payload: &[u8],
        content_type: Option<&str>,
    ) -> Result<S3PutResult, AdapterError>;
}
139
/// Real S3-backed ingress client: owns a current-thread Tokio runtime and
/// bridges the synchronous `S3IngressClient` API onto the async AWS SDK.
struct AsyncS3IngressClient {
    // Dedicated runtime used to block_on SDK calls.
    runtime: Runtime,
    client: aws_sdk_s3::Client,
    bucket: String,
    // Normalized key prefix filter (None = whole bucket).
    prefix: Option<String>,
    // Hard cap on a single object's size; larger objects are an error.
    max_object_bytes: usize,
    // Keys already delivered to the caller, so they are not re-fetched.
    // NOTE(review): grows without bound over the client's lifetime.
    seen_keys: HashSet<String>,
    // Objects fetched from S3 but not yet handed to the caller.
    pending: VecDeque<S3IngressObject>,
}
149
impl AsyncS3IngressClient {
    /// Builds a dedicated single-threaded Tokio runtime and an S3 client
    /// configured for `region`, with an optional custom `endpoint` (e.g. a
    /// local S3-compatible stack), which forces path-style addressing.
    ///
    /// Returns `AdapterError::S3` if the runtime cannot be built.
    fn connect(
        region: &str,
        endpoint: Option<&str>,
        bucket: &str,
        prefix: Option<&str>,
        max_object_bytes: usize,
    ) -> Result<Self, AdapterError> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|err| AdapterError::S3 {
                op: "s3_runtime_build",
                detail: err.to_string(),
            })?;
        // Config/credential resolution is async, so run it on the new runtime.
        let client = runtime.block_on(async {
            let shared_config = aws_config::defaults(BehaviorVersion::latest())
                .region(Region::new(region.to_string()))
                .load()
                .await;
            let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
            if let Some(endpoint) = endpoint {
                // Custom endpoints typically require path-style URLs.
                builder = builder.endpoint_url(endpoint).force_path_style(true);
            }
            Ok::<_, AdapterError>(aws_sdk_s3::Client::from_conf(builder.build()))
        })?;

        Ok(Self {
            runtime,
            client,
            bucket: bucket.to_string(),
            prefix: normalize_prefix(prefix),
            max_object_bytes,
            seen_keys: HashSet::new(),
            pending: VecDeque::new(),
        })
    }

    /// Lists every key under `prefix` that is not in `seen_keys`, then
    /// downloads each one, returning the objects in key-sorted order with
    /// `pending_after_read` pre-filled.
    ///
    /// Any object that exceeds `max_object_bytes` (or reports a negative
    /// size) aborts the whole fetch with an error rather than being skipped.
    /// NOTE(review): such an object is never added to `seen_keys`, so the
    /// same error recurs on every poll until the object is removed from the
    /// bucket — confirm this fail-fast behavior is intended.
    async fn fetch_pending_objects(
        client: aws_sdk_s3::Client,
        bucket: String,
        prefix: Option<String>,
        max_object_bytes: usize,
        seen_keys: &HashSet<String>,
    ) -> Result<Vec<S3IngressObject>, AdapterError> {
        let mut continuation_token = None;
        let mut discovered = Vec::new();

        // Page through ListObjectsV2 until the listing is no longer truncated.
        loop {
            let mut request = client.list_objects_v2().bucket(&bucket);
            if let Some(prefix) = &prefix {
                request = request.prefix(prefix);
            }
            if let Some(token) = continuation_token.take() {
                request = request.continuation_token(token);
            }

            let response = request.send().await.map_err(|err| AdapterError::S3 {
                op: "s3_list_objects_v2",
                detail: err.to_string(),
            })?;

            for object in response.contents() {
                let Some(key) = object.key() else {
                    continue;
                };
                if seen_keys.contains(key) {
                    continue;
                }
                // Pre-filter on the listed size so oversized objects fail
                // before any GetObject is issued.
                if let Some(size) = object.size() {
                    if size < 0 {
                        return Err(AdapterError::S3 {
                            op: "s3_list_objects_v2",
                            detail: format!("object {key} returned a negative size"),
                        });
                    }
                    if size as usize > max_object_bytes {
                        return Err(AdapterError::S3 {
                            op: "s3_get_object",
                            detail: format!(
                                "object too large key={key} len={} max={max_object_bytes}",
                                size
                            ),
                        });
                    }
                }
                discovered.push(key.to_string());
            }

            if !response.is_truncated().unwrap_or(false) {
                break;
            }
            continuation_token = response.next_continuation_token().map(ToOwned::to_owned);
            if continuation_token.is_none() {
                break;
            }
        }

        // Deterministic delivery order: lexicographic by key.
        discovered.sort();
        let mut fetched = Vec::with_capacity(discovered.len());
        for key in discovered {
            let response = client
                .get_object()
                .bucket(&bucket)
                .key(&key)
                .send()
                .await
                .map_err(|err| AdapterError::S3 {
                    op: "s3_get_object",
                    detail: err.to_string(),
                })?;
            // Second size gate: check the advertised content length before
            // buffering the body.
            if let Some(content_length) = response.content_length() {
                if content_length < 0 {
                    return Err(AdapterError::S3 {
                        op: "s3_get_object",
                        detail: format!("object {key} returned a negative content length"),
                    });
                }
                if content_length as usize > max_object_bytes {
                    return Err(AdapterError::S3 {
                        op: "s3_get_object",
                        detail: format!(
                            "object too large key={key} len={} max={max_object_bytes}",
                            content_length
                        ),
                    });
                }
            }

            let e_tag = response.e_tag().map(ToOwned::to_owned);

            let payload = response
                .body
                .collect()
                .await
                .map_err(|err| AdapterError::S3 {
                    op: "s3_collect_object",
                    detail: err.to_string(),
                })?
                .into_bytes()
                .to_vec();

            // Final size gate: the actual buffered byte count is
            // authoritative, regardless of what the metadata claimed.
            if payload.len() > max_object_bytes {
                return Err(AdapterError::S3 {
                    op: "s3_collect_object",
                    detail: format!(
                        "object too large key={key} len={} max={max_object_bytes}",
                        payload.len()
                    ),
                });
            }

            let size_bytes = payload.len();
            fetched.push(S3IngressObject {
                key,
                payload,
                size_bytes,
                e_tag,
                pending_after_read: 0,
            });
        }

        // Back-fill the queue-depth hint: the last object has 0 remaining.
        let total = fetched.len();
        for (index, object) in fetched.iter_mut().enumerate() {
            object.pending_after_read = total.saturating_sub(index + 1);
        }

        Ok(fetched)
    }
}
320
321impl S3IngressClient for AsyncS3IngressClient {
322 fn read_next(&mut self, timeout: Duration) -> Result<Option<S3IngressObject>, AdapterError> {
323 if let Some(mut object) = self.pending.pop_front() {
324 object.pending_after_read = self.pending.len();
325 return Ok(Some(object));
326 }
327
328 let client = self.client.clone();
329 let bucket = self.bucket.clone();
330 let prefix = self.prefix.clone();
331 let max_object_bytes = self.max_object_bytes;
332 let fetched = self.runtime.block_on(Self::fetch_pending_objects(
333 client,
334 bucket,
335 prefix,
336 max_object_bytes,
337 &self.seen_keys,
338 ))?;
339
340 if fetched.is_empty() {
341 std::thread::sleep(timeout);
342 return Ok(None);
343 }
344
345 for object in fetched {
346 self.seen_keys.insert(object.key.clone());
347 self.pending.push_back(object);
348 }
349
350 let Some(mut object) = self.pending.pop_front() else {
351 return Ok(None);
352 };
353 object.pending_after_read = self.pending.len();
354 Ok(Some(object))
355 }
356}
357
/// Real S3-backed egress client: owns a current-thread Tokio runtime and
/// bridges the synchronous `S3EgressClient` API onto the async AWS SDK.
struct AsyncS3EgressClient {
    // Dedicated runtime used to block_on SDK calls.
    runtime: Runtime,
    client: aws_sdk_s3::Client,
    // Destination bucket for all uploads.
    bucket: String,
}
363
impl AsyncS3EgressClient {
    /// Builds a dedicated single-threaded Tokio runtime and an S3 client
    /// configured for `region`, with an optional custom `endpoint` (e.g. a
    /// local S3-compatible stack), which forces path-style addressing.
    ///
    /// Returns `AdapterError::S3` if the runtime cannot be built.
    fn connect(region: &str, endpoint: Option<&str>, bucket: &str) -> Result<Self, AdapterError> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|err| AdapterError::S3 {
                op: "s3_runtime_build",
                detail: err.to_string(),
            })?;
        // Config/credential resolution is async, so run it on the new runtime.
        let client = runtime.block_on(async {
            let shared_config = aws_config::defaults(BehaviorVersion::latest())
                .region(Region::new(region.to_string()))
                .load()
                .await;
            let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
            if let Some(endpoint) = endpoint {
                // Custom endpoints typically require path-style URLs.
                builder = builder.endpoint_url(endpoint).force_path_style(true);
            }
            Ok::<_, AdapterError>(aws_sdk_s3::Client::from_conf(builder.build()))
        })?;

        Ok(Self {
            runtime,
            client,
            bucket: bucket.to_string(),
        })
    }
}
392
393impl S3EgressClient for AsyncS3EgressClient {
394 fn put_object(
395 &mut self,
396 key: &str,
397 payload: &[u8],
398 content_type: Option<&str>,
399 ) -> Result<S3PutResult, AdapterError> {
400 let client = self.client.clone();
401 let bucket = self.bucket.clone();
402 let key = key.to_string();
403 let payload = payload.to_vec();
404 let content_type = content_type.map(ToOwned::to_owned);
405
406 self.runtime.block_on(async move {
407 let mut request = client
408 .put_object()
409 .bucket(bucket)
410 .key(key)
411 .body(ByteStream::from(payload));
412 if let Some(content_type) = content_type {
413 request = request.content_type(content_type);
414 }
415 let response = request.send().await.map_err(|err| AdapterError::S3 {
416 op: "s3_put_object",
417 detail: err.to_string(),
418 })?;
419 Ok(S3PutResult {
420 e_tag: response.e_tag().map(ToOwned::to_owned),
421 })
422 })
423 }
424}
425
/// Trims surrounding whitespace from an optional key prefix and collapses
/// empty/blank prefixes to `None`.
fn normalize_prefix(prefix: Option<&str>) -> Option<String> {
    let raw = prefix?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
432
/// Builds a unique object key of the form `[prefix/]NANOS-SEQUENCE.bin`,
/// where both the UNIX-epoch nanosecond timestamp and the sequence number
/// are zero-padded to 20 digits so keys sort lexicographically.
///
/// A trailing-slash or empty/blank prefix is normalized away; clock failure
/// (`duration_since` error) degrades to a zero timestamp rather than
/// panicking.
fn build_object_key(prefix: Option<&str>, sequence: u64) -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let leaf = format!("{nanos:020}-{sequence:020}.bin");
    // The prefix is already stripped of trailing slashes by the `map` below;
    // the previous code trimmed it a second time in the `Some` arm.
    match prefix
        .map(|prefix| prefix.trim_end_matches('/'))
        .filter(|prefix| !prefix.is_empty())
    {
        Some(prefix) => format!("{prefix}/{leaf}"),
        None => leaf,
    }
}
447
/// Ingress adapter that polls an S3 bucket for new objects and delivers
/// their payloads through the `IngressAdapter` interface.
pub struct S3Ingress {
    // Human-readable adapter name reported via `Adapter::name`.
    name: String,
    // Object source; the real AWS client in production, a double in tests.
    client: Box<dyn S3IngressClient>,
    // How long the client may block per `read_next` when nothing is pending.
    poll_interval: Duration,
    // Shared status snapshot surfaced via `Adapter::status_fields`.
    status: Arc<Mutex<S3IngressStatus>>,
}
459
460impl S3Ingress {
461 pub fn new(
463 name: impl Into<String>,
464 bucket: &str,
465 prefix: Option<&str>,
466 region: &str,
467 endpoint: Option<&str>,
468 poll_interval: Duration,
469 max_object_bytes: usize,
470 ) -> Result<Self, AdapterError> {
471 let client =
472 AsyncS3IngressClient::connect(region, endpoint, bucket, prefix, max_object_bytes)?;
473 Ok(Self {
474 name: name.into(),
475 client: Box::new(client),
476 poll_interval,
477 status: Arc::new(Mutex::new(S3IngressStatus::default())),
478 })
479 }
480
481 #[cfg(test)]
482 pub(crate) fn with_client(
484 name: impl Into<String>,
485 client: impl S3IngressClient + 'static,
486 poll_interval: Duration,
487 ) -> Self {
488 Self {
489 name: name.into(),
490 client: Box::new(client),
491 poll_interval,
492 status: Arc::new(Mutex::new(S3IngressStatus::default())),
493 }
494 }
495}
496
impl Adapter for S3Ingress {
    /// Adapter name as configured by the caller.
    fn name(&self) -> &str {
        &self.name
    }

    /// Transport identifier reported for this adapter.
    fn transport_kind(&self) -> &str {
        "s3"
    }

    /// Snapshot of the current ingress status as JSON fields.
    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
        self.status.lock().unwrap().status_fields()
    }
}
510
impl IngressAdapter for S3Ingress {
    /// Copies the next object's payload into `out` and returns the byte
    /// count, or `None` when no new object arrived within the poll interval.
    ///
    /// Errors when the payload does not fit in `out`.
    /// NOTE(review): on that error the object has already been consumed from
    /// the client, so its payload is dropped and `transfer_state` stays
    /// "polling" — confirm callers size `out` for the configured
    /// max_object_bytes.
    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
        self.status.lock().unwrap().transfer_state = "polling".to_string();
        let Some(object) = self.client.read_next(self.poll_interval)? else {
            self.status.lock().unwrap().transfer_state = "idle".to_string();
            return Ok(None);
        };
        let payload = object.payload;
        if payload.len() > out.len() {
            return Err(AdapterError::S3 {
                op: "s3_read_next",
                detail: format!("payload too large len={} buf={}", payload.len(), out.len()),
            });
        }
        // Update the status snapshot inside a short-lived lock scope.
        {
            let mut status = self.status.lock().unwrap();
            status.transfer_state = "downloaded".to_string();
            status.pending_objects = object.pending_after_read as u64;
            if !object.key.is_empty() {
                status.last_object_key = Some(object.key);
            }
            status.last_object_size_bytes = Some(object.size_bytes as u64);
            status.last_object_etag = object.e_tag;
        }
        out[..payload.len()].copy_from_slice(&payload);
        Ok(Some(payload.len()))
    }
}
539
/// Egress adapter that uploads each message as a separate S3 object under a
/// timestamp+sequence key.
pub struct S3Egress {
    // Human-readable adapter name reported via `Adapter::name`.
    name: String,
    // Upload sink; the real AWS client in production, a double in tests.
    client: Box<dyn S3EgressClient>,
    // Normalized key prefix for generated object keys (None = bucket root).
    prefix: Option<String>,
    // Optional MIME type applied to every uploaded object.
    content_type: Option<String>,
    // Monotonically increasing counter folded into each object key.
    sequence: u64,
    // Shared status snapshot surfaced via `Adapter::status_fields`.
    status: Arc<Mutex<S3EgressStatus>>,
}
552
553impl S3Egress {
554 pub fn new(
556 name: impl Into<String>,
557 bucket: &str,
558 prefix: Option<&str>,
559 region: &str,
560 endpoint: Option<&str>,
561 content_type: Option<&str>,
562 ) -> Result<Self, AdapterError> {
563 let client = AsyncS3EgressClient::connect(region, endpoint, bucket)?;
564 Ok(Self {
565 name: name.into(),
566 client: Box::new(client),
567 prefix: normalize_prefix(prefix),
568 content_type: content_type.map(ToOwned::to_owned),
569 sequence: 0,
570 status: Arc::new(Mutex::new(S3EgressStatus::default())),
571 })
572 }
573
574 #[cfg(test)]
575 pub(crate) fn with_client(
577 name: impl Into<String>,
578 client: impl S3EgressClient + 'static,
579 prefix: Option<&str>,
580 content_type: Option<&str>,
581 ) -> Self {
582 Self {
583 name: name.into(),
584 client: Box::new(client),
585 prefix: normalize_prefix(prefix),
586 content_type: content_type.map(ToOwned::to_owned),
587 sequence: 0,
588 status: Arc::new(Mutex::new(S3EgressStatus::default())),
589 }
590 }
591}
592
impl Adapter for S3Egress {
    /// Adapter name as configured by the caller.
    fn name(&self) -> &str {
        &self.name
    }

    /// Transport identifier reported for this adapter.
    fn transport_kind(&self) -> &str {
        "s3"
    }

    /// Snapshot of the current egress status as JSON fields.
    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
        self.status.lock().unwrap().status_fields()
    }
}
606
impl EgressAdapter for S3Egress {
    /// Uploads `msg` as a new object under a fresh timestamp+sequence key,
    /// then records key/size/etag in the status snapshot.
    ///
    /// NOTE(review): `sequence` advances even when the upload fails (key
    /// gaps), and `transfer_state` is left at "uploading" on error — confirm
    /// both are acceptable to status consumers.
    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
        self.sequence = self.sequence.saturating_add(1);
        let key = build_object_key(self.prefix.as_deref(), self.sequence);
        self.status.lock().unwrap().transfer_state = "uploading".to_string();
        let result = self
            .client
            .put_object(&key, msg, self.content_type.as_deref())?;
        let mut status = self.status.lock().unwrap();
        status.transfer_state = "uploaded".to_string();
        status.last_object_key = Some(key);
        status.last_object_size_bytes = Some(msg.len() as u64);
        status.last_object_etag = result.e_tag;
        Ok(())
    }

    /// No-op: every write is uploaded immediately, so nothing is buffered.
    fn flush(&mut self) -> Result<(), AdapterError> {
        Ok(())
    }
}