// byteor_adapters/s3.rs

1use std::collections::{HashSet, VecDeque};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use aws_config::{BehaviorVersion, Region};
6use aws_sdk_s3::primitives::ByteStream;
7use tokio::runtime::Runtime;
8
9use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
10
11pub(crate) struct S3IngressObject {
12    pub key: String,
13    pub payload: Vec<u8>,
14    pub size_bytes: usize,
15    pub e_tag: Option<String>,
16    pub pending_after_read: usize,
17}
18
19pub(crate) struct S3PutResult {
20    pub e_tag: Option<String>,
21}
22
/// Mutable status snapshot published by the S3 ingress adapter.
#[derive(Debug)]
struct S3IngressStatus {
    /// Current phase label; the adapter writes `"idle"`, `"polling"` and `"downloaded"`.
    transfer_state: String,
    /// Key of the most recently delivered object.
    last_object_key: Option<String>,
    /// Size in bytes of the most recently delivered object.
    last_object_size_bytes: Option<u64>,
    /// ETag of the most recently delivered object.
    last_object_etag: Option<String>,
    /// Objects still queued in the client after the last delivery.
    pending_objects: u64,
}
31
32impl Default for S3IngressStatus {
33    fn default() -> Self {
34        Self {
35            transfer_state: "idle".to_string(),
36            last_object_key: None,
37            last_object_size_bytes: None,
38            last_object_etag: None,
39            pending_objects: 0,
40        }
41    }
42}
43
44impl S3IngressStatus {
45    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
46        let mut details = serde_json::Map::new();
47        details.insert(
48            "transfer_state".to_string(),
49            serde_json::Value::String(self.transfer_state.clone()),
50        );
51        details.insert(
52            "pending_objects".to_string(),
53            serde_json::Value::from(self.pending_objects),
54        );
55        if let Some(key) = &self.last_object_key {
56            details.insert(
57                "last_object_key".to_string(),
58                serde_json::Value::String(key.clone()),
59            );
60        }
61        if let Some(size_bytes) = self.last_object_size_bytes {
62            details.insert(
63                "last_object_size_bytes".to_string(),
64                serde_json::Value::from(size_bytes),
65            );
66        }
67        if let Some(e_tag) = &self.last_object_etag {
68            details.insert(
69                "last_object_etag".to_string(),
70                serde_json::Value::String(e_tag.clone()),
71            );
72        }
73        details
74    }
75}
76
/// Mutable status snapshot published by the S3 egress adapter.
#[derive(Debug)]
struct S3EgressStatus {
    /// Current phase label; the adapter writes `"idle"`, `"uploading"` and `"uploaded"`.
    transfer_state: String,
    /// Key of the most recently uploaded object.
    last_object_key: Option<String>,
    /// Size in bytes of the most recently uploaded object.
    last_object_size_bytes: Option<u64>,
    /// ETag returned for the most recently uploaded object.
    last_object_etag: Option<String>,
}
84
85impl Default for S3EgressStatus {
86    fn default() -> Self {
87        Self {
88            transfer_state: "idle".to_string(),
89            last_object_key: None,
90            last_object_size_bytes: None,
91            last_object_etag: None,
92        }
93    }
94}
95
96impl S3EgressStatus {
97    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
98        let mut details = serde_json::Map::new();
99        details.insert(
100            "transfer_state".to_string(),
101            serde_json::Value::String(self.transfer_state.clone()),
102        );
103        if let Some(key) = &self.last_object_key {
104            details.insert(
105                "last_object_key".to_string(),
106                serde_json::Value::String(key.clone()),
107            );
108        }
109        if let Some(size_bytes) = self.last_object_size_bytes {
110            details.insert(
111                "last_object_size_bytes".to_string(),
112                serde_json::Value::from(size_bytes),
113            );
114        }
115        if let Some(e_tag) = &self.last_object_etag {
116            details.insert(
117                "last_object_etag".to_string(),
118                serde_json::Value::String(e_tag.clone()),
119            );
120        }
121        details
122    }
123}
124
125/// Internal S3 ingress client contract used by the adapter and crate-local tests.
126pub(crate) trait S3IngressClient: Send {
127    fn read_next(&mut self, timeout: Duration) -> Result<Option<S3IngressObject>, AdapterError>;
128}
129
130/// Internal S3 egress client contract used by the adapter and crate-local tests.
131pub(crate) trait S3EgressClient: Send {
132    fn put_object(
133        &mut self,
134        key: &str,
135        payload: &[u8],
136        content_type: Option<&str>,
137    ) -> Result<S3PutResult, AdapterError>;
138}
139
140struct AsyncS3IngressClient {
141    runtime: Runtime,
142    client: aws_sdk_s3::Client,
143    bucket: String,
144    prefix: Option<String>,
145    max_object_bytes: usize,
146    seen_keys: HashSet<String>,
147    pending: VecDeque<S3IngressObject>,
148}
149
150impl AsyncS3IngressClient {
151    fn connect(
152        region: &str,
153        endpoint: Option<&str>,
154        bucket: &str,
155        prefix: Option<&str>,
156        max_object_bytes: usize,
157    ) -> Result<Self, AdapterError> {
158        let runtime = tokio::runtime::Builder::new_current_thread()
159            .enable_all()
160            .build()
161            .map_err(|err| AdapterError::S3 {
162                op: "s3_runtime_build",
163                detail: err.to_string(),
164            })?;
165        let client = runtime.block_on(async {
166            let shared_config = aws_config::defaults(BehaviorVersion::latest())
167                .region(Region::new(region.to_string()))
168                .load()
169                .await;
170            let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
171            if let Some(endpoint) = endpoint {
172                builder = builder.endpoint_url(endpoint).force_path_style(true);
173            }
174            Ok::<_, AdapterError>(aws_sdk_s3::Client::from_conf(builder.build()))
175        })?;
176
177        Ok(Self {
178            runtime,
179            client,
180            bucket: bucket.to_string(),
181            prefix: normalize_prefix(prefix),
182            max_object_bytes,
183            seen_keys: HashSet::new(),
184            pending: VecDeque::new(),
185        })
186    }
187
188    async fn fetch_pending_objects(
189        client: aws_sdk_s3::Client,
190        bucket: String,
191        prefix: Option<String>,
192        max_object_bytes: usize,
193        seen_keys: &HashSet<String>,
194    ) -> Result<Vec<S3IngressObject>, AdapterError> {
195        let mut continuation_token = None;
196        let mut discovered = Vec::new();
197
198        loop {
199            let mut request = client.list_objects_v2().bucket(&bucket);
200            if let Some(prefix) = &prefix {
201                request = request.prefix(prefix);
202            }
203            if let Some(token) = continuation_token.take() {
204                request = request.continuation_token(token);
205            }
206
207            let response = request.send().await.map_err(|err| AdapterError::S3 {
208                op: "s3_list_objects_v2",
209                detail: err.to_string(),
210            })?;
211
212            for object in response.contents() {
213                let Some(key) = object.key() else {
214                    continue;
215                };
216                if seen_keys.contains(key) {
217                    continue;
218                }
219                if let Some(size) = object.size() {
220                    if size < 0 {
221                        return Err(AdapterError::S3 {
222                            op: "s3_list_objects_v2",
223                            detail: format!("object {key} returned a negative size"),
224                        });
225                    }
226                    if size as usize > max_object_bytes {
227                        return Err(AdapterError::S3 {
228                            op: "s3_get_object",
229                            detail: format!(
230                                "object too large key={key} len={} max={max_object_bytes}",
231                                size
232                            ),
233                        });
234                    }
235                }
236                discovered.push(key.to_string());
237            }
238
239            if !response.is_truncated().unwrap_or(false) {
240                break;
241            }
242            continuation_token = response.next_continuation_token().map(ToOwned::to_owned);
243            if continuation_token.is_none() {
244                break;
245            }
246        }
247
248        discovered.sort();
249        let mut fetched = Vec::with_capacity(discovered.len());
250        for key in discovered {
251            let response = client
252                .get_object()
253                .bucket(&bucket)
254                .key(&key)
255                .send()
256                .await
257                .map_err(|err| AdapterError::S3 {
258                    op: "s3_get_object",
259                    detail: err.to_string(),
260                })?;
261            if let Some(content_length) = response.content_length() {
262                if content_length < 0 {
263                    return Err(AdapterError::S3 {
264                        op: "s3_get_object",
265                        detail: format!("object {key} returned a negative content length"),
266                    });
267                }
268                if content_length as usize > max_object_bytes {
269                    return Err(AdapterError::S3 {
270                        op: "s3_get_object",
271                        detail: format!(
272                            "object too large key={key} len={} max={max_object_bytes}",
273                            content_length
274                        ),
275                    });
276                }
277            }
278
279            let e_tag = response.e_tag().map(ToOwned::to_owned);
280
281            let payload = response
282                .body
283                .collect()
284                .await
285                .map_err(|err| AdapterError::S3 {
286                    op: "s3_collect_object",
287                    detail: err.to_string(),
288                })?
289                .into_bytes()
290                .to_vec();
291
292            if payload.len() > max_object_bytes {
293                return Err(AdapterError::S3 {
294                    op: "s3_collect_object",
295                    detail: format!(
296                        "object too large key={key} len={} max={max_object_bytes}",
297                        payload.len()
298                    ),
299                });
300            }
301
302            let size_bytes = payload.len();
303            fetched.push(S3IngressObject {
304                key,
305                payload,
306                size_bytes,
307                e_tag,
308                pending_after_read: 0,
309            });
310        }
311
312        let total = fetched.len();
313        for (index, object) in fetched.iter_mut().enumerate() {
314            object.pending_after_read = total.saturating_sub(index + 1);
315        }
316
317        Ok(fetched)
318    }
319}
320
321impl S3IngressClient for AsyncS3IngressClient {
322    fn read_next(&mut self, timeout: Duration) -> Result<Option<S3IngressObject>, AdapterError> {
323        if let Some(mut object) = self.pending.pop_front() {
324            object.pending_after_read = self.pending.len();
325            return Ok(Some(object));
326        }
327
328        let client = self.client.clone();
329        let bucket = self.bucket.clone();
330        let prefix = self.prefix.clone();
331        let max_object_bytes = self.max_object_bytes;
332        let fetched = self.runtime.block_on(Self::fetch_pending_objects(
333            client,
334            bucket,
335            prefix,
336            max_object_bytes,
337            &self.seen_keys,
338        ))?;
339
340        if fetched.is_empty() {
341            std::thread::sleep(timeout);
342            return Ok(None);
343        }
344
345        for object in fetched {
346            self.seen_keys.insert(object.key.clone());
347            self.pending.push_back(object);
348        }
349
350        let Some(mut object) = self.pending.pop_front() else {
351            return Ok(None);
352        };
353        object.pending_after_read = self.pending.len();
354        Ok(Some(object))
355    }
356}
357
358struct AsyncS3EgressClient {
359    runtime: Runtime,
360    client: aws_sdk_s3::Client,
361    bucket: String,
362}
363
364impl AsyncS3EgressClient {
365    fn connect(region: &str, endpoint: Option<&str>, bucket: &str) -> Result<Self, AdapterError> {
366        let runtime = tokio::runtime::Builder::new_current_thread()
367            .enable_all()
368            .build()
369            .map_err(|err| AdapterError::S3 {
370                op: "s3_runtime_build",
371                detail: err.to_string(),
372            })?;
373        let client = runtime.block_on(async {
374            let shared_config = aws_config::defaults(BehaviorVersion::latest())
375                .region(Region::new(region.to_string()))
376                .load()
377                .await;
378            let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
379            if let Some(endpoint) = endpoint {
380                builder = builder.endpoint_url(endpoint).force_path_style(true);
381            }
382            Ok::<_, AdapterError>(aws_sdk_s3::Client::from_conf(builder.build()))
383        })?;
384
385        Ok(Self {
386            runtime,
387            client,
388            bucket: bucket.to_string(),
389        })
390    }
391}
392
393impl S3EgressClient for AsyncS3EgressClient {
394    fn put_object(
395        &mut self,
396        key: &str,
397        payload: &[u8],
398        content_type: Option<&str>,
399    ) -> Result<S3PutResult, AdapterError> {
400        let client = self.client.clone();
401        let bucket = self.bucket.clone();
402        let key = key.to_string();
403        let payload = payload.to_vec();
404        let content_type = content_type.map(ToOwned::to_owned);
405
406        self.runtime.block_on(async move {
407            let mut request = client
408                .put_object()
409                .bucket(bucket)
410                .key(key)
411                .body(ByteStream::from(payload));
412            if let Some(content_type) = content_type {
413                request = request.content_type(content_type);
414            }
415            let response = request.send().await.map_err(|err| AdapterError::S3 {
416                op: "s3_put_object",
417                detail: err.to_string(),
418            })?;
419            Ok(S3PutResult {
420                e_tag: response.e_tag().map(ToOwned::to_owned),
421            })
422        })
423    }
424}
425
/// Trim surrounding whitespace from `prefix`, treating a missing or blank prefix as `None`.
fn normalize_prefix(prefix: Option<&str>) -> Option<String> {
    let trimmed = prefix?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
432
/// Build an object key of the form `<prefix>/<unix-nanos>-<sequence>.bin`.
///
/// Both numbers are zero-padded to 20 digits. Trailing `/` characters on `prefix` are dropped,
/// and a prefix that is absent or empty after that yields the bare leaf name. A clock reading
/// before the Unix epoch is treated as zero nanoseconds.
fn build_object_key(prefix: Option<&str>, sequence: u64) -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let nanos = since_epoch.as_nanos();
    let leaf = format!("{nanos:020}-{sequence:020}.bin");

    let directory = prefix.unwrap_or("").trim_end_matches('/');
    if directory.is_empty() {
        leaf
    } else {
        format!("{}/{}", directory, leaf)
    }
}
447
448/// Minimal blocking S3 ingress adapter.
449///
450/// This adapter polls one bucket/prefix and exposes each unseen object body as one ingress
451/// message. Credentials come from the AWS default provider chain; a custom `endpoint` can be used
452/// for S3-compatible stores such as MinIO.
453pub struct S3Ingress {
454    name: String,
455    client: Box<dyn S3IngressClient>,
456    poll_interval: Duration,
457    status: Arc<Mutex<S3IngressStatus>>,
458}
459
460impl S3Ingress {
461    /// Create a blocking S3 ingress adapter.
462    pub fn new(
463        name: impl Into<String>,
464        bucket: &str,
465        prefix: Option<&str>,
466        region: &str,
467        endpoint: Option<&str>,
468        poll_interval: Duration,
469        max_object_bytes: usize,
470    ) -> Result<Self, AdapterError> {
471        let client =
472            AsyncS3IngressClient::connect(region, endpoint, bucket, prefix, max_object_bytes)?;
473        Ok(Self {
474            name: name.into(),
475            client: Box::new(client),
476            poll_interval,
477            status: Arc::new(Mutex::new(S3IngressStatus::default())),
478        })
479    }
480
481    #[cfg(test)]
482    /// Build an S3 ingress adapter around a crate-local fake client.
483    pub(crate) fn with_client(
484        name: impl Into<String>,
485        client: impl S3IngressClient + 'static,
486        poll_interval: Duration,
487    ) -> Self {
488        Self {
489            name: name.into(),
490            client: Box::new(client),
491            poll_interval,
492            status: Arc::new(Mutex::new(S3IngressStatus::default())),
493        }
494    }
495}
496
497impl Adapter for S3Ingress {
498    fn name(&self) -> &str {
499        &self.name
500    }
501
502    fn transport_kind(&self) -> &str {
503        "s3"
504    }
505
506    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
507        self.status.lock().unwrap().status_fields()
508    }
509}
510
511impl IngressAdapter for S3Ingress {
512    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
513        self.status.lock().unwrap().transfer_state = "polling".to_string();
514        let Some(object) = self.client.read_next(self.poll_interval)? else {
515            self.status.lock().unwrap().transfer_state = "idle".to_string();
516            return Ok(None);
517        };
518        let payload = object.payload;
519        if payload.len() > out.len() {
520            return Err(AdapterError::S3 {
521                op: "s3_read_next",
522                detail: format!("payload too large len={} buf={}", payload.len(), out.len()),
523            });
524        }
525        {
526            let mut status = self.status.lock().unwrap();
527            status.transfer_state = "downloaded".to_string();
528            status.pending_objects = object.pending_after_read as u64;
529            if !object.key.is_empty() {
530                status.last_object_key = Some(object.key);
531            }
532            status.last_object_size_bytes = Some(object.size_bytes as u64);
533            status.last_object_etag = object.e_tag;
534        }
535        out[..payload.len()].copy_from_slice(&payload);
536        Ok(Some(payload.len()))
537    }
538}
539
540/// Minimal blocking S3 egress adapter.
541///
542/// Each `write_msg()` uploads one object under the configured bucket/prefix. Keys are generated as
543/// `<prefix>/<unix-nanos>-<sequence>.bin`.
544pub struct S3Egress {
545    name: String,
546    client: Box<dyn S3EgressClient>,
547    prefix: Option<String>,
548    content_type: Option<String>,
549    sequence: u64,
550    status: Arc<Mutex<S3EgressStatus>>,
551}
552
553impl S3Egress {
554    /// Create a blocking S3 egress adapter.
555    pub fn new(
556        name: impl Into<String>,
557        bucket: &str,
558        prefix: Option<&str>,
559        region: &str,
560        endpoint: Option<&str>,
561        content_type: Option<&str>,
562    ) -> Result<Self, AdapterError> {
563        let client = AsyncS3EgressClient::connect(region, endpoint, bucket)?;
564        Ok(Self {
565            name: name.into(),
566            client: Box::new(client),
567            prefix: normalize_prefix(prefix),
568            content_type: content_type.map(ToOwned::to_owned),
569            sequence: 0,
570            status: Arc::new(Mutex::new(S3EgressStatus::default())),
571        })
572    }
573
574    #[cfg(test)]
575    /// Build an S3 egress adapter around a crate-local fake client.
576    pub(crate) fn with_client(
577        name: impl Into<String>,
578        client: impl S3EgressClient + 'static,
579        prefix: Option<&str>,
580        content_type: Option<&str>,
581    ) -> Self {
582        Self {
583            name: name.into(),
584            client: Box::new(client),
585            prefix: normalize_prefix(prefix),
586            content_type: content_type.map(ToOwned::to_owned),
587            sequence: 0,
588            status: Arc::new(Mutex::new(S3EgressStatus::default())),
589        }
590    }
591}
592
593impl Adapter for S3Egress {
594    fn name(&self) -> &str {
595        &self.name
596    }
597
598    fn transport_kind(&self) -> &str {
599        "s3"
600    }
601
602    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
603        self.status.lock().unwrap().status_fields()
604    }
605}
606
607impl EgressAdapter for S3Egress {
608    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
609        self.sequence = self.sequence.saturating_add(1);
610        let key = build_object_key(self.prefix.as_deref(), self.sequence);
611        self.status.lock().unwrap().transfer_state = "uploading".to_string();
612        let result = self
613            .client
614            .put_object(&key, msg, self.content_type.as_deref())?;
615        let mut status = self.status.lock().unwrap();
616        status.transfer_state = "uploaded".to_string();
617        status.last_object_key = Some(key);
618        status.last_object_size_bytes = Some(msg.len() as u64);
619        status.last_object_etag = result.e_tag;
620        Ok(())
621    }
622
623    fn flush(&mut self) -> Result<(), AdapterError> {
624        Ok(())
625    }
626}