1use byteor_pipeline_exec::{ExecError, StageResolver};
2use byteor_pipeline_spec::PipelineSpecV1;
3
4#[derive(Clone, Debug, PartialEq, Eq)]
6pub struct AdapterRuntimeStatus {
7 pub name: String,
9 pub role: String,
11 pub transport: String,
13 pub connection_state: String,
15 pub message_count: u64,
17 pub recent_failures: u64,
19 pub last_error: Option<String>,
21 pub lag: Option<u64>,
23 pub details: serde_json::Map<String, serde_json::Value>,
25}
26
27impl AdapterRuntimeStatus {
28 fn new(adapter: &dyn Adapter, role: &str) -> Self {
29 Self {
30 name: adapter.name().to_string(),
31 role: role.to_string(),
32 transport: adapter.transport_kind().to_string(),
33 connection_state: adapter.connection_state().to_string(),
34 message_count: 0,
35 recent_failures: 0,
36 last_error: None,
37 lag: adapter.lag(),
38 details: adapter.status_fields(),
39 }
40 }
41
42 fn refresh(&mut self, adapter: &dyn Adapter) {
43 self.connection_state = adapter.connection_state().to_string();
44 self.lag = adapter.lag();
45 self.details = adapter.status_fields();
46 }
47
48 fn record_error(&mut self, error: &impl core::fmt::Display) {
49 self.connection_state = "error".to_string();
50 self.recent_failures = self.recent_failures.saturating_add(1);
51 self.last_error = Some(error.to_string());
52 }
53}
54
55#[derive(Debug)]
57pub enum AdapterError {
58 Io {
60 op: &'static str,
62 source: std::io::Error,
64 },
65 Http {
67 op: &'static str,
69 detail: String,
71 },
72 Grpc {
74 op: &'static str,
76 detail: String,
78 },
79 WebSocket {
81 op: &'static str,
83 detail: String,
85 },
86 Kafka {
88 op: &'static str,
90 detail: String,
92 },
93 Nats {
95 op: &'static str,
97 detail: String,
99 },
100 RabbitMq {
102 op: &'static str,
104 detail: String,
106 },
107 Redis {
109 op: &'static str,
111 detail: String,
113 },
114 S3 {
116 op: &'static str,
118 detail: String,
120 },
121 Postgres {
123 op: &'static str,
125 detail: String,
127 },
128 Config {
130 detail: String,
132 },
133}
134
135#[derive(Debug)]
137pub enum AdapterLoopFailure {
138 Adapter(AdapterError),
140 Exec(ExecError),
142}
143
144#[derive(Debug)]
146pub struct AdapterLoopError {
147 pub stats: AdapterLoopStats,
149 pub failure: AdapterLoopFailure,
151}
152
153impl AdapterLoopError {
154 fn adapter(error: AdapterError, stats: AdapterLoopStats) -> Self {
155 Self {
156 stats,
157 failure: AdapterLoopFailure::Adapter(error),
158 }
159 }
160
161 fn exec(error: ExecError, stats: AdapterLoopStats) -> Self {
162 Self {
163 stats,
164 failure: AdapterLoopFailure::Exec(error),
165 }
166 }
167}
168
169impl core::fmt::Display for AdapterLoopFailure {
170 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
171 match self {
172 Self::Adapter(err) => write!(f, "adapter loop adapter error: {err}"),
173 Self::Exec(err) => write!(f, "adapter loop exec error: {err}"),
174 }
175 }
176}
177
178impl std::error::Error for AdapterLoopFailure {
179 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
180 match self {
181 Self::Adapter(err) => Some(err),
182 Self::Exec(err) => Some(err),
183 }
184 }
185}
186
187impl core::fmt::Display for AdapterLoopError {
188 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
189 write!(f, "{}", self.failure)
190 }
191}
192
193impl std::error::Error for AdapterLoopError {
194 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
195 Some(&self.failure)
196 }
197}
198
199#[derive(Clone, Debug, PartialEq, Eq)]
201pub struct AdapterLoopStats {
202 pub ingress_messages: u64,
204 pub egress_messages: u64,
206 pub ingress: AdapterRuntimeStatus,
208 pub egress: AdapterRuntimeStatus,
210}
211
212impl AdapterLoopStats {
213 fn new(ingress: &dyn Adapter, egress: &dyn Adapter) -> Self {
214 Self {
215 ingress_messages: 0,
216 egress_messages: 0,
217 ingress: AdapterRuntimeStatus::new(ingress, "ingress"),
218 egress: AdapterRuntimeStatus::new(egress, "egress"),
219 }
220 }
221
222 pub fn snapshot_entries(&self) -> Vec<serde_json::Value> {
224 vec![
225 serde_json::json!({
226 "name": self.ingress.name,
227 "role": self.ingress.role,
228 "transport": self.ingress.transport,
229 "connection_state": self.ingress.connection_state,
230 "message_count": self.ingress.message_count,
231 "recent_failures": self.ingress.recent_failures,
232 "last_error": self.ingress.last_error,
233 "lag": self.ingress.lag,
234 "details": self.ingress.details,
235 }),
236 serde_json::json!({
237 "name": self.egress.name,
238 "role": self.egress.role,
239 "transport": self.egress.transport,
240 "connection_state": self.egress.connection_state,
241 "message_count": self.egress.message_count,
242 "recent_failures": self.egress.recent_failures,
243 "last_error": self.egress.last_error,
244 "lag": self.egress.lag,
245 "details": self.egress.details,
246 }),
247 ]
248 }
249
250 pub fn merge_into_snapshot(&self, snapshot: &mut serde_json::Value) {
252 if let Some(map) = snapshot.as_object_mut() {
253 map.insert(
254 "adapters".to_string(),
255 serde_json::Value::Array(self.snapshot_entries()),
256 );
257 }
258 }
259}
260
261#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
263pub struct AdapterLoopOptions {
264 pub max_ingress_messages: Option<u64>,
266}
267
268impl core::fmt::Display for AdapterError {
269 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
270 match self {
271 Self::Io { op, source } => write!(f, "{op} failed: {source}"),
272 Self::Http { op, detail } => write!(f, "{op} failed: {detail}"),
273 Self::Grpc { op, detail } => write!(f, "{op} failed: {detail}"),
274 Self::WebSocket { op, detail } => write!(f, "{op} failed: {detail}"),
275 Self::Kafka { op, detail } => write!(f, "{op} failed: {detail}"),
276 Self::Nats { op, detail } => write!(f, "{op} failed: {detail}"),
277 Self::RabbitMq { op, detail } => write!(f, "{op} failed: {detail}"),
278 Self::Redis { op, detail } => write!(f, "{op} failed: {detail}"),
279 Self::S3 { op, detail } => write!(f, "{op} failed: {detail}"),
280 Self::Postgres { op, detail } => write!(f, "{op} failed: {detail}"),
281 Self::Config { detail } => write!(f, "config error: {detail}"),
282 }
283 }
284}
285
286impl std::error::Error for AdapterError {
287 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
288 match self {
289 Self::Io { source, .. } => Some(source),
290 Self::Http { .. }
291 | Self::Grpc { .. }
292 | Self::WebSocket { .. }
293 | Self::Kafka { .. }
294 | Self::Nats { .. }
295 | Self::RabbitMq { .. }
296 | Self::Redis { .. }
297 | Self::S3 { .. }
298 | Self::Postgres { .. }
299 | Self::Config { .. } => None,
300 }
301 }
302}
303
304pub trait Adapter {
306 fn name(&self) -> &str;
308
309 fn transport_kind(&self) -> &str {
311 "generic"
312 }
313
314 fn connection_state(&self) -> &str {
316 "ready"
317 }
318
319 fn lag(&self) -> Option<u64> {
321 None
322 }
323
324 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
326 serde_json::Map::new()
327 }
328}
329
330impl<T> Adapter for Box<T>
331where
332 T: Adapter + ?Sized,
333{
334 fn name(&self) -> &str {
335 (**self).name()
336 }
337
338 fn transport_kind(&self) -> &str {
339 (**self).transport_kind()
340 }
341
342 fn connection_state(&self) -> &str {
343 (**self).connection_state()
344 }
345
346 fn lag(&self) -> Option<u64> {
347 (**self).lag()
348 }
349
350 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
351 (**self).status_fields()
352 }
353}
354
355pub trait IngressAdapter: Adapter {
359 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError>;
365}
366
367impl<T> IngressAdapter for Box<T>
368where
369 T: IngressAdapter + ?Sized,
370{
371 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
372 (**self).read_next(out)
373 }
374}
375
376pub trait EgressAdapter: Adapter {
378 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError>;
380
381 fn flush(&mut self) -> Result<(), AdapterError>;
383}
384
385impl<T> EgressAdapter for Box<T>
386where
387 T: EgressAdapter + ?Sized,
388{
389 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
390 (**self).write_msg(msg)
391 }
392
393 fn flush(&mut self) -> Result<(), AdapterError> {
394 (**self).flush()
395 }
396}
397
398pub fn run_single_ring_adapter_loop<I, E>(
404 single_ring: &PipelineSpecV1,
405 resolver: &dyn StageResolver,
406 ingress: &mut I,
407 egress: &mut E,
408) -> Result<AdapterLoopStats, AdapterLoopError>
409where
410 I: IngressAdapter,
411 E: EgressAdapter,
412{
413 run_single_ring_adapter_loop_with_options(
414 single_ring,
415 resolver,
416 ingress,
417 egress,
418 AdapterLoopOptions::default(),
419 )
420}
421
422pub fn run_single_ring_adapter_loop_with_options<I, E>(
424 single_ring: &PipelineSpecV1,
425 resolver: &dyn StageResolver,
426 ingress: &mut I,
427 egress: &mut E,
428 options: AdapterLoopOptions,
429) -> Result<AdapterLoopStats, AdapterLoopError>
430where
431 I: IngressAdapter,
432 E: EgressAdapter,
433{
434 run_single_ring_adapter_loop_with_options_and_observer(
435 single_ring,
436 resolver,
437 ingress,
438 egress,
439 options,
440 |_| {},
441 )
442}
443
444pub fn run_single_ring_adapter_loop_with_options_and_observer<I, E, F>(
446 single_ring: &PipelineSpecV1,
447 resolver: &dyn StageResolver,
448 ingress: &mut I,
449 egress: &mut E,
450 options: AdapterLoopOptions,
451 mut observe: F,
452) -> Result<AdapterLoopStats, AdapterLoopError>
453where
454 I: IngressAdapter,
455 E: EgressAdapter,
456 F: FnMut(&AdapterLoopStats),
457{
458 let mut input_buf = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
459 let mut stats = AdapterLoopStats::new(ingress, egress);
460
461 loop {
462 if options
463 .max_ingress_messages
464 .is_some_and(|limit| stats.ingress_messages >= limit)
465 {
466 break;
467 }
468
469 stats.ingress.refresh(ingress);
470 let Some(n) = ingress.read_next(&mut input_buf).map_err(|error| {
471 stats.ingress.record_error(&error);
472 AdapterLoopError::adapter(error, stats.clone())
473 })?
474 else {
475 break;
476 };
477 stats.ingress_messages = stats.ingress_messages.saturating_add(1);
478 stats.ingress.message_count = stats.ingress_messages;
479 stats.ingress.refresh(ingress);
480
481 let outputs = byteor_pipeline_exec::run_single_ring_mem(
482 single_ring,
483 resolver,
484 &[input_buf[..n].to_vec()],
485 )
486 .map_err(|error| AdapterLoopError::exec(error, stats.clone()))?;
487
488 for output in outputs {
489 stats.egress.refresh(egress);
490 egress.write_msg(&output).map_err(|error| {
491 stats.egress.record_error(&error);
492 AdapterLoopError::adapter(error, stats.clone())
493 })?;
494 egress.flush().map_err(|error| {
495 stats.egress.record_error(&error);
496 AdapterLoopError::adapter(error, stats.clone())
497 })?;
498 stats.egress_messages = stats.egress_messages.saturating_add(1);
499 stats.egress.message_count = stats.egress_messages;
500 stats.egress.refresh(egress);
501 }
502
503 observe(&stats);
504 }
505
506 stats.ingress.refresh(ingress);
507 stats.egress.refresh(egress);
508 observe(&stats);
509 Ok(stats)
510}