//! byteor_adapters/postgres.rs — blocking PostgreSQL ingress/egress adapters.

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use bytes::Bytes;
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
7use futures_util::SinkExt;
8use serde_json::{Map, Value};
9use tokio::pin;
10use tokio::runtime::Runtime;
11use tokio_postgres::types::Json;
12use tokio_postgres::{NoTls, SimpleQueryMessage};
13
14use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
15
/// Outcome of a single egress flush, as reported by the PostgreSQL client.
pub(crate) struct PostgresFlushResult {
    /// Number of rows the server reported as affected by the flush.
    pub rows_affected: u64,
}
19
/// Mutable status snapshot shared between the ingress adapter and its client.
#[derive(Debug)]
struct PostgresIngressStatus {
    // Current phase of the poll loop: "idle", "querying", or "rows_buffered".
    query_state: String,
    // Row serialization mode rendered as a string: "json" or "columns".
    row_format: String,
    // Row count returned by the most recent query.
    last_query_rows: u64,
    // Rows fetched but not yet handed to the caller.
    pending_rows: u64,
}
27
28impl PostgresIngressStatus {
29    fn new(row_format: PostgresRowFormat) -> Self {
30        Self {
31            query_state: "idle".to_string(),
32            row_format: match row_format {
33                PostgresRowFormat::Json => "json".to_string(),
34                PostgresRowFormat::Columns => "columns".to_string(),
35            },
36            last_query_rows: 0,
37            pending_rows: 0,
38        }
39    }
40
41    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
42        let mut details = serde_json::Map::new();
43        details.insert(
44            "query_state".to_string(),
45            serde_json::Value::String(self.query_state.clone()),
46        );
47        details.insert(
48            "row_format".to_string(),
49            serde_json::Value::String(self.row_format.clone()),
50        );
51        details.insert(
52            "last_query_rows".to_string(),
53            serde_json::Value::from(self.last_query_rows),
54        );
55        details.insert(
56            "pending_rows".to_string(),
57            serde_json::Value::from(self.pending_rows),
58        );
59        details
60    }
61}
62
/// Mutable status snapshot shared between the egress adapter and its client.
#[derive(Debug)]
struct PostgresEgressStatus {
    // Current phase of the write path: "idle", "buffering", or "flushing".
    write_state: String,
    // Configured write mode rendered as a string: "insert", "upsert", or "copy".
    write_mode: String,
    // Rows parsed and buffered but not yet flushed.
    buffered_rows: u64,
    // Row count handed to the client on the most recent flush attempt.
    last_flush_rows: u64,
    // Server-reported affected rows from the last successful flush; None until then.
    last_rows_affected: Option<u64>,
}
71
72impl PostgresEgressStatus {
73    fn new(mode: PostgresWriteMode) -> Self {
74        Self {
75            write_state: "idle".to_string(),
76            write_mode: match mode {
77                PostgresWriteMode::Insert => "insert".to_string(),
78                PostgresWriteMode::Upsert => "upsert".to_string(),
79                PostgresWriteMode::Copy => "copy".to_string(),
80            },
81            buffered_rows: 0,
82            last_flush_rows: 0,
83            last_rows_affected: None,
84        }
85    }
86
87    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
88        let mut details = serde_json::Map::new();
89        details.insert(
90            "write_state".to_string(),
91            serde_json::Value::String(self.write_state.clone()),
92        );
93        details.insert(
94            "write_mode".to_string(),
95            serde_json::Value::String(self.write_mode.clone()),
96        );
97        details.insert(
98            "buffered_rows".to_string(),
99            serde_json::Value::from(self.buffered_rows),
100        );
101        details.insert(
102            "last_flush_rows".to_string(),
103            serde_json::Value::from(self.last_flush_rows),
104        );
105        if let Some(rows_affected) = self.last_rows_affected {
106            details.insert(
107                "last_rows_affected".to_string(),
108                serde_json::Value::from(rows_affected),
109            );
110        }
111        details
112    }
113}
114
/// Result row serialization mode for PostgreSQL ingress.
///
/// Parsed from configuration by [`PostgresRowFormat::parse`]; `Json` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PostgresRowFormat {
    /// Emit each row as a JSON object keyed by column name.
    Json,
    /// Emit each row as `{"columns": [...], "values": [...]}` using string values.
    Columns,
}
123
124impl PostgresRowFormat {
125    fn parse(value: Option<&str>) -> Result<Self, AdapterError> {
126        match value.map(str::trim).filter(|value| !value.is_empty()) {
127            None | Some("json") => Ok(Self::Json),
128            Some("columns") => Ok(Self::Columns),
129            Some(other) => Err(AdapterError::Config {
130                detail: format!("unsupported postgres row_format: {other}"),
131            }),
132        }
133    }
134}
135
/// Persistence mode for PostgreSQL egress.
///
/// Parsed from configuration by [`PostgresWriteMode::parse`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PostgresWriteMode {
    /// Insert rows into the target table.
    Insert,
    /// Upsert rows into the target table using a conflict key.
    Upsert,
    /// Bulk load rows into the target table using `COPY FROM STDIN`.
    Copy,
}
146
147impl PostgresWriteMode {
148    fn parse(value: &str) -> Result<Self, AdapterError> {
149        match value.trim() {
150            "insert" => Ok(Self::Insert),
151            "upsert" => Ok(Self::Upsert),
152            "copy" => Ok(Self::Copy),
153            other => Err(AdapterError::Config {
154                detail: format!("unsupported postgres write mode: {other}"),
155            }),
156        }
157    }
158}
159
/// Internal PostgreSQL ingress client contract used by the adapter and crate-local tests.
pub(crate) trait PostgresIngressClient: Send {
    /// Return the next serialized row payload, or `None` when no rows are
    /// available; `timeout` bounds how long the call may block while idle.
    fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
}
164
/// Internal PostgreSQL egress client contract used by the adapter and crate-local tests.
pub(crate) trait PostgresEgressClient: Send {
    /// Persist the given rows and report how many rows the server affected.
    fn flush_rows(&mut self, rows: &[Map<String, Value>]) -> Result<PostgresFlushResult, AdapterError>;
}
169
/// Blocking wrapper around an async PostgreSQL connection pool used for ingress.
struct AsyncPostgresIngressClient {
    // Dedicated current-thread runtime used to drive the async pool calls.
    runtime: Runtime,
    pool: Pool,
    // Trimmed SQL text polled whenever the pending buffer is empty.
    query: String,
    row_format: PostgresRowFormat,
    // Upper bound on a single serialized row payload, in bytes.
    max_row_bytes: usize,
    // Rows fetched but not yet handed to the adapter.
    pending: VecDeque<Vec<u8>>,
    // Shared with the owning adapter for status reporting.
    status: Arc<Mutex<PostgresIngressStatus>>,
}
179
impl AsyncPostgresIngressClient {
    /// Build the ingress client: a dedicated current-thread runtime, a pooled
    /// connection manager for `dsn`, and the (whitespace-trimmed) query to poll.
    fn connect(
        dsn: &str,
        query: &str,
        row_format: PostgresRowFormat,
        max_row_bytes: usize,
        status: Arc<Mutex<PostgresIngressStatus>>,
    ) -> Result<Self, AdapterError> {
        Ok(Self {
            runtime: build_runtime("postgres_ingress_runtime_build")?,
            pool: build_pool(dsn)?,
            query: query.trim().to_string(),
            row_format,
            max_row_bytes,
            pending: VecDeque::new(),
            status,
        })
    }

    /// Execute the configured query and serialize every result row as a JSON
    /// object keyed by column name.
    ///
    /// The user query is wrapped as `SELECT row_to_json(t)::text FROM (...) AS t`
    /// so the server performs the JSON encoding and each row arrives as a
    /// single text column.
    ///
    /// # Errors
    /// `AdapterError::Postgres` on pool checkout failure, query failure, or
    /// when a serialized row exceeds `max_row_bytes`.
    async fn fetch_pending_json(
        pool: Pool,
        query: String,
        max_row_bytes: usize,
    ) -> Result<Vec<Vec<u8>>, AdapterError> {
        let client = pool.get().await.map_err(|err| AdapterError::Postgres {
            op: "postgres_pool_get",
            detail: err.to_string(),
        })?;
        let wrapped_query = format!("SELECT row_to_json(t)::text FROM ({query}) AS t");
        let rows = client
            .query(wrapped_query.as_str(), &[])
            .await
            .map_err(|err| AdapterError::Postgres {
                op: "postgres_query_rows",
                detail: err.to_string(),
            })?;

        rows.into_iter()
            .map(|row| {
                // Column 0 is the single text column produced by row_to_json above.
                let payload: String = row.get(0);
                enforce_payload_limit(
                    "postgres_ingress_row_to_json",
                    payload.as_bytes(),
                    max_row_bytes,
                )
            })
            .collect()
    }

    /// Execute the configured query via the simple-query protocol and emit
    /// each row as `{"columns": [...], "values": [...]}` with string values
    /// (JSON `null` where the server returned NULL).
    ///
    /// # Errors
    /// `AdapterError::Postgres` on pool checkout failure, query failure,
    /// serialization failure, or when a serialized row exceeds `max_row_bytes`.
    async fn fetch_pending_columns(
        pool: Pool,
        query: String,
        max_row_bytes: usize,
    ) -> Result<Vec<Vec<u8>>, AdapterError> {
        let client = pool.get().await.map_err(|err| AdapterError::Postgres {
            op: "postgres_pool_get",
            detail: err.to_string(),
        })?;
        let messages =
            client
                .simple_query(query.as_str())
                .await
                .map_err(|err| AdapterError::Postgres {
                    op: "postgres_simple_query",
                    detail: err.to_string(),
                })?;

        let mut payloads = Vec::new();
        for message in messages {
            // Only data rows are serialized; other protocol messages
            // (e.g. command-completion tags) are skipped.
            let SimpleQueryMessage::Row(row) = message else {
                continue;
            };

            let columns = row
                .columns()
                .iter()
                .map(|column| Value::String(column.name().to_string()))
                .collect::<Vec<_>>();
            // The simple-query protocol delivers values as text, so every
            // non-NULL cell is emitted as a JSON string.
            let values = row
                .columns()
                .iter()
                .enumerate()
                .map(|(index, _)| match row.get(index) {
                    Some(value) => Value::String(value.to_string()),
                    None => Value::Null,
                })
                .collect::<Vec<_>>();
            let payload = serde_json::to_vec(&serde_json::json!({
                "columns": columns,
                "values": values,
            }))
            .map_err(|err| AdapterError::Postgres {
                op: "postgres_columns_serialize",
                detail: err.to_string(),
            })?;
            payloads.push(enforce_payload_limit(
                "postgres_columns_serialize",
                &payload,
                max_row_bytes,
            )?);
        }

        Ok(payloads)
    }
}
285
286impl PostgresIngressClient for AsyncPostgresIngressClient {
287    fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
288        if let Some(payload) = self.pending.pop_front() {
289            self.status.lock().unwrap().pending_rows = self.pending.len() as u64;
290            return Ok(Some(payload));
291        }
292
293        self.status.lock().unwrap().query_state = "querying".to_string();
294
295        let pool = self.pool.clone();
296        let query = self.query.clone();
297        let max_row_bytes = self.max_row_bytes;
298        let fetched = match self.row_format {
299            PostgresRowFormat::Json => {
300                self.runtime
301                    .block_on(Self::fetch_pending_json(pool, query, max_row_bytes))?
302            }
303            PostgresRowFormat::Columns => {
304                self.runtime
305                    .block_on(Self::fetch_pending_columns(pool, query, max_row_bytes))?
306            }
307        };
308
309        if fetched.is_empty() {
310            let mut status = self.status.lock().unwrap();
311            status.query_state = "idle".to_string();
312            status.last_query_rows = 0;
313            status.pending_rows = 0;
314            std::thread::sleep(timeout);
315            return Ok(None);
316        }
317
318        {
319            let mut status = self.status.lock().unwrap();
320            status.query_state = "rows_buffered".to_string();
321            status.last_query_rows = fetched.len() as u64;
322            status.pending_rows = fetched.len().saturating_sub(1) as u64;
323        }
324        self.pending.extend(fetched);
325        Ok(self.pending.pop_front())
326    }
327}
328
/// Blocking wrapper around an async PostgreSQL connection pool used for egress.
struct AsyncPostgresEgressClient {
    // Dedicated current-thread runtime used to drive the async pool calls.
    runtime: Runtime,
    pool: Pool,
    // Quoted (and possibly schema-qualified) target relation.
    table: String,
    mode: PostgresWriteMode,
    // Quoted conflict column; guaranteed Some when mode is Upsert.
    conflict_key: Option<String>,
}
336
impl AsyncPostgresEgressClient {
    /// Build the egress client, quoting and validating the table name and the
    /// optional conflict key up front so later flushes use safe identifiers.
    ///
    /// # Errors
    /// `AdapterError::Config` when an identifier is invalid or when upsert
    /// mode is selected without a conflict key; `AdapterError::Postgres` when
    /// the runtime or pool cannot be built.
    fn connect(
        dsn: &str,
        table: &str,
        mode: PostgresWriteMode,
        conflict_key: Option<&str>,
    ) -> Result<Self, AdapterError> {
        let table = quote_relation(table)?;
        let conflict_key = conflict_key.map(quote_identifier).transpose()?;
        if matches!(mode, PostgresWriteMode::Upsert) && conflict_key.is_none() {
            return Err(AdapterError::Config {
                detail: "postgres upsert mode requires a conflict_key".into(),
            });
        }

        Ok(Self {
            runtime: build_runtime("postgres_egress_runtime_build")?,
            pool: build_pool(dsn)?,
            table,
            mode,
            conflict_key,
        })
    }

    /// Flush rows with a single prepared INSERT (optionally `ON CONFLICT`)
    /// statement, binding all rows as one jsonb array parameter that the
    /// server expands via `jsonb_populate_recordset`.
    ///
    /// # Errors
    /// `AdapterError::Config` when rows have no columns or an upsert row is
    /// missing the conflict-key field; `AdapterError::Postgres` on pool,
    /// prepare, or execute failure.
    async fn flush_insert_or_upsert(
        pool: Pool,
        table: String,
        mode: PostgresWriteMode,
        conflict_key: Option<String>,
        rows: Vec<Map<String, Value>>,
    ) -> Result<PostgresFlushResult, AdapterError> {
        let columns = union_keys(&rows)?;
        if columns.is_empty() {
            return Err(AdapterError::Config {
                detail: "postgres egress row payload must contain at least one column".into(),
            });
        }

        if let Some(conflict_key) = conflict_key.as_deref() {
            // conflict_key was quoted at connect time; strip the quotes to
            // recover the raw JSON field name for the presence check.
            let conflict_field = conflict_key.trim_matches('"');
            for row in &rows {
                if !row.contains_key(conflict_field) {
                    return Err(AdapterError::Config {
                        detail: format!(
                            "postgres upsert payload missing conflict_key field: {conflict_field}"
                        ),
                    });
                }
            }
        }

        let rows_json = Value::Array(rows.into_iter().map(Value::Object).collect());
        let sql = build_insert_sql(&table, &columns, mode, conflict_key.as_deref())?;
        let client = pool.get().await.map_err(|err| AdapterError::Postgres {
            op: "postgres_pool_get",
            detail: err.to_string(),
        })?;
        let statement =
            client
                .prepare_cached(sql.as_str())
                .await
                .map_err(|err| AdapterError::Postgres {
                    op: "postgres_prepare_cached",
                    detail: err.to_string(),
                })?;
        client
            .execute(&statement, &[&Json(rows_json)])
            .await
            .map_err(|err| AdapterError::Postgres {
                op: "postgres_execute",
                detail: err.to_string(),
            })
            .map(|rows_affected| PostgresFlushResult { rows_affected })
    }

    /// Bulk-load rows via `COPY ... FROM STDIN` in CSV format, with `\N` as
    /// the NULL marker; the affected-row count comes from finishing the sink.
    ///
    /// # Errors
    /// `AdapterError::Config` when rows have no columns;
    /// `AdapterError::Postgres` on pool, COPY start, send, or finish failure.
    async fn flush_copy(
        pool: Pool,
        table: String,
        rows: Vec<Map<String, Value>>,
    ) -> Result<PostgresFlushResult, AdapterError> {
        let columns = union_keys(&rows)?;
        if columns.is_empty() {
            return Err(AdapterError::Config {
                detail: "postgres copy payload must contain at least one column".into(),
            });
        }

        let copy_sql = format!(
            "COPY {} ({}) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
            table,
            columns.join(", ")
        );
        let csv = render_copy_csv(&rows, &columns)?;
        let client = pool.get().await.map_err(|err| AdapterError::Postgres {
            op: "postgres_pool_get",
            detail: err.to_string(),
        })?;
        let sink =
            client
                .copy_in(copy_sql.as_str())
                .await
                .map_err(|err| AdapterError::Postgres {
                    op: "postgres_copy_in",
                    detail: err.to_string(),
                })?;
        // The sink must be pinned before it can be driven as a Sink.
        pin!(sink);
        sink.send(Bytes::from(csv))
            .await
            .map_err(|err| AdapterError::Postgres {
                op: "postgres_copy_send",
                detail: err.to_string(),
            })?;
        // finish() completes the COPY and reports the number of rows loaded.
        let rows_affected = sink.as_mut().finish().await.map_err(|err| AdapterError::Postgres {
            op: "postgres_copy_close",
            detail: err.to_string(),
        })?;
        Ok(PostgresFlushResult { rows_affected })
    }
}
456
457impl PostgresEgressClient for AsyncPostgresEgressClient {
458    fn flush_rows(&mut self, rows: &[Map<String, Value>]) -> Result<PostgresFlushResult, AdapterError> {
459        let rows = rows.to_vec();
460        let pool = self.pool.clone();
461        let table = self.table.clone();
462        match self.mode {
463            PostgresWriteMode::Insert | PostgresWriteMode::Upsert => {
464                self.runtime.block_on(Self::flush_insert_or_upsert(
465                    pool,
466                    table,
467                    self.mode,
468                    self.conflict_key.clone(),
469                    rows,
470                ))
471            }
472            PostgresWriteMode::Copy => self.runtime.block_on(Self::flush_copy(pool, table, rows)),
473        }
474    }
475}
476
477fn build_runtime(op: &'static str) -> Result<Runtime, AdapterError> {
478    tokio::runtime::Builder::new_current_thread()
479        .enable_all()
480        .build()
481        .map_err(|err| AdapterError::Postgres {
482            op,
483            detail: err.to_string(),
484        })
485}
486
487fn build_pool(dsn: &str) -> Result<Pool, AdapterError> {
488    let config = dsn
489        .parse::<tokio_postgres::Config>()
490        .map_err(|err| AdapterError::Config {
491            detail: format!("invalid postgres dsn: {err}"),
492        })?;
493    let manager = Manager::from_config(
494        config,
495        NoTls,
496        ManagerConfig {
497            recycling_method: RecyclingMethod::Fast,
498        },
499    );
500    Pool::builder(manager)
501        .max_size(16)
502        .build()
503        .map_err(|err| AdapterError::Postgres {
504            op: "postgres_pool_build",
505            detail: err.to_string(),
506        })
507}
508
509fn enforce_payload_limit(
510    op: &'static str,
511    payload: &[u8],
512    max_row_bytes: usize,
513) -> Result<Vec<u8>, AdapterError> {
514    if payload.len() > max_row_bytes {
515        return Err(AdapterError::Postgres {
516            op,
517            detail: format!(
518                "postgres row too large len={} max={max_row_bytes}",
519                payload.len()
520            ),
521        });
522    }
523    Ok(payload.to_vec())
524}
525
526fn quote_relation(value: &str) -> Result<String, AdapterError> {
527    let segments = value
528        .split('.')
529        .map(quote_identifier)
530        .collect::<Result<Vec<_>, _>>()?;
531    if segments.is_empty() {
532        return Err(AdapterError::Config {
533            detail: "postgres table must not be empty".into(),
534        });
535    }
536    Ok(segments.join("."))
537}
538
539fn quote_identifier(value: &str) -> Result<String, AdapterError> {
540    let value = value.trim();
541    let mut chars = value.chars();
542    let Some(first) = chars.next() else {
543        return Err(AdapterError::Config {
544            detail: "postgres identifier must not be empty".into(),
545        });
546    };
547    if !(first == '_' || first.is_ascii_alphabetic())
548        || !chars.all(|ch| ch == '_' || ch.is_ascii_alphanumeric())
549    {
550        return Err(AdapterError::Config {
551            detail: format!("unsupported postgres identifier: {value}"),
552        });
553    }
554    Ok(format!("\"{value}\""))
555}
556
557fn union_keys(rows: &[Map<String, Value>]) -> Result<Vec<String>, AdapterError> {
558    let mut keys = rows
559        .iter()
560        .flat_map(|row| row.keys())
561        .map(|key| quote_identifier(key))
562        .collect::<Result<Vec<_>, _>>()?;
563    keys.sort();
564    keys.dedup();
565    Ok(keys)
566}
567
568fn build_insert_sql(
569    table: &str,
570    columns: &[String],
571    mode: PostgresWriteMode,
572    conflict_key: Option<&str>,
573) -> Result<String, AdapterError> {
574    let columns_csv = columns.join(", ");
575    let mut sql = format!(
576        "INSERT INTO {table} ({columns_csv}) SELECT {columns_csv} FROM jsonb_populate_recordset(NULL::{table}, $1::jsonb)"
577    );
578    if matches!(mode, PostgresWriteMode::Upsert) {
579        let conflict_key = conflict_key.ok_or_else(|| AdapterError::Config {
580            detail: "postgres upsert mode requires a conflict_key".into(),
581        })?;
582        let updates = columns
583            .iter()
584            .filter(|column| column.as_str() != conflict_key)
585            .map(|column| format!("{column} = EXCLUDED.{column}"))
586            .collect::<Vec<_>>();
587        if updates.is_empty() {
588            sql.push_str(&format!(" ON CONFLICT ({conflict_key}) DO NOTHING"));
589        } else {
590            sql.push_str(&format!(
591                " ON CONFLICT ({conflict_key}) DO UPDATE SET {}",
592                updates.join(", ")
593            ));
594        }
595    }
596    Ok(sql)
597}
598
599fn render_copy_csv(
600    rows: &[Map<String, Value>],
601    columns: &[String],
602) -> Result<Vec<u8>, AdapterError> {
603    let mut csv = String::new();
604    for row in rows {
605        for (index, column) in columns.iter().enumerate() {
606            if index > 0 {
607                csv.push(',');
608            }
609            let key = column.trim_matches('"');
610            push_csv_field(&mut csv, row.get(key))?;
611        }
612        csv.push('\n');
613    }
614    Ok(csv.into_bytes())
615}
616
617fn push_csv_field(out: &mut String, value: Option<&Value>) -> Result<(), AdapterError> {
618    match value {
619        None | Some(Value::Null) => out.push_str("\\N"),
620        Some(Value::String(text)) => push_csv_string(out, text),
621        Some(Value::Bool(flag)) => push_csv_string(out, if *flag { "true" } else { "false" }),
622        Some(Value::Number(number)) => push_csv_string(out, &number.to_string()),
623        Some(other) => {
624            let text = serde_json::to_string(other).map_err(|err| AdapterError::Postgres {
625                op: "postgres_copy_serialize",
626                detail: err.to_string(),
627            })?;
628            push_csv_string(out, &text);
629        }
630    }
631    Ok(())
632}
633
/// Append a CSV field value, quoting it (and doubling embedded quotes) when
/// it contains a delimiter, quote, or line break, or when it would otherwise
/// collide with the bare `\N` NULL marker.
fn push_csv_string(out: &mut String, value: &str) {
    let needs_quoting = value == "\\N"
        || value
            .chars()
            .any(|ch| matches!(ch, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        out.push_str(value);
        return;
    }
    out.push('"');
    for ch in value.chars() {
        if ch == '"' {
            out.push('"');
        }
        out.push(ch);
    }
    out.push('"');
}
653
654fn parse_rows_payload(payload: &[u8]) -> Result<Vec<Map<String, Value>>, AdapterError> {
655    let value: Value = serde_json::from_slice(payload).map_err(|err| AdapterError::Postgres {
656        op: "postgres_payload_parse",
657        detail: err.to_string(),
658    })?;
659    match value {
660        Value::Object(object) => Ok(vec![normalize_row_object(object)?]),
661        Value::Array(items) => items
662            .into_iter()
663            .map(|item| match item {
664                Value::Object(object) => normalize_row_object(object),
665                _ => Err(AdapterError::Config {
666                    detail: "postgres payload arrays must contain JSON objects".into(),
667                }),
668            })
669            .collect(),
670        _ => Err(AdapterError::Config {
671            detail: "postgres payload must be a JSON object or array of objects".into(),
672        }),
673    }
674}
675
676fn normalize_row_object(
677    mut object: Map<String, Value>,
678) -> Result<Map<String, Value>, AdapterError> {
679    if let (Some(columns), Some(values)) = (object.remove("columns"), object.remove("values")) {
680        return row_from_columns_payload(columns, values);
681    }
682    Ok(object)
683}
684
685fn row_from_columns_payload(
686    columns: Value,
687    values: Value,
688) -> Result<Map<String, Value>, AdapterError> {
689    let Value::Array(columns) = columns else {
690        return Err(AdapterError::Config {
691            detail: "postgres columns payload must provide an array of column names".into(),
692        });
693    };
694    let Value::Array(values) = values else {
695        return Err(AdapterError::Config {
696            detail: "postgres columns payload must provide an array of values".into(),
697        });
698    };
699    if columns.len() != values.len() {
700        return Err(AdapterError::Config {
701            detail: "postgres columns payload column/value lengths do not match".into(),
702        });
703    }
704
705    let mut row = Map::new();
706    for (column, value) in columns.into_iter().zip(values) {
707        let Value::String(column_name) = column else {
708            return Err(AdapterError::Config {
709                detail: "postgres columns payload column names must be strings".into(),
710            });
711        };
712        row.insert(column_name, value);
713    }
714    Ok(row)
715}
716
/// Minimal blocking PostgreSQL ingress adapter.
///
/// The adapter executes the configured query whenever its internal queue is empty. Query authors
/// should provide incremental or tailing SQL if they want exactly-once style progression.
pub struct PostgresIngress {
    name: String,
    // Real async client in production; a crate-local fake under #[cfg(test)].
    client: Box<dyn PostgresIngressClient>,
    // How long read_next may block when the query returns no rows.
    poll_interval: Duration,
    // Shared with the client, which updates it as it polls.
    status: Arc<Mutex<PostgresIngressStatus>>,
}
727
728impl PostgresIngress {
729    /// Create a blocking PostgreSQL ingress adapter.
730    pub fn new(
731        name: impl Into<String>,
732        dsn: &str,
733        query: &str,
734        poll_interval: Duration,
735        max_row_bytes: usize,
736        row_format: Option<&str>,
737    ) -> Result<Self, AdapterError> {
738        let row_format = PostgresRowFormat::parse(row_format)?;
739        let status = Arc::new(Mutex::new(PostgresIngressStatus::new(row_format)));
740        let client = AsyncPostgresIngressClient::connect(
741            dsn,
742            query,
743            row_format,
744            max_row_bytes,
745            Arc::clone(&status),
746        )?;
747        Ok(Self {
748            name: name.into(),
749            client: Box::new(client),
750            poll_interval,
751            status,
752        })
753    }
754
755    #[cfg(test)]
756    /// Build a PostgreSQL ingress adapter around a crate-local fake client.
757    pub(crate) fn with_client(
758        name: impl Into<String>,
759        client: impl PostgresIngressClient + 'static,
760        poll_interval: Duration,
761    ) -> Self {
762        Self {
763            name: name.into(),
764            client: Box::new(client),
765            poll_interval,
766            status: Arc::new(Mutex::new(PostgresIngressStatus::new(PostgresRowFormat::Json))),
767        }
768    }
769}
770
771impl Adapter for PostgresIngress {
772    fn name(&self) -> &str {
773        &self.name
774    }
775
776    fn transport_kind(&self) -> &str {
777        "postgres"
778    }
779
780    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
781        self.status.lock().unwrap().status_fields()
782    }
783}
784
785impl IngressAdapter for PostgresIngress {
786    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
787        let Some(payload) = self.client.read_next(self.poll_interval)? else {
788            return Ok(None);
789        };
790        if payload.len() > out.len() {
791            return Err(AdapterError::Postgres {
792                op: "postgres_read_next",
793                detail: format!("payload too large len={} buf={}", payload.len(), out.len()),
794            });
795        }
796        out[..payload.len()].copy_from_slice(&payload);
797        Ok(Some(payload.len()))
798    }
799}
800
/// Minimal blocking PostgreSQL egress adapter.
///
/// Input messages must be JSON objects, arrays of JSON objects, or ingress-style
/// `{"columns": [...], "values": [...]}` envelopes.
pub struct PostgresEgress {
    name: String,
    // Real async client in production; a crate-local fake under #[cfg(test)].
    client: Box<dyn PostgresEgressClient>,
    // Rows accepted by write_msg and awaiting the next flush.
    buffered_rows: Vec<Map<String, Value>>,
    // Status map reported through the Adapter trait.
    status: Arc<Mutex<PostgresEgressStatus>>,
}
811
812impl PostgresEgress {
813    /// Create a blocking PostgreSQL egress adapter.
814    pub fn new(
815        name: impl Into<String>,
816        dsn: &str,
817        table: &str,
818        mode: &str,
819        conflict_key: Option<&str>,
820    ) -> Result<Self, AdapterError> {
821        let mode = PostgresWriteMode::parse(mode)?;
822        let client = AsyncPostgresEgressClient::connect(
823            dsn,
824            table,
825            mode,
826            conflict_key,
827        )?;
828        Ok(Self {
829            name: name.into(),
830            client: Box::new(client),
831            buffered_rows: Vec::new(),
832            status: Arc::new(Mutex::new(PostgresEgressStatus::new(mode))),
833        })
834    }
835
836    #[cfg(test)]
837    /// Build a PostgreSQL egress adapter around a crate-local fake client.
838    pub(crate) fn with_client(
839        name: impl Into<String>,
840        client: impl PostgresEgressClient + 'static,
841    ) -> Self {
842        Self {
843            name: name.into(),
844            client: Box::new(client),
845            buffered_rows: Vec::new(),
846            status: Arc::new(Mutex::new(PostgresEgressStatus::new(PostgresWriteMode::Insert))),
847        }
848    }
849}
850
851impl Adapter for PostgresEgress {
852    fn name(&self) -> &str {
853        &self.name
854    }
855
856    fn transport_kind(&self) -> &str {
857        "postgres"
858    }
859
860    fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
861        self.status.lock().unwrap().status_fields()
862    }
863}
864
865impl EgressAdapter for PostgresEgress {
866    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
867        self.buffered_rows.extend(parse_rows_payload(msg)?);
868        let mut status = self.status.lock().unwrap();
869        status.write_state = "buffering".to_string();
870        status.buffered_rows = self.buffered_rows.len() as u64;
871        Ok(())
872    }
873
874    fn flush(&mut self) -> Result<(), AdapterError> {
875        if self.buffered_rows.is_empty() {
876            return Ok(());
877        }
878        {
879            let mut status = self.status.lock().unwrap();
880            status.write_state = "flushing".to_string();
881            status.last_flush_rows = self.buffered_rows.len() as u64;
882        }
883        let result = self.client.flush_rows(&self.buffered_rows)?;
884        self.buffered_rows.clear();
885        let mut status = self.status.lock().unwrap();
886        status.write_state = "idle".to_string();
887        status.buffered_rows = 0;
888        status.last_rows_affected = Some(result.rows_affected);
889        Ok(())
890    }
891}