1use std::collections::BTreeMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use byteor_adapters::{
8 EgressAdapter, GrpcEgress, GrpcIngress, HttpEgress, HttpIngress, IngressAdapter, KafkaEgress,
9 KafkaIngress, LineEgress, LineIngress, NatsEgress, NatsIngress, PostgresEgress,
10 PostgresIngress, RabbitMqOffsetSpecification, RabbitMqStreamsEgress, RabbitMqStreamsIngress,
11 RedisStreamsEgress, RedisStreamsIngress, S3Egress, S3Ingress, WebSocketEgress,
12 WebSocketIngress,
13};
14use serde_json::{Map, Value};
15
/// Adapter bindings resolved from a deploy bundle's `config.json` for a
/// SingleRing deployment: exactly one ingress and one egress endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BundleAdapterBindings {
    /// Path of the `config.json` these bindings were loaded from.
    pub config_path: PathBuf,
    /// Endpoint name (the JSON key under `endpoints`) of the ingress entry.
    pub ingress_name: String,
    /// Endpoint name (the JSON key under `endpoints`) of the egress entry.
    pub egress_name: String,
    /// Short adapter label for the ingress (e.g. "http", "kafka", "s3").
    pub ingress_adapter: String,
    /// Short adapter label for the egress (e.g. "line", "postgres").
    pub egress_adapter: String,
    /// Parsed ingress endpoint configuration.
    pub ingress: IngressEndpointConfig,
    /// Parsed egress endpoint configuration.
    pub egress: EgressEndpointConfig,
}
26
/// Parsed settings for the single ingress endpoint of a deploy bundle.
///
/// One variant per supported source adapter. All `*_ms` fields are durations
/// in milliseconds and all `max_*_bytes` fields are payload size caps; both
/// are consumed by `open_ingress_endpoint`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngressEndpointConfig {
    /// Newline-delimited input from a file, or stdin when `path` is "-".
    Line {
        name: String,
        path: String,
    },
    /// HTTP server ingress bound to `bind`, accepting bodies on `path`.
    Http {
        name: String,
        bind: String,
        path: String,
        max_body_bytes: usize,
    },
    /// gRPC server ingress bound to `bind`.
    Grpc {
        name: String,
        bind: String,
        max_message_bytes: usize,
    },
    /// WebSocket server ingress bound to `bind`, serving `path`.
    WebSocket {
        name: String,
        bind: String,
        path: String,
        max_message_bytes: usize,
    },
    /// Kafka consumer ingress reading `topic` as consumer group `group_id`.
    Kafka {
        name: String,
        brokers: String,
        group_id: String,
        topic: String,
        poll_timeout_ms: u64,
        max_message_bytes: usize,
    },
    /// NATS subscriber ingress on `subject`, optionally in a queue group.
    Nats {
        name: String,
        server_url: String,
        subject: String,
        queue_group: Option<String>,
        read_timeout_ms: u64,
        max_message_bytes: usize,
    },
    /// Redis Streams consumer ingress reading `stream` from `start_id`.
    RedisStreams {
        name: String,
        redis_url: String,
        stream: String,
        start_id: String,
        read_timeout_ms: u64,
        max_message_bytes: usize,
    },
    /// RabbitMQ Streams consumer ingress starting at `offset_spec`.
    RabbitMqStreams {
        name: String,
        server_url: String,
        stream: String,
        consumer_name: Option<String>,
        offset_spec: RabbitMqOffsetSpecification,
        read_timeout_ms: u64,
        max_message_bytes: usize,
    },
    /// S3 ingress polling `bucket` (optionally under `prefix`) for objects.
    S3 {
        name: String,
        bucket: String,
        prefix: Option<String>,
        region: String,
        // Custom endpoint URL, e.g. for S3-compatible stores; None means the
        // adapter's default for `region`.
        endpoint: Option<String>,
        poll_interval_ms: u64,
        max_object_bytes: usize,
    },
    /// Postgres ingress polling `query` against `dsn` for rows.
    Postgres {
        name: String,
        dsn: String,
        query: String,
        poll_interval_ms: u64,
        max_row_bytes: usize,
        // Row serialization selector passed through to the adapter;
        // None lets the adapter choose its default format.
        row_format: Option<String>,
    },
}
101
102impl IngressEndpointConfig {
103 pub fn label(&self) -> String {
104 match self {
105 Self::Line { path, .. } => line_endpoint_label(path),
106 Self::Http { bind, path, .. } => format!("{bind}{path}"),
107 Self::Grpc { bind, .. } => bind.clone(),
108 Self::WebSocket { bind, path, .. } => format!("ws://{bind}{path}"),
109 Self::Kafka { topic, .. } => topic.clone(),
110 Self::Nats { subject, .. } => subject.clone(),
111 Self::RedisStreams { stream, .. } => stream.clone(),
112 Self::RabbitMqStreams { stream, .. } => stream.clone(),
113 Self::S3 { bucket, prefix, .. } => {
114 format!("{bucket}/{}", prefix.clone().unwrap_or_default())
115 }
116 Self::Postgres { query, .. } => query.clone(),
117 }
118 }
119}
120
/// Parsed settings for the single egress endpoint of a deploy bundle.
///
/// One variant per supported sink adapter. All `*_ms` fields are durations
/// in milliseconds; both are consumed by `open_egress_endpoint`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EgressEndpointConfig {
    /// Newline-delimited output to a file, or stdout when `path` is "-".
    Line {
        name: String,
        path: String,
    },
    /// HTTP client egress sending requests to `url` with `method`.
    Http {
        name: String,
        url: String,
        method: String,
        /// Extra request headers as (name, value) pairs.
        headers: Vec<(String, String)>,
        timeout_ms: u64,
        max_retries: usize,
    },
    /// gRPC client egress targeting `endpoint`.
    Grpc {
        name: String,
        endpoint: String,
        timeout_ms: u64,
    },
    /// WebSocket client egress connecting to `url`.
    WebSocket {
        name: String,
        url: String,
        headers: Vec<(String, String)>,
        connect_timeout_ms: u64,
        max_retries: usize,
    },
    /// Kafka producer egress publishing to `topic`.
    Kafka {
        name: String,
        brokers: String,
        topic: String,
        flush_timeout_ms: u64,
    },
    /// NATS publisher egress on `subject`.
    Nats {
        name: String,
        server_url: String,
        subject: String,
    },
    /// Redis Streams producer egress appending to `stream`.
    RedisStreams {
        name: String,
        redis_url: String,
        stream: String,
    },
    /// RabbitMQ Streams publisher egress appending to `stream`.
    RabbitMqStreams {
        name: String,
        server_url: String,
        stream: String,
        publisher_name: Option<String>,
        flush_timeout_ms: u64,
    },
    /// S3 egress writing objects into `bucket` (optionally under `prefix`).
    S3 {
        name: String,
        bucket: String,
        prefix: Option<String>,
        region: String,
        // Custom endpoint URL for S3-compatible stores; None means the
        // adapter's default for `region`.
        endpoint: Option<String>,
        content_type: Option<String>,
    },
    /// Postgres egress writing rows into `table` via `mode`
    /// (e.g. plain insert vs. upsert keyed on `conflict_key`).
    Postgres {
        name: String,
        dsn: String,
        table: String,
        mode: String,
        conflict_key: Option<String>,
    },
}
186
187impl EgressEndpointConfig {
188 pub fn label(&self) -> String {
189 match self {
190 Self::Line { path, .. } => line_endpoint_label(path),
191 Self::Http { url, .. } => url.clone(),
192 Self::Grpc { endpoint, .. } => endpoint.clone(),
193 Self::WebSocket { url, .. } => url.clone(),
194 Self::Kafka { topic, .. } => topic.clone(),
195 Self::Nats { subject, .. } => subject.clone(),
196 Self::RedisStreams { stream, .. } => stream.clone(),
197 Self::RabbitMqStreams { stream, .. } => stream.clone(),
198 Self::S3 { bucket, prefix, .. } => {
199 format!("{bucket}/{}", prefix.clone().unwrap_or_default())
200 }
201 Self::Postgres { table, .. } => table.clone(),
202 }
203 }
204}
205
/// Returns the conventional `config.json` that sits next to `spec_path`,
/// or `None` when the spec has no parent directory or no such file exists.
pub fn default_bundle_config_path(spec_path: &Path) -> Option<PathBuf> {
    let parent = spec_path.parent()?;
    let candidate = parent.join("config.json");
    if candidate.is_file() {
        Some(candidate)
    } else {
        None
    }
}
210
/// Loads adapter bindings for an adapter-backed SingleRing deploy bundle.
///
/// The configuration is read from `config_path` when given, otherwise from a
/// `config.json` next to `spec_path` (see [`default_bundle_config_path`]).
/// Returns `Ok(None)` when neither exists. The file must declare `version=1`
/// and an `endpoints` object containing exactly one `ingress` and exactly
/// one `egress` entry; anything else is an error string naming the file.
pub fn load_single_ring_bundle_adapter_bindings(
    spec_path: &Path,
    config_path: Option<&Path>,
) -> Result<Option<BundleAdapterBindings>, String> {
    // No explicit config and no default config.json next to the spec:
    // the bundle simply has no adapter bindings.
    let Some(config_path) = config_path
        .map(Path::to_path_buf)
        .or_else(|| default_bundle_config_path(spec_path))
    else {
        return Ok(None);
    };

    let config_text = std::fs::read_to_string(&config_path)
        .map_err(|error| format!("read config.json {}: {error}", config_path.display()))?;
    let config_json: Value = serde_json::from_str(&config_text)
        .map_err(|error| format!("parse config.json {}: {error}", config_path.display()))?;
    validate_bundle_version(&config_json, &config_path)?;
    let endpoints = config_json
        .get("endpoints")
        .and_then(Value::as_object)
        .ok_or_else(|| format!("config.json {} is missing endpoints", config_path.display()))?;

    // Partition the endpoint entries by their declared kind so we can
    // enforce the one-ingress/one-egress shape below.
    let mut ingress = Vec::new();
    let mut egress = Vec::new();
    for (name, endpoint) in endpoints {
        let endpoint_object = endpoint.as_object().ok_or_else(|| {
            format!(
                "config.json endpoint {name} must be an object in {}",
                config_path.display()
            )
        })?;
        // Both camelCase and snake_case spellings of the kind key are accepted.
        let endpoint_kind = endpoint_string(endpoint_object, &["endpointKind", "endpoint_kind"])
            .ok_or_else(|| {
                format!(
                    "config.json endpoint {name} is missing endpointKind in {}",
                    config_path.display()
                )
            })?;
        match endpoint_kind {
            "ingress" => ingress.push((name.clone(), endpoint_object)),
            "egress" => egress.push((name.clone(), endpoint_object)),
            other => {
                return Err(format!(
                    "config.json endpoint {name} has unsupported endpointKind={other} in {}",
                    config_path.display()
                ));
            }
        }
    }

    if ingress.len() != 1 || egress.len() != 1 {
        return Err(format!(
            "adapter-backed SingleRing deploy bundles require exactly one ingress and one egress endpoint in {}",
            config_path.display()
        ));
    }

    // Length was checked just above, so these cannot fail.
    let (ingress_name, ingress_object) = ingress.into_iter().next().expect("one ingress");
    let (egress_name, egress_object) = egress.into_iter().next().expect("one egress");
    let (ingress_adapter, ingress_config) = parse_ingress_endpoint(&ingress_name, ingress_object)?;
    let (egress_adapter, egress_config) = parse_egress_endpoint(&egress_name, egress_object)?;

    Ok(Some(BundleAdapterBindings {
        config_path,
        ingress_name,
        egress_name,
        ingress_adapter,
        egress_adapter,
        ingress: ingress_config,
        egress: egress_config,
    }))
}
282
/// Instantiates the concrete ingress adapter described by `config`.
///
/// Each arm forwards the parsed settings to the matching adapter constructor
/// (all `*_ms` values become [`Duration`]s) and boxes the result behind the
/// [`IngressAdapter`] trait object. Construction failures are rendered as
/// human-readable strings naming the endpoint being opened.
pub fn open_ingress_endpoint(
    config: &IngressEndpointConfig,
) -> Result<Box<dyn IngressAdapter>, String> {
    match config {
        // Line input reads from a file, or stdin when path is "-"
        // (handled inside open_line_input).
        IngressEndpointConfig::Line { name, path } => {
            Ok(Box::new(open_line_input(name.as_str(), path.as_str())?))
        }
        IngressEndpointConfig::Http {
            name,
            bind,
            path,
            max_body_bytes,
        } => HttpIngress::bind(name.clone(), bind, path.clone(), *max_body_bytes)
            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
            .map_err(|error| format!("open http ingress {bind}{path}: {error}")),
        IngressEndpointConfig::Grpc {
            name,
            bind,
            max_message_bytes,
        } => GrpcIngress::bind(name.clone(), bind, *max_message_bytes)
            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
            .map_err(|error| format!("open grpc ingress {bind}: {error}")),
        IngressEndpointConfig::WebSocket {
            name,
            bind,
            path,
            max_message_bytes,
        } => WebSocketIngress::bind(name.clone(), bind, path.clone(), *max_message_bytes)
            .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
            .map_err(|error| format!("open websocket ingress {bind}{path}: {error}")),
        IngressEndpointConfig::Kafka {
            name,
            brokers,
            group_id,
            topic,
            poll_timeout_ms,
            max_message_bytes,
        } => KafkaIngress::new(
            name.clone(),
            brokers,
            group_id,
            topic,
            Duration::from_millis(*poll_timeout_ms),
            *max_message_bytes,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open kafka ingress {topic}: {error}")),
        IngressEndpointConfig::Nats {
            name,
            server_url,
            subject,
            queue_group,
            read_timeout_ms,
            max_message_bytes,
        } => NatsIngress::new(
            name.clone(),
            server_url,
            subject,
            queue_group.as_deref(),
            Duration::from_millis(*read_timeout_ms),
            *max_message_bytes,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open nats ingress {subject}: {error}")),
        IngressEndpointConfig::RedisStreams {
            name,
            redis_url,
            stream,
            start_id,
            read_timeout_ms,
            max_message_bytes,
        } => RedisStreamsIngress::new(
            name.clone(),
            redis_url,
            stream,
            start_id,
            Duration::from_millis(*read_timeout_ms),
            *max_message_bytes,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open redis streams ingress {stream}: {error}")),
        IngressEndpointConfig::RabbitMqStreams {
            name,
            server_url,
            stream,
            consumer_name,
            offset_spec,
            read_timeout_ms,
            max_message_bytes,
        } => RabbitMqStreamsIngress::new(
            name.clone(),
            server_url,
            stream,
            consumer_name.as_deref(),
            offset_spec.clone(),
            Duration::from_millis(*read_timeout_ms),
            *max_message_bytes,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open rabbitmq streams ingress {stream}: {error}")),
        IngressEndpointConfig::S3 {
            name,
            bucket,
            prefix,
            region,
            endpoint,
            poll_interval_ms,
            max_object_bytes,
        } => S3Ingress::new(
            name.clone(),
            bucket,
            prefix.as_deref(),
            region,
            endpoint.as_deref(),
            Duration::from_millis(*poll_interval_ms),
            *max_object_bytes,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open s3 ingress {bucket}: {error}")),
        IngressEndpointConfig::Postgres {
            name,
            dsn,
            query,
            poll_interval_ms,
            max_row_bytes,
            row_format,
        } => PostgresIngress::new(
            name.clone(),
            dsn,
            query,
            Duration::from_millis(*poll_interval_ms),
            *max_row_bytes,
            row_format.as_deref(),
        )
        .map(|adapter| Box::new(adapter) as Box<dyn IngressAdapter>)
        .map_err(|error| format!("open postgres ingress {name}: {error}")),
    }
}
421
/// Instantiates the concrete egress adapter described by `config`.
///
/// Mirror image of [`open_ingress_endpoint`]: each arm forwards the parsed
/// settings to the matching adapter constructor (all `*_ms` values become
/// [`Duration`]s) and boxes the result behind the [`EgressAdapter`] trait
/// object, with construction failures rendered as human-readable strings.
pub fn open_egress_endpoint(
    config: &EgressEndpointConfig,
) -> Result<Box<dyn EgressAdapter>, String> {
    match config {
        // Line output writes to a file, or stdout when path is "-"
        // (handled inside open_line_output).
        EgressEndpointConfig::Line { name, path } => {
            Ok(Box::new(open_line_output(name.as_str(), path.as_str())?))
        }
        EgressEndpointConfig::Http {
            name,
            url,
            method,
            headers,
            timeout_ms,
            max_retries,
        } => HttpEgress::with_method(
            name.clone(),
            url.clone(),
            method.clone(),
            headers.clone(),
            Duration::from_millis(*timeout_ms),
            *max_retries,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open http egress {url}: {error}")),
        EgressEndpointConfig::Grpc {
            name,
            endpoint,
            timeout_ms,
        } => GrpcEgress::new(
            name.clone(),
            endpoint.clone(),
            Duration::from_millis(*timeout_ms),
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open grpc egress {endpoint}: {error}")),
        EgressEndpointConfig::WebSocket {
            name,
            url,
            headers,
            connect_timeout_ms,
            max_retries,
        } => WebSocketEgress::new(
            name.clone(),
            url.clone(),
            headers.clone(),
            Duration::from_millis(*connect_timeout_ms),
            *max_retries,
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open websocket egress {url}: {error}")),
        EgressEndpointConfig::Kafka {
            name,
            brokers,
            topic,
            flush_timeout_ms,
        } => KafkaEgress::new(
            name.clone(),
            brokers,
            topic,
            Duration::from_millis(*flush_timeout_ms),
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open kafka egress {topic}: {error}")),
        EgressEndpointConfig::Nats {
            name,
            server_url,
            subject,
        } => NatsEgress::new(name.clone(), server_url, subject)
            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
            .map_err(|error| format!("open nats egress {subject}: {error}")),
        EgressEndpointConfig::RedisStreams {
            name,
            redis_url,
            stream,
        } => RedisStreamsEgress::new(name.clone(), redis_url, stream)
            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
            .map_err(|error| format!("open redis streams egress {stream}: {error}")),
        EgressEndpointConfig::RabbitMqStreams {
            name,
            server_url,
            stream,
            publisher_name,
            flush_timeout_ms,
        } => RabbitMqStreamsEgress::new(
            name.clone(),
            server_url,
            stream,
            publisher_name.as_deref(),
            Duration::from_millis(*flush_timeout_ms),
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open rabbitmq streams egress {stream}: {error}")),
        EgressEndpointConfig::S3 {
            name,
            bucket,
            prefix,
            region,
            endpoint,
            content_type,
        } => S3Egress::new(
            name.clone(),
            bucket,
            prefix.as_deref(),
            region,
            endpoint.as_deref(),
            content_type.as_deref(),
        )
        .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
        .map_err(|error| format!("open s3 egress {bucket}: {error}")),
        EgressEndpointConfig::Postgres {
            name,
            dsn,
            table,
            mode,
            conflict_key,
        } => PostgresEgress::new(name.clone(), dsn, table, mode, conflict_key.as_deref())
            .map(|adapter| Box::new(adapter) as Box<dyn EgressAdapter>)
            .map_err(|error| format!("open postgres egress {table}: {error}")),
    }
}
542
/// Parses one ingress endpoint entry into its short adapter label
/// (e.g. "kafka") and a typed [`IngressEndpointConfig`].
///
/// `adapterType` (or `adapter_type`) selects the variant. Most fields accept
/// alternate key spellings; omitted limits and timeouts fall back to
/// defaults visible below (1 MiB payload caps, 1s poll/read timeouts,
/// 5s poll intervals, HTTP path "/ingest", Redis start id "$").
fn parse_ingress_endpoint(
    name: &str,
    endpoint: &Map<String, Value>,
) -> Result<(String, IngressEndpointConfig), String> {
    let adapter_type = endpoint_string(endpoint, &["adapterType", "adapter_type"])
        .ok_or_else(|| format!("config.json endpoint {name} is missing adapterType"))?;
    let fields = endpoint_fields(endpoint)?;

    match adapter_type {
        "line_ingress" => Ok((
            "line".to_string(),
            IngressEndpointConfig::Line {
                name: name.to_string(),
                path: required_string(&fields, &["path", "source_path"], name, adapter_type)?,
            },
        )),
        "http_ingress" => Ok((
            "http".to_string(),
            IngressEndpointConfig::Http {
                name: name.to_string(),
                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
                path: optional_string(&fields, &["path", "route_path"])
                    .unwrap_or_else(|| "/ingest".to_string()),
                max_body_bytes: optional_usize_checked(
                    &fields,
                    &["max_body_bytes", "max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "grpc_ingress" => Ok((
            "grpc".to_string(),
            IngressEndpointConfig::Grpc {
                name: name.to_string(),
                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "websocket_ingress" => Ok((
            "websocket".to_string(),
            IngressEndpointConfig::WebSocket {
                name: name.to_string(),
                bind: required_string(&fields, &["bind", "bind_address"], name, adapter_type)?,
                path: optional_string(&fields, &["path", "route_path"])
                    .unwrap_or_else(|| "/".to_string()),
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "kafka_ingress" => Ok((
            "kafka".to_string(),
            IngressEndpointConfig::Kafka {
                name: name.to_string(),
                brokers: required_string(
                    &fields,
                    &["brokers", "broker_endpoints"],
                    name,
                    adapter_type,
                )?,
                group_id: required_string(
                    &fields,
                    &["group_id", "consumer_group", "topic_and_consumer_group"],
                    name,
                    adapter_type,
                )?,
                topic: required_string(&fields, &["topic"], name, adapter_type)?,
                poll_timeout_ms: optional_u64_checked(
                    &fields,
                    &["poll_timeout_ms"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1_000),
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "nats_ingress" => Ok((
            "nats".to_string(),
            IngressEndpointConfig::Nats {
                name: name.to_string(),
                server_url: required_string(&fields, &["server_url"], name, adapter_type)?,
                subject: required_string(&fields, &["subject"], name, adapter_type)?,
                queue_group: optional_string(&fields, &["queue_group"]),
                read_timeout_ms: optional_u64_checked(
                    &fields,
                    &["read_timeout_ms"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1_000),
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "redis_streams_ingress" => Ok((
            "redis_streams".to_string(),
            IngressEndpointConfig::RedisStreams {
                name: name.to_string(),
                redis_url: required_string(&fields, &["redis_url"], name, adapter_type)?,
                stream: required_string(&fields, &["stream", "stream_key"], name, adapter_type)?,
                // "$" means: only entries appended after the consumer starts.
                start_id: optional_string(&fields, &["start_id"])
                    .unwrap_or_else(|| "$".to_string()),
                read_timeout_ms: optional_u64_checked(
                    &fields,
                    &["read_timeout_ms"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1_000),
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "rabbitmq_streams_ingress" => Ok((
            "rabbitmq_streams".to_string(),
            IngressEndpointConfig::RabbitMqStreams {
                name: name.to_string(),
                server_url: required_string(
                    &fields,
                    &["server_url", "broker_endpoint"],
                    name,
                    adapter_type,
                )?,
                stream: required_string(&fields, &["stream", "stream_name"], name, adapter_type)?,
                consumer_name: optional_string(&fields, &["consumer_name"]),
                // Missing offset_spec defaults to "next" inside the parser.
                offset_spec: parse_rabbitmq_offset(
                    optional_string(&fields, &["offset_spec"]).as_deref(),
                )?,
                read_timeout_ms: optional_u64_checked(
                    &fields,
                    &["read_timeout_ms"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1_000),
                max_message_bytes: optional_usize_checked(
                    &fields,
                    &["max_message_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
            },
        )),
        "s3_ingress" => {
            // region/endpoint may be given explicitly or as a combined
            // region_or_endpoint value; see parse_region_endpoint.
            let (region, endpoint) = parse_region_endpoint(&fields, name, adapter_type)?;
            Ok((
                "s3".to_string(),
                IngressEndpointConfig::S3 {
                    name: name.to_string(),
                    bucket: required_string(&fields, &["bucket"], name, adapter_type)?,
                    prefix: optional_string(&fields, &["prefix"]),
                    region,
                    endpoint,
                    poll_interval_ms: optional_u64_checked(
                        &fields,
                        &["poll_interval_ms"],
                        name,
                        adapter_type,
                    )?
                    .unwrap_or(5_000),
                    max_object_bytes: optional_usize_checked(
                        &fields,
                        &["max_object_bytes"],
                        name,
                        adapter_type,
                    )?
                    .unwrap_or(1024 * 1024),
                },
            ))
        }
        "postgres_ingress" => Ok((
            "postgres".to_string(),
            IngressEndpointConfig::Postgres {
                name: name.to_string(),
                dsn: required_string(&fields, &["dsn", "connection_target"], name, adapter_type)?,
                query: required_string(&fields, &["query", "sql_query"], name, adapter_type)?,
                poll_interval_ms: optional_u64_checked(
                    &fields,
                    &["poll_interval_ms"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(5_000),
                max_row_bytes: optional_usize_checked(
                    &fields,
                    &["max_row_bytes"],
                    name,
                    adapter_type,
                )?
                .unwrap_or(1024 * 1024),
                row_format: optional_string(&fields, &["row_format"]),
            },
        )),
        other => Err(format!(
            "config.json endpoint {name} uses unsupported ingress adapterType={other}"
        )),
    }
}
771
772fn parse_egress_endpoint(
773 name: &str,
774 endpoint: &Map<String, Value>,
775) -> Result<(String, EgressEndpointConfig), String> {
776 let adapter_type = endpoint_string(endpoint, &["adapterType", "adapter_type"])
777 .ok_or_else(|| format!("config.json endpoint {name} is missing adapterType"))?;
778 let fields = endpoint_fields(endpoint)?;
779
780 match adapter_type {
781 "line_egress" => Ok((
782 "line".to_string(),
783 EgressEndpointConfig::Line {
784 name: name.to_string(),
785 path: required_string(&fields, &["path", "sink_path"], name, adapter_type)?,
786 },
787 )),
788 "http_egress" => Ok((
789 "http".to_string(),
790 EgressEndpointConfig::Http {
791 name: name.to_string(),
792 url: required_string(&fields, &["url", "url_or_bind_address"], name, adapter_type)?,
793 method: optional_string(&fields, &["method", "route_or_method"])
794 .unwrap_or_else(|| "POST".to_string())
795 .to_ascii_uppercase(),
796 headers: resolved_http_headers(&fields),
797 timeout_ms: optional_u64_checked(
798 &fields,
799 &["timeout_ms", "deadline_ms"],
800 name,
801 adapter_type,
802 )?
803 .unwrap_or(10_000),
804 max_retries: optional_usize_checked(
805 &fields,
806 &["max_retries", "retry_budget"],
807 name,
808 adapter_type,
809 )?
810 .unwrap_or(0),
811 },
812 )),
813 "grpc_egress" => Ok((
814 "grpc".to_string(),
815 EgressEndpointConfig::Grpc {
816 name: name.to_string(),
817 endpoint: required_string(&fields, &["endpoint", "target"], name, adapter_type)?,
818 timeout_ms: optional_u64_checked(
819 &fields,
820 &["timeout_ms", "deadline_ms"],
821 name,
822 adapter_type,
823 )?
824 .unwrap_or(8_000),
825 },
826 )),
827 "websocket_egress" => Ok((
828 "websocket".to_string(),
829 EgressEndpointConfig::WebSocket {
830 name: name.to_string(),
831 url: required_string(&fields, &["url"], name, adapter_type)?,
832 headers: resolved_http_headers(&fields),
833 connect_timeout_ms: optional_u64_checked(
834 &fields,
835 &["connect_timeout_ms", "reconnect_seconds"],
836 name,
837 adapter_type,
838 )?
839 .map(|value| {
840 if fields.contains_key("reconnect_seconds") {
841 value.saturating_mul(1_000)
842 } else {
843 value
844 }
845 })
846 .unwrap_or(5_000),
847 max_retries: optional_usize_checked(
848 &fields,
849 &["max_retries"],
850 name,
851 adapter_type,
852 )?
853 .unwrap_or(0),
854 },
855 )),
856 "kafka_egress" => Ok((
857 "kafka".to_string(),
858 EgressEndpointConfig::Kafka {
859 name: name.to_string(),
860 brokers: required_string(
861 &fields,
862 &["brokers", "broker_endpoints"],
863 name,
864 adapter_type,
865 )?,
866 topic: required_string(&fields, &["topic"], name, adapter_type)?,
867 flush_timeout_ms: optional_u64_checked(
868 &fields,
869 &["flush_timeout_ms"],
870 name,
871 adapter_type,
872 )?
873 .unwrap_or(1_000),
874 },
875 )),
876 "nats_egress" => Ok((
877 "nats".to_string(),
878 EgressEndpointConfig::Nats {
879 name: name.to_string(),
880 server_url: required_string(&fields, &["server_url"], name, adapter_type)?,
881 subject: required_string(&fields, &["subject"], name, adapter_type)?,
882 },
883 )),
884 "redis_streams_egress" => Ok((
885 "redis_streams".to_string(),
886 EgressEndpointConfig::RedisStreams {
887 name: name.to_string(),
888 redis_url: required_string(&fields, &["redis_url"], name, adapter_type)?,
889 stream: required_string(&fields, &["stream", "stream_key"], name, adapter_type)?,
890 },
891 )),
892 "rabbitmq_streams_egress" => Ok((
893 "rabbitmq_streams".to_string(),
894 EgressEndpointConfig::RabbitMqStreams {
895 name: name.to_string(),
896 server_url: required_string(
897 &fields,
898 &["server_url", "broker_endpoint"],
899 name,
900 adapter_type,
901 )?,
902 stream: required_string(&fields, &["stream", "stream_name"], name, adapter_type)?,
903 publisher_name: optional_string(&fields, &["publisher_name"]),
904 flush_timeout_ms: optional_u64_checked(
905 &fields,
906 &["flush_timeout_ms", "confirm_timeout_ms"],
907 name,
908 adapter_type,
909 )?
910 .unwrap_or(5_000),
911 },
912 )),
913 "s3_egress" => {
914 let (region, endpoint) = parse_region_endpoint(&fields, name, adapter_type)?;
915 Ok((
916 "s3".to_string(),
917 EgressEndpointConfig::S3 {
918 name: name.to_string(),
919 bucket: required_string(&fields, &["bucket"], name, adapter_type)?,
920 prefix: optional_string(&fields, &["prefix"]),
921 region,
922 endpoint,
923 content_type: optional_string(&fields, &["content_type"]),
924 },
925 ))
926 }
927 "postgres_egress" => Ok((
928 "postgres".to_string(),
929 EgressEndpointConfig::Postgres {
930 name: name.to_string(),
931 dsn: required_string(&fields, &["dsn", "connection_target"], name, adapter_type)?,
932 table: required_string(&fields, &["table", "target_table"], name, adapter_type)?,
933 mode: optional_string(&fields, &["mode", "write_mode"])
934 .unwrap_or_else(|| "insert".to_string()),
935 conflict_key: optional_string(&fields, &["conflict_key"]),
936 },
937 )),
938 other => Err(format!(
939 "config.json endpoint {name} uses unsupported egress adapterType={other}"
940 )),
941 }
942}
943
944fn endpoint_fields(endpoint: &Map<String, Value>) -> Result<Map<String, Value>, String> {
945 Ok(endpoint
946 .get("fields")
947 .and_then(Value::as_object)
948 .cloned()
949 .unwrap_or_else(Map::new))
950}
951
952fn endpoint_string<'a>(endpoint: &'a Map<String, Value>, keys: &[&str]) -> Option<&'a str> {
953 keys.iter()
954 .find_map(|key| endpoint.get(*key).and_then(Value::as_str))
955}
956
957fn validate_bundle_version(config_json: &Value, config_path: &Path) -> Result<(), String> {
958 match parse_u64(config_json.get("version")) {
959 Some(1) => Ok(()),
960 Some(other) => Err(format!(
961 "config.json {} has unsupported version={other}; expected version=1",
962 config_path.display()
963 )),
964 None => Err(format!(
965 "config.json {} is missing version=1",
966 config_path.display()
967 )),
968 }
969}
970
971fn required_string(
972 fields: &Map<String, Value>,
973 keys: &[&str],
974 endpoint_name: &str,
975 adapter_type: &str,
976) -> Result<String, String> {
977 optional_string(fields, keys).ok_or_else(|| {
978 format!(
979 "config.json endpoint {endpoint_name} ({adapter_type}) is missing one of: {}",
980 keys.join(", ")
981 )
982 })
983}
984
985fn optional_string(fields: &Map<String, Value>, keys: &[&str]) -> Option<String> {
986 keys.iter().find_map(|key| {
987 fields
988 .get(*key)
989 .and_then(Value::as_str)
990 .map(str::trim)
991 .filter(|value| !value.is_empty())
992 .map(ToOwned::to_owned)
993 })
994}
995
996fn optional_u64_checked(
997 fields: &Map<String, Value>,
998 keys: &[&str],
999 endpoint_name: &str,
1000 adapter_type: &str,
1001) -> Result<Option<u64>, String> {
1002 for key in keys {
1003 if let Some(value) = fields.get(*key) {
1004 return parse_u64(Some(value)).map(Some).ok_or_else(|| {
1005 format!(
1006 "config.json endpoint {endpoint_name} ({adapter_type}) has invalid {key}: expected unsigned integer"
1007 )
1008 });
1009 }
1010 }
1011 Ok(None)
1012}
1013
1014fn optional_usize_checked(
1015 fields: &Map<String, Value>,
1016 keys: &[&str],
1017 endpoint_name: &str,
1018 adapter_type: &str,
1019) -> Result<Option<usize>, String> {
1020 for key in keys {
1021 if let Some(value) = fields.get(*key) {
1022 return parse_usize(Some(value)).map(Some).ok_or_else(|| {
1023 format!(
1024 "config.json endpoint {endpoint_name} ({adapter_type}) has invalid {key}: expected unsigned integer"
1025 )
1026 });
1027 }
1028 }
1029 Ok(None)
1030}
1031
1032fn parse_u64(value: Option<&Value>) -> Option<u64> {
1033 match value? {
1034 Value::Number(number) => number.as_u64(),
1035 Value::String(text) => text.trim().parse().ok(),
1036 _ => None,
1037 }
1038}
1039
1040fn parse_usize(value: Option<&Value>) -> Option<usize> {
1041 parse_u64(value).and_then(|value| usize::try_from(value).ok())
1042}
1043
1044fn parse_rabbitmq_offset(value: Option<&str>) -> Result<RabbitMqOffsetSpecification, String> {
1045 match value.unwrap_or("next").trim().to_ascii_lowercase().as_str() {
1046 "first" => Ok(RabbitMqOffsetSpecification::First),
1047 "last" => Ok(RabbitMqOffsetSpecification::Last),
1048 "next" => Ok(RabbitMqOffsetSpecification::Next),
1049 other => Err(format!(
1050 "unsupported rabbitmq_streams ingress offset_spec={other}; expected first, last, or next"
1051 )),
1052 }
1053}
1054
1055fn parse_region_endpoint(
1056 fields: &Map<String, Value>,
1057 endpoint_name: &str,
1058 adapter_type: &str,
1059) -> Result<(String, Option<String>), String> {
1060 if let Some(region) = optional_string(fields, &["region"]) {
1061 return Ok((region, optional_string(fields, &["endpoint"])));
1062 }
1063
1064 let region_or_endpoint =
1065 required_string(fields, &["region_or_endpoint"], endpoint_name, adapter_type)?;
1066 if region_or_endpoint.starts_with("http://") || region_or_endpoint.starts_with("https://") {
1067 Ok(("us-east-1".to_string(), Some(region_or_endpoint)))
1068 } else {
1069 Ok((region_or_endpoint, None))
1070 }
1071}
1072
1073fn resolved_http_headers(fields: &Map<String, Value>) -> Vec<(String, String)> {
1074 let mut headers = BTreeMap::new();
1075 if let Some(Value::Object(map)) = fields.get("headers") {
1076 for (name, value) in map {
1077 if let Some(value) = value.as_str() {
1078 headers.insert(name.clone(), value.to_string());
1079 }
1080 }
1081 }
1082 if let Some(token) = optional_string(fields, &["bearer_token"]) {
1083 headers.insert("Authorization".to_string(), format!("Bearer {token}"));
1084 }
1085 if let Some(key) = optional_string(fields, &["api_key"]) {
1086 headers.insert("X-API-Key".to_string(), key);
1087 }
1088 headers.into_iter().collect()
1089}
1090
/// Label for a line endpoint path: "stdio" for the conventional "-", the
/// file name for a real path, and "line" when no file name can be derived.
fn line_endpoint_label(path: &str) -> String {
    if path == "-" {
        return "stdio".to_string();
    }
    let file_name = Path::new(path).file_name().and_then(|value| value.to_str());
    file_name.unwrap_or("line").to_string()
}
1102
1103fn open_line_input(name: &str, path: &str) -> Result<LineIngress<Box<dyn BufRead>>, String> {
1104 if path == "-" {
1105 return Ok(LineIngress::new(
1106 name,
1107 Box::new(BufReader::new(std::io::stdin())) as Box<dyn BufRead>,
1108 ));
1109 }
1110
1111 let file = File::open(path).map_err(|error| format!("open line input {path}: {error}"))?;
1112 Ok(LineIngress::new(
1113 name,
1114 Box::new(BufReader::new(file)) as Box<dyn BufRead>,
1115 ))
1116}
1117
1118fn open_line_output(name: &str, path: &str) -> Result<LineEgress<Box<dyn Write>>, String> {
1119 if path == "-" {
1120 return Ok(LineEgress::new(
1121 name,
1122 Box::new(std::io::stdout()) as Box<dyn Write>,
1123 ));
1124 }
1125
1126 let file = File::create(path).map_err(|error| format!("open line output {path}: {error}"))?;
1127 Ok(LineEgress::new(name, Box::new(file) as Box<dyn Write>))
1128}