byteor_adapters/
core.rs

1use byteor_pipeline_exec::{ExecError, StageResolver};
2use byteor_pipeline_spec::PipelineSpecV1;
3
4/// Snapshot-ready runtime status for one adapter endpoint.
5#[derive(Clone, Debug, PartialEq, Eq)]
6pub struct AdapterRuntimeStatus {
7    /// Stable adapter name for logs/ops.
8    pub name: String,
9    /// Endpoint role within the adapter loop.
10    pub role: String,
11    /// Adapter transport family.
12    pub transport: String,
13    /// Last observed connection/runtime state.
14    pub connection_state: String,
15    /// Number of messages handled by this endpoint.
16    pub message_count: u64,
17    /// Number of recent failures observed for this endpoint.
18    pub recent_failures: u64,
19    /// Most recent error string, when available.
20    pub last_error: Option<String>,
21    /// Queue/stream lag where the adapter can report it.
22    pub lag: Option<u64>,
23    /// Transport-specific status details for richer operator visibility.
24    pub details: serde_json::Map<String, serde_json::Value>,
25}
26
27impl AdapterRuntimeStatus {
28    fn new(adapter: &dyn Adapter, role: &str) -> Self {
29        Self {
30            name: adapter.name().to_string(),
31            role: role.to_string(),
32            transport: adapter.transport_kind().to_string(),
33            connection_state: adapter.connection_state().to_string(),
34            message_count: 0,
35            recent_failures: 0,
36            last_error: None,
37            lag: adapter.lag(),
38            details: adapter.status_fields(),
39        }
40    }
41
42    fn refresh(&mut self, adapter: &dyn Adapter) {
43        self.connection_state = adapter.connection_state().to_string();
44        self.lag = adapter.lag();
45        self.details = adapter.status_fields();
46    }
47
48    fn record_error(&mut self, error: &impl core::fmt::Display) {
49        self.connection_state = "error".to_string();
50        self.recent_failures = self.recent_failures.saturating_add(1);
51        self.last_error = Some(error.to_string());
52    }
53}
54
55/// Adapter-level errors.
56#[derive(Debug)]
57pub enum AdapterError {
58    /// IO error.
59    Io {
60        /// Which operation failed.
61        op: &'static str,
62        /// Underlying IO error.
63        source: std::io::Error,
64    },
65    /// HTTP or protocol error.
66    Http {
67        /// Which operation failed.
68        op: &'static str,
69        /// Human-readable details.
70        detail: String,
71    },
72    /// gRPC transport or protocol error.
73    Grpc {
74        /// Which operation failed.
75        op: &'static str,
76        /// Human-readable details.
77        detail: String,
78    },
79    /// WebSocket transport or protocol error.
80    WebSocket {
81        /// Which operation failed.
82        op: &'static str,
83        /// Human-readable details.
84        detail: String,
85    },
86    /// Kafka transport or protocol error.
87    Kafka {
88        /// Which operation failed.
89        op: &'static str,
90        /// Human-readable details.
91        detail: String,
92    },
93    /// NATS transport or protocol error.
94    Nats {
95        /// Which operation failed.
96        op: &'static str,
97        /// Human-readable details.
98        detail: String,
99    },
100    /// RabbitMQ Streams transport or protocol error.
101    RabbitMq {
102        /// Which operation failed.
103        op: &'static str,
104        /// Human-readable details.
105        detail: String,
106    },
107    /// Redis transport or protocol error.
108    Redis {
109        /// Which operation failed.
110        op: &'static str,
111        /// Human-readable details.
112        detail: String,
113    },
114    /// S3 transport or protocol error.
115    S3 {
116        /// Which operation failed.
117        op: &'static str,
118        /// Human-readable details.
119        detail: String,
120    },
121    /// PostgreSQL transport or protocol error.
122    Postgres {
123        /// Which operation failed.
124        op: &'static str,
125        /// Human-readable details.
126        detail: String,
127    },
128    /// Adapter configuration error.
129    Config {
130        /// Human-readable details.
131        detail: String,
132    },
133}
134
135/// Adapter loop errors.
136#[derive(Debug)]
137pub enum AdapterLoopFailure {
138    /// Adapter IO or protocol failure.
139    Adapter(AdapterError),
140    /// Pipeline execution failure.
141    Exec(ExecError),
142}
143
144/// Adapter loop failure plus the runtime status captured before exit.
145#[derive(Debug)]
146pub struct AdapterLoopError {
147    /// Partial adapter stats captured before the loop failed.
148    pub stats: AdapterLoopStats,
149    /// Underlying failure source.
150    pub failure: AdapterLoopFailure,
151}
152
153impl AdapterLoopError {
154    fn adapter(error: AdapterError, stats: AdapterLoopStats) -> Self {
155        Self {
156            stats,
157            failure: AdapterLoopFailure::Adapter(error),
158        }
159    }
160
161    fn exec(error: ExecError, stats: AdapterLoopStats) -> Self {
162        Self {
163            stats,
164            failure: AdapterLoopFailure::Exec(error),
165        }
166    }
167}
168
169impl core::fmt::Display for AdapterLoopFailure {
170    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
171        match self {
172            Self::Adapter(err) => write!(f, "adapter loop adapter error: {err}"),
173            Self::Exec(err) => write!(f, "adapter loop exec error: {err}"),
174        }
175    }
176}
177
178impl std::error::Error for AdapterLoopFailure {
179    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
180        match self {
181            Self::Adapter(err) => Some(err),
182            Self::Exec(err) => Some(err),
183        }
184    }
185}
186
187impl core::fmt::Display for AdapterLoopError {
188    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
189        write!(f, "{}", self.failure)
190    }
191}
192
193impl std::error::Error for AdapterLoopError {
194    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
195        Some(&self.failure)
196    }
197}
198
199/// Summary stats for one adapter loop run.
200#[derive(Clone, Debug, PartialEq, Eq)]
201pub struct AdapterLoopStats {
202    /// Number of input messages read from ingress.
203    pub ingress_messages: u64,
204    /// Number of output messages written to egress.
205    pub egress_messages: u64,
206    /// Last observed ingress adapter status.
207    pub ingress: AdapterRuntimeStatus,
208    /// Last observed egress adapter status.
209    pub egress: AdapterRuntimeStatus,
210}
211
212impl AdapterLoopStats {
213    fn new(ingress: &dyn Adapter, egress: &dyn Adapter) -> Self {
214        Self {
215            ingress_messages: 0,
216            egress_messages: 0,
217            ingress: AdapterRuntimeStatus::new(ingress, "ingress"),
218            egress: AdapterRuntimeStatus::new(egress, "egress"),
219        }
220    }
221
222    /// Return the snapshot-ready adapter status entries for this loop.
223    pub fn snapshot_entries(&self) -> Vec<serde_json::Value> {
224        vec![
225            serde_json::json!({
226                "name": self.ingress.name,
227                "role": self.ingress.role,
228                "transport": self.ingress.transport,
229                "connection_state": self.ingress.connection_state,
230                "message_count": self.ingress.message_count,
231                "recent_failures": self.ingress.recent_failures,
232                "last_error": self.ingress.last_error,
233                "lag": self.ingress.lag,
234                "details": self.ingress.details,
235            }),
236            serde_json::json!({
237                "name": self.egress.name,
238                "role": self.egress.role,
239                "transport": self.egress.transport,
240                "connection_state": self.egress.connection_state,
241                "message_count": self.egress.message_count,
242                "recent_failures": self.egress.recent_failures,
243                "last_error": self.egress.last_error,
244                "lag": self.egress.lag,
245                "details": self.egress.details,
246            }),
247        ]
248    }
249
250    /// Merge adapter status into a snapshot payload under the `adapters` key.
251    pub fn merge_into_snapshot(&self, snapshot: &mut serde_json::Value) {
252        if let Some(map) = snapshot.as_object_mut() {
253            map.insert(
254                "adapters".to_string(),
255                serde_json::Value::Array(self.snapshot_entries()),
256            );
257        }
258    }
259}
260
261/// Optional controls for one adapter loop run.
262#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
263pub struct AdapterLoopOptions {
264    /// Stop the loop after reading at most this many ingress messages.
265    pub max_ingress_messages: Option<u64>,
266}
267
268impl core::fmt::Display for AdapterError {
269    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
270        match self {
271            Self::Io { op, source } => write!(f, "{op} failed: {source}"),
272            Self::Http { op, detail } => write!(f, "{op} failed: {detail}"),
273            Self::Grpc { op, detail } => write!(f, "{op} failed: {detail}"),
274            Self::WebSocket { op, detail } => write!(f, "{op} failed: {detail}"),
275            Self::Kafka { op, detail } => write!(f, "{op} failed: {detail}"),
276            Self::Nats { op, detail } => write!(f, "{op} failed: {detail}"),
277            Self::RabbitMq { op, detail } => write!(f, "{op} failed: {detail}"),
278            Self::Redis { op, detail } => write!(f, "{op} failed: {detail}"),
279            Self::S3 { op, detail } => write!(f, "{op} failed: {detail}"),
280            Self::Postgres { op, detail } => write!(f, "{op} failed: {detail}"),
281            Self::Config { detail } => write!(f, "config error: {detail}"),
282        }
283    }
284}
285
286impl std::error::Error for AdapterError {
287    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
288        match self {
289            Self::Io { source, .. } => Some(source),
290            Self::Http { .. }
291            | Self::Grpc { .. }
292            | Self::WebSocket { .. }
293            | Self::Kafka { .. }
294            | Self::Nats { .. }
295            | Self::RabbitMq { .. }
296            | Self::Redis { .. }
297            | Self::S3 { .. }
298            | Self::Postgres { .. }
299            | Self::Config { .. } => None,
300        }
301    }
302}
303
304/// Minimal adapter identity contract.
305pub trait Adapter {
306    /// Stable adapter name for logs/obs.
307    fn name(&self) -> &str;
308
309    /// Adapter transport family label.
310    fn transport_kind(&self) -> &str {
311        "generic"
312    }
313
314    /// Best-effort connection/runtime state for operator visibility.
315    fn connection_state(&self) -> &str {
316        "ready"
317    }
318
319    /// Best-effort queue/stream lag, when the adapter can observe it.
320    fn lag(&self) -> Option<u64> {
321        None
322    }
323
324    /// Transport-specific runtime fields that can be attached to operator snapshots.
325    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
326        serde_json::Map::new()
327    }
328}
329
330impl<T> Adapter for Box<T>
331where
332    T: Adapter + ?Sized,
333{
334    fn name(&self) -> &str {
335        (**self).name()
336    }
337
338    fn transport_kind(&self) -> &str {
339        (**self).transport_kind()
340    }
341
342    fn connection_state(&self) -> &str {
343        (**self).connection_state()
344    }
345
346    fn lag(&self) -> Option<u64> {
347        (**self).lag()
348    }
349
350    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
351        (**self).status_fields()
352    }
353}
354
355/// Ingress adapter: produces input messages.
356///
357/// The API is deliberately simple and allocation-free for callers: the caller provides `out`.
358pub trait IngressAdapter: Adapter {
359    /// Read the next message.
360    ///
361    /// Returns:
362    /// - `Ok(Some(n))` when `n` bytes were written to `out`
363    /// - `Ok(None)` on end-of-stream
364    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError>;
365}
366
367impl<T> IngressAdapter for Box<T>
368where
369    T: IngressAdapter + ?Sized,
370{
371    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
372        (**self).read_next(out)
373    }
374}
375
376/// Egress adapter: consumes output messages.
377pub trait EgressAdapter: Adapter {
378    /// Write one message.
379    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError>;
380
381    /// Flush buffered output.
382    fn flush(&mut self) -> Result<(), AdapterError>;
383}
384
385impl<T> EgressAdapter for Box<T>
386where
387    T: EgressAdapter + ?Sized,
388{
389    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
390        (**self).write_msg(msg)
391    }
392
393    fn flush(&mut self) -> Result<(), AdapterError> {
394        (**self).flush()
395    }
396}
397
398/// Run a minimal adapter loop for an in-memory SingleRing worker job.
399///
400/// The loop reads one message at a time from `ingress`, executes the provided `single_ring`
401/// pipeline via the in-memory executor, writes all produced outputs to `egress`, and flushes after
402/// each emitted message.
403pub fn run_single_ring_adapter_loop<I, E>(
404    single_ring: &PipelineSpecV1,
405    resolver: &dyn StageResolver,
406    ingress: &mut I,
407    egress: &mut E,
408) -> Result<AdapterLoopStats, AdapterLoopError>
409where
410    I: IngressAdapter,
411    E: EgressAdapter,
412{
413    run_single_ring_adapter_loop_with_options(
414        single_ring,
415        resolver,
416        ingress,
417        egress,
418        AdapterLoopOptions::default(),
419    )
420}
421
422/// Run a minimal adapter loop for an in-memory SingleRing worker job with optional controls.
423pub fn run_single_ring_adapter_loop_with_options<I, E>(
424    single_ring: &PipelineSpecV1,
425    resolver: &dyn StageResolver,
426    ingress: &mut I,
427    egress: &mut E,
428    options: AdapterLoopOptions,
429) -> Result<AdapterLoopStats, AdapterLoopError>
430where
431    I: IngressAdapter,
432    E: EgressAdapter,
433{
434    run_single_ring_adapter_loop_with_options_and_observer(
435        single_ring,
436        resolver,
437        ingress,
438        egress,
439        options,
440        |_| {},
441    )
442}
443
444/// Run a minimal adapter loop and observe best-effort progress snapshots after each message.
445pub fn run_single_ring_adapter_loop_with_options_and_observer<I, E, F>(
446    single_ring: &PipelineSpecV1,
447    resolver: &dyn StageResolver,
448    ingress: &mut I,
449    egress: &mut E,
450    options: AdapterLoopOptions,
451    mut observe: F,
452) -> Result<AdapterLoopStats, AdapterLoopError>
453where
454    I: IngressAdapter,
455    E: EgressAdapter,
456    F: FnMut(&AdapterLoopStats),
457{
458    let mut input_buf = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
459    let mut stats = AdapterLoopStats::new(ingress, egress);
460
461    loop {
462        if options
463            .max_ingress_messages
464            .is_some_and(|limit| stats.ingress_messages >= limit)
465        {
466            break;
467        }
468
469        stats.ingress.refresh(ingress);
470        let Some(n) = ingress.read_next(&mut input_buf).map_err(|error| {
471            stats.ingress.record_error(&error);
472            AdapterLoopError::adapter(error, stats.clone())
473        })?
474        else {
475            break;
476        };
477        stats.ingress_messages = stats.ingress_messages.saturating_add(1);
478        stats.ingress.message_count = stats.ingress_messages;
479        stats.ingress.refresh(ingress);
480
481        let outputs = byteor_pipeline_exec::run_single_ring_mem(
482            single_ring,
483            resolver,
484            &[input_buf[..n].to_vec()],
485        )
486        .map_err(|error| AdapterLoopError::exec(error, stats.clone()))?;
487
488        for output in outputs {
489            stats.egress.refresh(egress);
490            egress.write_msg(&output).map_err(|error| {
491                stats.egress.record_error(&error);
492                AdapterLoopError::adapter(error, stats.clone())
493            })?;
494            egress.flush().map_err(|error| {
495                stats.egress.record_error(&error);
496                AdapterLoopError::adapter(error, stats.clone())
497            })?;
498            stats.egress_messages = stats.egress_messages.saturating_add(1);
499            stats.egress.message_count = stats.egress_messages;
500            stats.egress.refresh(egress);
501        }
502
503        observe(&stats);
504    }
505
506    stats.ingress.refresh(ingress);
507    stats.egress.refresh(egress);
508    observe(&stats);
509    Ok(stats)
510}