byteor_runtime/
adapter_bundle.rs

1use std::collections::BTreeMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use byteor_adapters::{
8    EgressAdapter, GrpcEgress, GrpcIngress, HttpEgress, HttpIngress, IngressAdapter, KafkaEgress,
9    KafkaIngress, LineEgress, LineIngress, NatsEgress, NatsIngress, PostgresEgress,
10    PostgresIngress, RabbitMqOffsetSpecification, RabbitMqStreamsEgress, RabbitMqStreamsIngress,
11    RedisStreamsEgress, RedisStreamsIngress, S3Egress, S3Ingress, WebSocketEgress,
12    WebSocketIngress,
13};
14use serde_json::{Map, Value};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct BundleAdapterBindings {
18    pub config_path: PathBuf,
19    pub ingress_name: String,
20    pub egress_name: String,
21    pub ingress_adapter: String,
22    pub egress_adapter: String,
23    pub ingress: IngressEndpointConfig,
24    pub egress: EgressEndpointConfig,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum IngressEndpointConfig {
29    Line {
30        name: String,
31        path: String,
32    },
33    Http {
34        name: String,
35        bind: String,
36        path: String,
37        max_body_bytes: usize,
38    },
39    Grpc {
40        name: String,
41        bind: String,
42        max_message_bytes: usize,
43    },
44    WebSocket {
45        name: String,
46        bind: String,
47        path: String,
48        max_message_bytes: usize,
49    },
50    Kafka {
51        name: String,
52        brokers: String,
53        group_id: String,
54        topic: String,
55        poll_timeout_ms: u64,
56        max_message_bytes: usize,
57    },
58    Nats {
59        name: String,
60        server_url: String,
61        subject: String,
62        queue_group: Option<String>,
63        read_timeout_ms: u64,
64        max_message_bytes: usize,
65    },
66    RedisStreams {
67        name: String,
68        redis_url: String,
69        stream: String,
70        start_id: String,
71        read_timeout_ms: u64,
72        max_message_bytes: usize,
73    },
74    RabbitMqStreams {
75        name: String,
76        server_url: String,
77        stream: String,
78        consumer_name: Option<String>,
79        offset_spec: RabbitMqOffsetSpecification,
80        read_timeout_ms: u64,
81        max_message_bytes: usize,
82    },
83    S3 {
84        name: String,
85        bucket: String,
86        prefix: Option<String>,
87        region: String,
88        endpoint: Option<String>,
89        poll_interval_ms: u64,
90        max_object_bytes: usize,
91    },
92    Postgres {
93        name: String,
94        dsn: String,
95        query: String,
96        poll_interval_ms: u64,
97        max_row_bytes: usize,
98        row_format: Option<String>,
99    },
100}
101
102impl IngressEndpointConfig {
103    pub fn label(&self) -> String {
104        match self {
105            Self::Line { path, .. } => line_endpoint_label(path),
106            Self::Http { bind, path, .. } => format!("{bind}{path}"),
107            Self::Grpc { bind, .. } => bind.clone(),
108            Self::WebSocket { bind, path, .. } => format!("ws://{bind}{path}"),
109            Self::Kafka { topic, .. } => topic.clone(),
110            Self::Nats { subject, .. } => subject.clone(),
111            Self::RedisStreams { stream, .. } => stream.clone(),
112            Self::RabbitMqStreams { stream, .. } => stream.clone(),
113            Self::S3 { bucket, prefix, .. } => {
114                format!("{bucket}/{}", prefix.clone().unwrap_or_default())
115            }
116            Self::Postgres { query, .. } => query.clone(),
117        }
118    }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum EgressEndpointConfig {
123    Line {
124        name: String,
125        path: String,
126    },
127    Http {
128        name: String,
129        url: String,
130        method: String,
131        headers: Vec<(String, String)>,
132        timeout_ms: u64,
133        max_retries: usize,
134    },
135    Grpc {
136        name: String,
137        endpoint: String,
138        timeout_ms: u64,
139    },
140    WebSocket {
141        name: String,
142        url: String,
143        headers: Vec<(String, String)>,
144        connect_timeout_ms: u64,
145        max_retries: usize,
146    },
147    Kafka {
148        name: String,
149        brokers: String,
150        topic: String,
151        flush_timeout_ms: u64,
152    },
153    Nats {
154        name: String,
155        server_url: String,
156        subject: String,
157    },
158    RedisStreams {
159        name: String,
160        redis_url: String,
161        stream: String,
162    },
163    RabbitMqStreams {
164        name: String,
165        server_url: String,
166        stream: String,
167        publisher_name: Option<String>,
168        flush_timeout_ms: u64,
169    },
170    S3 {
171        name: String,
172        bucket: String,
173        prefix: Option<String>,
174        region: String,
175        endpoint: Option<String>,
176        content_type: Option<String>,
177    },
178    Postgres {
179        name: String,
180        dsn: String,
181        table: String,
182        mode: String,
183        conflict_key: Option<String>,
184    },
185}
186
187impl EgressEndpointConfig {
188    pub fn label(&self) -> String {
189        match self {
190            Self::Line { path, .. } => line_endpoint_label(path),
191            Self::Http { url, .. } => url.clone(),
192            Self::Grpc { endpoint, .. } => endpoint.clone(),
193            Self::WebSocket { url, .. } => url.clone(),
194            Self::Kafka { topic, .. } => topic.clone(),
195            Self::Nats { subject, .. } => subject.clone(),
196            Self::RedisStreams { stream, .. } => stream.clone(),
197            Self::RabbitMqStreams { stream, .. } => stream.clone(),
198            Self::S3 { bucket, prefix, .. } => {
199                format!("{bucket}/{}", prefix.clone().unwrap_or_default())
200            }
201            Self::Postgres { table, .. } => table.clone(),
202        }
203    }
204}
205
206pub fn default_bundle_config_path(spec_path: &Path) -> Option<PathBuf> {
207    let path = spec_path.parent()?.join("config.json");
208    path.is_file().then_some(path)
209}
210
211pub fn load_single_ring_bundle_adapter_bindings(
212    spec_path: &Path,
213    config_path: Option<&Path>,
214) -> Result<Option<BundleAdapterBindings>, String> {
215    let Some(config_path) = config_path
216        .map(Path::to_path_buf)
217        .or_else(|| default_bundle_config_path(spec_path))
218    else {
219        return Ok(None);
220    };
221
222    let config_text = std::fs::read_to_string(&config_path)
223        .map_err(|error| format!("read config.json {}: {error}", config_path.display()))?;
224    let config_json: Value = serde_json::from_str(&config_text)
225        .map_err(|error| format!("parse config.json {}: {error}", config_path.display()))?;
226    validate_bundle_version(&config_json, &config_path)?;
227    let endpoints = config_json
228        .get("endpoints")
229        .and_then(Value::as_object)
230        .ok_or_else(|| format!("config.json {} is missing endpoints", config_path.display()))?;
231
232    let mut ingress = Vec::new();
233    let mut egress = Vec::new();
234    for (name, endpoint) in endpoints {
235        let endpoint_object = endpoint.as_object().ok_or_else(|| {
236            format!(
237                "config.json endpoint {name} must be an object in {}",
238                config_path.display()
239            )
240        })?;
241        let endpoint_kind = endpoint_string(endpoint_object, &["endpointKind", "endpoint_kind"])
242            .ok_or_else(|| {
243                format!(
244                    "config.json endpoint {name} is missing endpointKind in {}",
245                    config_path.display()
246                )
247            })?;
248        match endpoint_kind {
249            "ingress" => ingress.push((name.clone(), endpoint_object)),
250            "egress" => egress.push((name.clone(), endpoint_object)),
251            other => {
252                return Err(format!(
253                    "config.json endpoint {name} has unsupported endpointKind={other} in {}",
254                    config_path.display()
255                ));
256            }
257        }
258    }
259
260    if ingress.len() != 1 || egress.len() != 1 {
261        return Err(format!(
262            "adapter-backed SingleRing deploy bundles require exactly one ingress and one egress endpoint in {}",
263            config_path.display()
264        ));
265    }
266
267    let (ingress_name, ingress_object) = ingress.into_iter().next().expect("one ingress");
268    let (egress_name, egress_object) = egress.into_iter().next().expect("one egress");
269    let (ingress_adapter, ingress_config) = parse_ingress_endpoint(&ingress_name, ingress_object)?;
270    let (egress_adapter, egress_config) = parse_egress_endpoint(&egress_name, egress_object)?;
271
272    Ok(Some(BundleAdapterBindings {
273        config_path,
274        ingress_name,
275        egress_name,
276        ingress_adapter,
277        egress_adapter,
278        ingress: ingress_config,
279        egress: egress_config,
280    }))
281}
282
283pub fn open_ingress_endpoint(
284    config: &IngressEndpointConfig,
285) -> Result<Box<dyn IngressAdapter>, String> {
286    match config {
287        IngressEndpointConfig::Line { name, path } => {
288            Ok(Box::new(open_line_input(name.as_str(), path.as_str())?))
289        }
290        IngressEndpointConfig::Http {
291            name,
292            bind,
293            path,
294            max_body_bytes,
295        } => HttpIngress::bind(name.clone(), bind, path.clone(), *max_body_bytes)
296            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
297            .map_err(|error| format!("open http ingress {bind}{path}: {error}")),
298        IngressEndpointConfig::Grpc {
299            name,
300            bind,
301            max_message_bytes,
302        } => GrpcIngress::bind(name.clone(), bind, *max_message_bytes)
303            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
304            .map_err(|error| format!("open grpc ingress {bind}: {error}")),
305        IngressEndpointConfig::WebSocket {
306            name,
307            bind,
308            path,
309            max_message_bytes,
310        } => WebSocketIngress::bind(name.clone(), bind, path.clone(), *max_message_bytes)
311            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
312            .map_err(|error| format!("open websocket ingress {bind}{path}: {error}")),
313        IngressEndpointConfig::Kafka {
314            name,
315            brokers,
316            group_id,
317            topic,
318            poll_timeout_ms,
319            max_message_bytes,
320        } => KafkaIngress::new(
321            name.clone(),
322            brokers,
323            group_id,
324            topic,
325            Duration::from_millis(*poll_timeout_ms),
326            *max_message_bytes,
327        )
328        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
329        .map_err(|error| format!("open kafka ingress {topic}: {error}")),
330        IngressEndpointConfig::Nats {
331            name,
332            server_url,
333            subject,
334            queue_group,
335            read_timeout_ms,
336            max_message_bytes,
337        } => NatsIngress::new(
338            name.clone(),
339            server_url,
340            subject,
341            queue_group.as_deref(),
342            Duration::from_millis(*read_timeout_ms),
343            *max_message_bytes,
344        )
345        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
346        .map_err(|error| format!("open nats ingress {subject}: {error}")),
347        IngressEndpointConfig::RedisStreams {
348            name,
349            redis_url,
350            stream,
351            start_id,
352            read_timeout_ms,
353            max_message_bytes,
354        } => RedisStreamsIngress::new(
355            name.clone(),
356            redis_url,
357            stream,
358            start_id,
359            Duration::from_millis(*read_timeout_ms),
360            *max_message_bytes,
361        )
362        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
363        .map_err(|error| format!("open redis streams ingress {stream}: {error}")),
364        IngressEndpointConfig::RabbitMqStreams {
365            name,
366            server_url,
367            stream,
368            consumer_name,
369            offset_spec,
370            read_timeout_ms,
371            max_message_bytes,
372        } => RabbitMqStreamsIngress::new(
373            name.clone(),
374            server_url,
375            stream,
376            consumer_name.as_deref(),
377            offset_spec.clone(),
378            Duration::from_millis(*read_timeout_ms),
379            *max_message_bytes,
380        )
381        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
382        .map_err(|error| format!("open rabbitmq streams ingress {stream}: {error}")),
383        IngressEndpointConfig::S3 {
384            name,
385            bucket,
386            prefix,
387            region,
388            endpoint,
389            poll_interval_ms,
390            max_object_bytes,
391        } => S3Ingress::new(
392            name.clone(),
393            bucket,
394            prefix.as_deref(),
395            region,
396            endpoint.as_deref(),
397            Duration::from_millis(*poll_interval_ms),
398            *max_object_bytes,
399        )
400        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
401        .map_err(|error| format!("open s3 ingress {bucket}: {error}")),
402        IngressEndpointConfig::Postgres {
403            name,
404            dsn,
405            query,
406            poll_interval_ms,
407            max_row_bytes,
408            row_format,
409        } => PostgresIngress::new(
410            name.clone(),
411            dsn,
412            query,
413            Duration::from_millis(*poll_interval_ms),
414            *max_row_bytes,
415            row_format.as_deref(),
416        )
417        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
418        .map_err(|error| format!("open postgres ingress {name}: {error}")),
419    }
420}
421
422pub fn open_egress_endpoint(
423    config: &EgressEndpointConfig,
424) -> Result<Box<dyn EgressAdapter>, String> {
425    match config {
426        EgressEndpointConfig::Line { name, path } => {
427            Ok(Box::new(open_line_output(name.as_str(), path.as_str())?))
428        }
429        EgressEndpointConfig::Http {
430            name,
431            url,
432            method,
433            headers,
434            timeout_ms,
435            max_retries,
436        } => HttpEgress::with_method(
437            name.clone(),
438            url.clone(),
439            method.clone(),
440            headers.clone(),
441            Duration::from_millis(*timeout_ms),
442            *max_retries,
443        )
444        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
445        .map_err(|error| format!("open http egress {url}: {error}")),
446        EgressEndpointConfig::Grpc {
447            name,
448            endpoint,
449            timeout_ms,
450        } => GrpcEgress::new(
451            name.clone(),
452            endpoint.clone(),
453            Duration::from_millis(*timeout_ms),
454        )
455        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
456        .map_err(|error| format!("open grpc egress {endpoint}: {error}")),
457        EgressEndpointConfig::WebSocket {
458            name,
459            url,
460            headers,
461            connect_timeout_ms,
462            max_retries,
463        } => WebSocketEgress::new(
464            name.clone(),
465            url.clone(),
466            headers.clone(),
467            Duration::from_millis(*connect_timeout_ms),
468            *max_retries,
469        )
470        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
471        .map_err(|error| format!("open websocket egress {url}: {error}")),
472        EgressEndpointConfig::Kafka {
473            name,
474            brokers,
475            topic,
476            flush_timeout_ms,
477        } => KafkaEgress::new(
478            name.clone(),
479            brokers,
480            topic,
481            Duration::from_millis(*flush_timeout_ms),
482        )
483        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
484        .map_err(|error| format!("open kafka egress {topic}: {error}")),
485        EgressEndpointConfig::Nats {
486            name,
487            server_url,
488            subject,
489        } => NatsEgress::new(name.clone(), server_url, subject)
490            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
491            .map_err(|error| format!("open nats egress {subject}: {error}")),
492        EgressEndpointConfig::RedisStreams {
493            name,
494            redis_url,
495            stream,
496        } => RedisStreamsEgress::new(name.clone(), redis_url, stream)
497            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
498            .map_err(|error| format!("open redis streams egress {stream}: {error}")),
499        EgressEndpointConfig::RabbitMqStreams {
500            name,
501            server_url,
502            stream,
503            publisher_name,
504            flush_timeout_ms,
505        } => RabbitMqStreamsEgress::new(
506            name.clone(),
507            server_url,
508            stream,
509            publisher_name.as_deref(),
510            Duration::from_millis(*flush_timeout_ms),
511        )
512        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
513        .map_err(|error| format!("open rabbitmq streams egress {stream}: {error}")),
514        EgressEndpointConfig::S3 {
515            name,
516            bucket,
517            prefix,
518            region,
519            endpoint,
520            content_type,
521        } => S3Egress::new(
522            name.clone(),
523            bucket,
524            prefix.as_deref(),
525            region,
526            endpoint.as_deref(),
527            content_type.as_deref(),
528        )
529        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
530        .map_err(|error| format!("open s3 egress {bucket}: {error}")),
531        EgressEndpointConfig::Postgres {
532            name,
533            dsn,
534            table,
535            mode,
536            conflict_key,
537        } => PostgresEgress::new(name.clone(), dsn, table, mode, conflict_key.as_deref())
538            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
539            .map_err(|error| format!("open postgres egress {table}: {error}")),
540    }
541}
542
543fn parse_ingress_endpoint(
544    name: &str,
545    endpoint: &Map<String, Value>,
546) -> Result<(String, IngressEndpointConfig), String> {
547    let adapter_type = endpoint_string(endpoint, &["adapterType", "adapter_type"])
548        .ok_or_else(|| format!("config.json endpoint {name} is missing adapterType"))?;
549    let fields = endpoint_fields(endpoint)?;
550
551    match adapter_type {
552        "line_ingress" => Ok((
553            "line".to_string(),
554            IngressEndpointConfig::Line {
555                name: name.to_string(),
556                path: required_string(&fields, &["path", "source_path"], name, adapter_type)?,
557            },
558        )),
559        "http_ingress" => Ok((
560            "http".to_string(),
561            IngressEndpointConfig::Http {
562                name: name.to_string(),
563                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
564                path: optional_string(&fields, &["path", "route_path"])
565                    .unwrap_or_else(|| "/ingest".to_string()),
566                max_body_bytes: optional_usize_checked(
567                    &fields,
568                    &["max_body_bytes", "max_message_bytes"],
569                    name,
570                    adapter_type,
571                )?
572                .unwrap_or(1024 * 1024),
573            },
574        )),
575        "grpc_ingress" => Ok((
576            "grpc".to_string(),
577            IngressEndpointConfig::Grpc {
578                name: name.to_string(),
579                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
580                max_message_bytes: optional_usize_checked(
581                    &fields,
582                    &["max_message_bytes"],
583                    name,
584                    adapter_type,
585                )?
586                .unwrap_or(1024 * 1024),
587            },
588        )),
589        "websocket_ingress" => Ok((
590            "websocket".to_string(),
591            IngressEndpointConfig::WebSocket {
592                name: name.to_string(),
593                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
594                path: optional_string(&fields, &["path", "route_path"])
595                    .unwrap_or_else(|| "/".to_string()),
596                max_message_bytes: optional_usize_checked(
597                    &fields,
598                    &["max_message_bytes"],
599                    name,
600                    adapter_type,
601                )?
602                .unwrap_or(1024 * 1024),
603            },
604        )),
605        "kafka_ingress" => Ok((
606            "kafka".to_string(),
607            IngressEndpointConfig::Kafka {
608                name: name.to_string(),
609                brokers: required_string(
610                    &fields,
611                    &["brokers", "broker_endpoints"],
612                    name,
613                    adapter_type,
614                )?,
615                group_id: required_string(
616                    &fields,
617                    &["group_id", "consumer_group", "topic_and_consumer_group"],
618                    name,
619                    adapter_type,
620                )?,
621                topic: required_string(&fields, &["topic"], name, adapter_type)?,
622                poll_timeout_ms: optional_u64_checked(
623                    &fields,
624                    &["poll_timeout_ms"],
625                    name,
626                    adapter_type,
627                )?
628                .unwrap_or(1_000),
629                max_message_bytes: optional_usize_checked(
630                    &fields,
631                    &["max_message_bytes"],
632                    name,
633                    adapter_type,
634                )?
635                .unwrap_or(1024 * 1024),
636            },
637        )),
638        "nats_ingress" => Ok((
639            "nats".to_string(),
640            IngressEndpointConfig::Nats {
641                name: name.to_string(),
642                server_url: required_string(&fields, &["server_url"], name, adapter_type)?,
643                subject: required_string(&fields, &["subject"], name, adapter_type)?,
644                queue_group: optional_string(&fields, &["queue_group"]),
645                read_timeout_ms: optional_u64_checked(
646                    &fields,
647                    &["read_timeout_ms"],
648                    name,
649                    adapter_type,
650                )?
651                .unwrap_or(1_000),
652                max_message_bytes: optional_usize_checked(
653                    &fields,
654                    &["max_message_bytes"],
655                    name,
656                    adapter_type,
657                )?
658                .unwrap_or(1024 * 1024),
659            },
660        )),
661        "redis_streams_ingress" => Ok((
662            "redis_streams".to_string(),
663            IngressEndpointConfig::RedisStreams {
664                name: name.to_string(),
665                redis_url: required_string(&fields, &["redis_url"], name, adapter_type)?,
666                stream: required_string(&fields, &["stream", "stream_key"], name, adapter_type)?,
667                start_id: optional_string(&fields, &["start_id"])
668                    .unwrap_or_else(|| "$".to_string()),
669                read_timeout_ms: optional_u64_checked(
670                    &fields,
671                    &["read_timeout_ms"],
672                    name,
673                    adapter_type,
674                )?
675                .unwrap_or(1_000),
676                max_message_bytes: optional_usize_checked(
677                    &fields,
678                    &["max_message_bytes"],
679                    name,
680                    adapter_type,
681                )?
682                .unwrap_or(1024 * 1024),
683            },
684        )),
685        "rabbitmq_streams_ingress" => Ok((
686            "rabbitmq_streams".to_string(),
687            IngressEndpointConfig::RabbitMqStreams {
688                name: name.to_string(),
689                server_url: required_string(
690                    &fields,
691                    &["server_url", "broker_endpoint"],
692                    name,
693                    adapter_type,
694                )?,
695                stream: required_string(&fields, &["stream", "stream_name"], name, adapter_type)?,
696                consumer_name: optional_string(&fields, &["consumer_name"]),
697                offset_spec: parse_rabbitmq_offset(
698                    optional_string(&fields, &["offset_spec"]).as_deref(),
699                )?,
700                read_timeout_ms: optional_u64_checked(
701                    &fields,
702                    &["read_timeout_ms"],
703                    name,
704                    adapter_type,
705                )?
706                .unwrap_or(1_000),
707                max_message_bytes: optional_usize_checked(
708                    &fields,
709                    &["max_message_bytes"],
710                    name,
711                    adapter_type,
712                )?
713                .unwrap_or(1024 * 1024),
714            },
715        )),
716        "s3_ingress" => {
717            let (region, endpoint) = parse_region_endpoint(&fields, name, adapter_type)?;
718            Ok((
719                "s3".to_string(),
720                IngressEndpointConfig::S3 {
721                    name: name.to_string(),
722                    bucket: required_string(&fields, &["bucket"], name, adapter_type)?,
723                    prefix: optional_string(&fields, &["prefix"]),
724                    region,
725                    endpoint,
726                    poll_interval_ms: optional_u64_checked(
727                        &fields,
728                        &["poll_interval_ms"],
729                        name,
730                        adapter_type,
731                    )?
732                    .unwrap_or(5_000),
733                    max_object_bytes: optional_usize_checked(
734                        &fields,
735                        &["max_object_bytes"],
736                        name,
737                        adapter_type,
738                    )?
739                    .unwrap_or(1024 * 1024),
740                },
741            ))
742        }
743        "postgres_ingress" => Ok((
744            "postgres".to_string(),
745            IngressEndpointConfig::Postgres {
746                name: name.to_string(),
747                dsn: required_string(&fields, &["dsn", "connection_target"], name, adapter_type)?,
748                query: required_string(&fields, &["query", "sql_query"], name, adapter_type)?,
749                poll_interval_ms: optional_u64_checked(
750                    &fields,
751                    &["poll_interval_ms"],
752                    name,
753                    adapter_type,
754                )?
755                .unwrap_or(5_000),
756                max_row_bytes: optional_usize_checked(
757                    &fields,
758                    &["max_row_bytes"],
759                    name,
760                    adapter_type,
761                )?
762                .unwrap_or(1024 * 1024),
763                row_format: optional_string(&fields, &["row_format"]),
764            },
765        )),
766        other => Err(format!(
767            "config.json endpoint {name} uses unsupported ingress adapterType={other}"
768        )),
769    }
770}
771
772fn parse_egress_endpoint(
773    name: &str,
774    endpoint: &Map<String, Value>,
775) -> Result<(String, EgressEndpointConfig), String> {
776    let adapter_type = endpoint_string(endpoint, &["adapterType", "adapter_type"])
777        .ok_or_else(|| format!("config.json endpoint {name} is missing adapterType"))?;
778    let fields = endpoint_fields(endpoint)?;
779
780    match adapter_type {
781        "line_egress" => Ok((
782            "line".to_string(),
783            EgressEndpointConfig::Line {
784                name: name.to_string(),
785                path: required_string(&fields, &["path", "sink_path"], name, adapter_type)?,
786            },
787        )),
788        "http_egress" => Ok((
789            "http".to_string(),
790            EgressEndpointConfig::Http {
791                name: name.to_string(),
792                url: required_string(&fields, &["url", "url_or_bind_address"], name, adapter_type)?,
793                method: optional_string(&fields, &["method", "route_or_method"])
794                    .unwrap_or_else(|| "POST".to_string())
795                    .to_ascii_uppercase(),
796                headers: resolved_http_headers(&fields),
797                timeout_ms: optional_u64_checked(
798                    &fields,
799                    &["timeout_ms", "deadline_ms"],
800                    name,
801                    adapter_type,
802                )?
803                .unwrap_or(10_000),
804                max_retries: optional_usize_checked(
805                    &fields,
806                    &["max_retries", "retry_budget"],
807                    name,
808                    adapter_type,
809                )?
810                .unwrap_or(0),
811            },
812        )),
813        "grpc_egress" => Ok((
814            "grpc".to_string(),
815            EgressEndpointConfig::Grpc {
816                name: name.to_string(),
817                endpoint: required_string(&fields, &["endpoint", "target"], name, adapter_type)?,
818                timeout_ms: optional_u64_checked(
819                    &fields,
820                    &["timeout_ms", "deadline_ms"],
821                    name,
822                    adapter_type,
823                )?
824                .unwrap_or(8_000),
825            },
826        )),
827        "websocket_egress" => Ok((
828            "websocket".to_string(),
829            EgressEndpointConfig::WebSocket {
830                name: name.to_string(),
831                url: required_string(&fields, &["url"], name, adapter_type)?,
832                headers: resolved_http_headers(&fields),
833                connect_timeout_ms: optional_u64_checked(
834                    &fields,
835                    &["connect_timeout_ms", "reconnect_seconds"],
836                    name,
837                    adapter_type,
838                )?
839                .map(|value| {
840                    if fields.contains_key("reconnect_seconds") {
841                        value.saturating_mul(1_000)
842                    } else {
843                        value
844                    }
845                })
846                .unwrap_or(5_000),
847                max_retries: optional_usize_checked(
848                    &fields,
849                    &["max_retries"],
850                    name,
851                    adapter_type,
852                )?
853                .unwrap_or(0),
854            },
855        )),
856        "kafka_egress" => Ok((
857            "kafka".to_string(),
858            EgressEndpointConfig::Kafka {
859                name: name.to_string(),
860                brokers: required_string(
861                    &fields,
862                    &["brokers", "broker_endpoints"],
863                    name,
864                    adapter_type,
865                )?,
866                topic: required_string(&fields, &["topic"], name, adapter_type)?,
867                flush_timeout_ms: optional_u64_checked(
868                    &fields,
869                    &["flush_timeout_ms"],
870                    name,
871                    adapter_type,
872                )?
873                .unwrap_or(1_000),
874            },
875        )),
876        "nats_egress" => Ok((
877            "nats".to_string(),
878            EgressEndpointConfig::Nats {
879                name: name.to_string(),
880                server_url: required_string(&fields, &["server_url"], name, adapter_type)?,
881                subject: required_string(&fields, &["subject"], name, adapter_type)?,
882            },
883        )),
884        "redis_streams_egress" => Ok((
885            "redis_streams".to_string(),
886            EgressEndpointConfig::RedisStreams {
887                name: name.to_string(),
888                redis_url: required_string(&fields, &["redis_url"], name, adapter_type)?,
889                stream: required_string(&fields, &["stream", "stream_key"], name, adapter_type)?,
890            },
891        )),
892        "rabbitmq_streams_egress" => Ok((
893            "rabbitmq_streams".to_string(),
894            EgressEndpointConfig::RabbitMqStreams {
895                name: name.to_string(),
896                server_url: required_string(
897                    &fields,
898                    &["server_url", "broker_endpoint"],
899                    name,
900                    adapter_type,
901                )?,
902                stream: required_string(&fields, &["stream", "stream_name"], name, adapter_type)?,
903                publisher_name: optional_string(&fields, &["publisher_name"]),
904                flush_timeout_ms: optional_u64_checked(
905                    &fields,
906                    &["flush_timeout_ms", "confirm_timeout_ms"],
907                    name,
908                    adapter_type,
909                )?
910                .unwrap_or(5_000),
911            },
912        )),
913        "s3_egress" => {
914            let (region, endpoint) = parse_region_endpoint(&fields, name, adapter_type)?;
915            Ok((
916                "s3".to_string(),
917                EgressEndpointConfig::S3 {
918                    name: name.to_string(),
919                    bucket: required_string(&fields, &["bucket"], name, adapter_type)?,
920                    prefix: optional_string(&fields, &["prefix"]),
921                    region,
922                    endpoint,
923                    content_type: optional_string(&fields, &["content_type"]),
924                },
925            ))
926        }
927        "postgres_egress" => Ok((
928            "postgres".to_string(),
929            EgressEndpointConfig::Postgres {
930                name: name.to_string(),
931                dsn: required_string(&fields, &["dsn", "connection_target"], name, adapter_type)?,
932                table: required_string(&fields, &["table", "target_table"], name, adapter_type)?,
933                mode: optional_string(&fields, &["mode", "write_mode"])
934                    .unwrap_or_else(|| "insert".to_string()),
935                conflict_key: optional_string(&fields, &["conflict_key"]),
936            },
937        )),
938        other => Err(format!(
939            "config.json endpoint {name} uses unsupported egress adapterType={other}"
940        )),
941    }
942}
943
944fn endpoint_fields(endpoint: &Map<String, Value>) -> Result<Map<String, Value>, String> {
945    Ok(endpoint
946        .get("fields")
947        .and_then(Value::as_object)
948        .cloned()
949        .unwrap_or_else(Map::new))
950}
951
952fn endpoint_string<'a>(endpoint: &'a Map<String, Value>, keys: &[&str]) -> Option<&'a str> {
953    keys.iter()
954        .find_map(|key| endpoint.get(*key).and_then(Value::as_str))
955}
956
957fn validate_bundle_version(config_json: &Value, config_path: &Path) -> Result<(), String> {
958    match parse_u64(config_json.get("version")) {
959        Some(1) => Ok(()),
960        Some(other) => Err(format!(
961            "config.json {} has unsupported version={other}; expected version=1",
962            config_path.display()
963        )),
964        None => Err(format!(
965            "config.json {} is missing version=1",
966            config_path.display()
967        )),
968    }
969}
970
971fn required_string(
972    fields: &Map<String, Value>,
973    keys: &[&str],
974    endpoint_name: &str,
975    adapter_type: &str,
976) -> Result<String, String> {
977    optional_string(fields, keys).ok_or_else(|| {
978        format!(
979            "config.json endpoint {endpoint_name} ({adapter_type}) is missing one of: {}",
980            keys.join(", ")
981        )
982    })
983}
984
985fn optional_string(fields: &Map<String, Value>, keys: &[&str]) -> Option<String> {
986    keys.iter().find_map(|key| {
987        fields
988            .get(*key)
989            .and_then(Value::as_str)
990            .map(str::trim)
991            .filter(|value| !value.is_empty())
992            .map(ToOwned::to_owned)
993    })
994}
995
996fn optional_u64_checked(
997    fields: &Map<String, Value>,
998    keys: &[&str],
999    endpoint_name: &str,
1000    adapter_type: &str,
1001) -> Result<Option<u64>, String> {
1002    for key in keys {
1003        if let Some(value) = fields.get(*key) {
1004            return parse_u64(Some(value)).map(Some).ok_or_else(|| {
1005                format!(
1006                    "config.json endpoint {endpoint_name} ({adapter_type}) has invalid {key}: expected unsigned integer"
1007                )
1008            });
1009        }
1010    }
1011    Ok(None)
1012}
1013
1014fn optional_usize_checked(
1015    fields: &Map<String, Value>,
1016    keys: &[&str],
1017    endpoint_name: &str,
1018    adapter_type: &str,
1019) -> Result<Option<usize>, String> {
1020    for key in keys {
1021        if let Some(value) = fields.get(*key) {
1022            return parse_usize(Some(value)).map(Some).ok_or_else(|| {
1023                format!(
1024                    "config.json endpoint {endpoint_name} ({adapter_type}) has invalid {key}: expected unsigned integer"
1025                )
1026            });
1027        }
1028    }
1029    Ok(None)
1030}
1031
1032fn parse_u64(value: Option<&Value>) -> Option<u64> {
1033    match value? {
1034        Value::Number(number) => number.as_u64(),
1035        Value::String(text) => text.trim().parse().ok(),
1036        _ => None,
1037    }
1038}
1039
1040fn parse_usize(value: Option<&Value>) -> Option<usize> {
1041    parse_u64(value).and_then(|value| usize::try_from(value).ok())
1042}
1043
1044fn parse_rabbitmq_offset(value: Option<&str>) -> Result<RabbitMqOffsetSpecification, String> {
1045    match value.unwrap_or("next").trim().to_ascii_lowercase().as_str() {
1046        "first" => Ok(RabbitMqOffsetSpecification::First),
1047        "last" => Ok(RabbitMqOffsetSpecification::Last),
1048        "next" => Ok(RabbitMqOffsetSpecification::Next),
1049        other => Err(format!(
1050            "unsupported rabbitmq_streams ingress offset_spec={other}; expected first, last, or next"
1051        )),
1052    }
1053}
1054
1055fn parse_region_endpoint(
1056    fields: &Map<String, Value>,
1057    endpoint_name: &str,
1058    adapter_type: &str,
1059) -> Result<(String, Option<String>), String> {
1060    if let Some(region) = optional_string(fields, &["region"]) {
1061        return Ok((region, optional_string(fields, &["endpoint"])));
1062    }
1063
1064    let region_or_endpoint =
1065        required_string(fields, &["region_or_endpoint"], endpoint_name, adapter_type)?;
1066    if region_or_endpoint.starts_with("http://") || region_or_endpoint.starts_with("https://") {
1067        Ok(("us-east-1".to_string(), Some(region_or_endpoint)))
1068    } else {
1069        Ok((region_or_endpoint, None))
1070    }
1071}
1072
1073fn resolved_http_headers(fields: &Map<String, Value>) -> Vec<(String, String)> {
1074    let mut headers = BTreeMap::new();
1075    if let Some(Value::Object(map)) = fields.get("headers") {
1076        for (name, value) in map {
1077            if let Some(value) = value.as_str() {
1078                headers.insert(name.clone(), value.to_string());
1079            }
1080        }
1081    }
1082    if let Some(token) = optional_string(fields, &["bearer_token"]) {
1083        headers.insert("Authorization".to_string(), format!("Bearer {token}"));
1084    }
1085    if let Some(key) = optional_string(fields, &["api_key"]) {
1086        headers.insert("X-API-Key".to_string(), key);
1087    }
1088    headers.into_iter().collect()
1089}
1090
1091fn line_endpoint_label(path: &str) -> String {
1092    if path == "-" {
1093        "stdio".to_string()
1094    } else {
1095        Path::new(path)
1096            .file_name()
1097            .and_then(|value| value.to_str())
1098            .unwrap_or("line")
1099            .to_string()
1100    }
1101}
1102
1103fn open_line_input(name: &str, path: &str) -> Result<LineIngress<Box<dyn BufRead>>, String> {
1104    if path == "-" {
1105        return Ok(LineIngress::new(
1106            name,
1107            Box::new(BufReader::new(std::io::stdin())) as Box<dyn BufRead>,
1108        ));
1109    }
1110
1111    let file = File::open(path).map_err(|error| format!("open line input {path}: {error}"))?;
1112    Ok(LineIngress::new(
1113        name,
1114        Box::new(BufReader::new(file)) as Box<dyn BufRead>,
1115    ))
1116}
1117
1118fn open_line_output(name: &str, path: &str) -> Result<LineEgress<Box<dyn Write>>, String> {
1119    if path == "-" {
1120        return Ok(LineEgress::new(
1121            name,
1122            Box::new(std::io::stdout()) as Box<dyn Write>,
1123        ));
1124    }
1125
1126    let file = File::create(path).map_err(|error| format!("open line output {path}: {error}"))?;
1127    Ok(LineEgress::new(name, Box::new(file) as Box<dyn Write>))
1128}