1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use bytes::Bytes;
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
7use futures_util::SinkExt;
8use serde_json::{Map, Value};
9use tokio::pin;
10use tokio::runtime::Runtime;
11use tokio_postgres::types::Json;
12use tokio_postgres::{NoTls, SimpleQueryMessage};
13
14use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
15
16pub(crate) struct PostgresFlushResult {
17 pub rows_affected: u64,
18}
19
20#[derive(Debug)]
21struct PostgresIngressStatus {
22 query_state: String,
23 row_format: String,
24 last_query_rows: u64,
25 pending_rows: u64,
26}
27
28impl PostgresIngressStatus {
29 fn new(row_format: PostgresRowFormat) -> Self {
30 Self {
31 query_state: "idle".to_string(),
32 row_format: match row_format {
33 PostgresRowFormat::Json => "json".to_string(),
34 PostgresRowFormat::Columns => "columns".to_string(),
35 },
36 last_query_rows: 0,
37 pending_rows: 0,
38 }
39 }
40
41 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
42 let mut details = serde_json::Map::new();
43 details.insert(
44 "query_state".to_string(),
45 serde_json::Value::String(self.query_state.clone()),
46 );
47 details.insert(
48 "row_format".to_string(),
49 serde_json::Value::String(self.row_format.clone()),
50 );
51 details.insert(
52 "last_query_rows".to_string(),
53 serde_json::Value::from(self.last_query_rows),
54 );
55 details.insert(
56 "pending_rows".to_string(),
57 serde_json::Value::from(self.pending_rows),
58 );
59 details
60 }
61}
62
63#[derive(Debug)]
64struct PostgresEgressStatus {
65 write_state: String,
66 write_mode: String,
67 buffered_rows: u64,
68 last_flush_rows: u64,
69 last_rows_affected: Option<u64>,
70}
71
72impl PostgresEgressStatus {
73 fn new(mode: PostgresWriteMode) -> Self {
74 Self {
75 write_state: "idle".to_string(),
76 write_mode: match mode {
77 PostgresWriteMode::Insert => "insert".to_string(),
78 PostgresWriteMode::Upsert => "upsert".to_string(),
79 PostgresWriteMode::Copy => "copy".to_string(),
80 },
81 buffered_rows: 0,
82 last_flush_rows: 0,
83 last_rows_affected: None,
84 }
85 }
86
87 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
88 let mut details = serde_json::Map::new();
89 details.insert(
90 "write_state".to_string(),
91 serde_json::Value::String(self.write_state.clone()),
92 );
93 details.insert(
94 "write_mode".to_string(),
95 serde_json::Value::String(self.write_mode.clone()),
96 );
97 details.insert(
98 "buffered_rows".to_string(),
99 serde_json::Value::from(self.buffered_rows),
100 );
101 details.insert(
102 "last_flush_rows".to_string(),
103 serde_json::Value::from(self.last_flush_rows),
104 );
105 if let Some(rows_affected) = self.last_rows_affected {
106 details.insert(
107 "last_rows_affected".to_string(),
108 serde_json::Value::from(rows_affected),
109 );
110 }
111 details
112 }
113}
114
115#[derive(Clone, Copy, Debug, PartialEq, Eq)]
117pub enum PostgresRowFormat {
118 Json,
120 Columns,
122}
123
124impl PostgresRowFormat {
125 fn parse(value: Option<&str>) -> Result<Self, AdapterError> {
126 match value.map(str::trim).filter(|value| !value.is_empty()) {
127 None | Some("json") => Ok(Self::Json),
128 Some("columns") => Ok(Self::Columns),
129 Some(other) => Err(AdapterError::Config {
130 detail: format!("unsupported postgres row_format: {other}"),
131 }),
132 }
133 }
134}
135
136#[derive(Clone, Copy, Debug, PartialEq, Eq)]
138pub enum PostgresWriteMode {
139 Insert,
141 Upsert,
143 Copy,
145}
146
147impl PostgresWriteMode {
148 fn parse(value: &str) -> Result<Self, AdapterError> {
149 match value.trim() {
150 "insert" => Ok(Self::Insert),
151 "upsert" => Ok(Self::Upsert),
152 "copy" => Ok(Self::Copy),
153 other => Err(AdapterError::Config {
154 detail: format!("unsupported postgres write mode: {other}"),
155 }),
156 }
157 }
158}
159
160pub(crate) trait PostgresIngressClient: Send {
162 fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
163}
164
165pub(crate) trait PostgresEgressClient: Send {
167 fn flush_rows(&mut self, rows: &[Map<String, Value>]) -> Result<PostgresFlushResult, AdapterError>;
168}
169
170struct AsyncPostgresIngressClient {
171 runtime: Runtime,
172 pool: Pool,
173 query: String,
174 row_format: PostgresRowFormat,
175 max_row_bytes: usize,
176 pending: VecDeque<Vec<u8>>,
177 status: Arc<Mutex<PostgresIngressStatus>>,
178}
179
180impl AsyncPostgresIngressClient {
181 fn connect(
182 dsn: &str,
183 query: &str,
184 row_format: PostgresRowFormat,
185 max_row_bytes: usize,
186 status: Arc<Mutex<PostgresIngressStatus>>,
187 ) -> Result<Self, AdapterError> {
188 Ok(Self {
189 runtime: build_runtime("postgres_ingress_runtime_build")?,
190 pool: build_pool(dsn)?,
191 query: query.trim().to_string(),
192 row_format,
193 max_row_bytes,
194 pending: VecDeque::new(),
195 status,
196 })
197 }
198
199 async fn fetch_pending_json(
200 pool: Pool,
201 query: String,
202 max_row_bytes: usize,
203 ) -> Result<Vec<Vec<u8>>, AdapterError> {
204 let client = pool.get().await.map_err(|err| AdapterError::Postgres {
205 op: "postgres_pool_get",
206 detail: err.to_string(),
207 })?;
208 let wrapped_query = format!("SELECT row_to_json(t)::text FROM ({query}) AS t");
209 let rows = client
210 .query(wrapped_query.as_str(), &[])
211 .await
212 .map_err(|err| AdapterError::Postgres {
213 op: "postgres_query_rows",
214 detail: err.to_string(),
215 })?;
216
217 rows.into_iter()
218 .map(|row| {
219 let payload: String = row.get(0);
220 enforce_payload_limit(
221 "postgres_ingress_row_to_json",
222 payload.as_bytes(),
223 max_row_bytes,
224 )
225 })
226 .collect()
227 }
228
229 async fn fetch_pending_columns(
230 pool: Pool,
231 query: String,
232 max_row_bytes: usize,
233 ) -> Result<Vec<Vec<u8>>, AdapterError> {
234 let client = pool.get().await.map_err(|err| AdapterError::Postgres {
235 op: "postgres_pool_get",
236 detail: err.to_string(),
237 })?;
238 let messages =
239 client
240 .simple_query(query.as_str())
241 .await
242 .map_err(|err| AdapterError::Postgres {
243 op: "postgres_simple_query",
244 detail: err.to_string(),
245 })?;
246
247 let mut payloads = Vec::new();
248 for message in messages {
249 let SimpleQueryMessage::Row(row) = message else {
250 continue;
251 };
252
253 let columns = row
254 .columns()
255 .iter()
256 .map(|column| Value::String(column.name().to_string()))
257 .collect::<Vec<_>>();
258 let values = row
259 .columns()
260 .iter()
261 .enumerate()
262 .map(|(index, _)| match row.get(index) {
263 Some(value) => Value::String(value.to_string()),
264 None => Value::Null,
265 })
266 .collect::<Vec<_>>();
267 let payload = serde_json::to_vec(&serde_json::json!({
268 "columns": columns,
269 "values": values,
270 }))
271 .map_err(|err| AdapterError::Postgres {
272 op: "postgres_columns_serialize",
273 detail: err.to_string(),
274 })?;
275 payloads.push(enforce_payload_limit(
276 "postgres_columns_serialize",
277 &payload,
278 max_row_bytes,
279 )?);
280 }
281
282 Ok(payloads)
283 }
284}
285
286impl PostgresIngressClient for AsyncPostgresIngressClient {
287 fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
288 if let Some(payload) = self.pending.pop_front() {
289 self.status.lock().unwrap().pending_rows = self.pending.len() as u64;
290 return Ok(Some(payload));
291 }
292
293 self.status.lock().unwrap().query_state = "querying".to_string();
294
295 let pool = self.pool.clone();
296 let query = self.query.clone();
297 let max_row_bytes = self.max_row_bytes;
298 let fetched = match self.row_format {
299 PostgresRowFormat::Json => {
300 self.runtime
301 .block_on(Self::fetch_pending_json(pool, query, max_row_bytes))?
302 }
303 PostgresRowFormat::Columns => {
304 self.runtime
305 .block_on(Self::fetch_pending_columns(pool, query, max_row_bytes))?
306 }
307 };
308
309 if fetched.is_empty() {
310 let mut status = self.status.lock().unwrap();
311 status.query_state = "idle".to_string();
312 status.last_query_rows = 0;
313 status.pending_rows = 0;
314 std::thread::sleep(timeout);
315 return Ok(None);
316 }
317
318 {
319 let mut status = self.status.lock().unwrap();
320 status.query_state = "rows_buffered".to_string();
321 status.last_query_rows = fetched.len() as u64;
322 status.pending_rows = fetched.len().saturating_sub(1) as u64;
323 }
324 self.pending.extend(fetched);
325 Ok(self.pending.pop_front())
326 }
327}
328
329struct AsyncPostgresEgressClient {
330 runtime: Runtime,
331 pool: Pool,
332 table: String,
333 mode: PostgresWriteMode,
334 conflict_key: Option<String>,
335}
336
337impl AsyncPostgresEgressClient {
338 fn connect(
339 dsn: &str,
340 table: &str,
341 mode: PostgresWriteMode,
342 conflict_key: Option<&str>,
343 ) -> Result<Self, AdapterError> {
344 let table = quote_relation(table)?;
345 let conflict_key = conflict_key.map(quote_identifier).transpose()?;
346 if matches!(mode, PostgresWriteMode::Upsert) && conflict_key.is_none() {
347 return Err(AdapterError::Config {
348 detail: "postgres upsert mode requires a conflict_key".into(),
349 });
350 }
351
352 Ok(Self {
353 runtime: build_runtime("postgres_egress_runtime_build")?,
354 pool: build_pool(dsn)?,
355 table,
356 mode,
357 conflict_key,
358 })
359 }
360
361 async fn flush_insert_or_upsert(
362 pool: Pool,
363 table: String,
364 mode: PostgresWriteMode,
365 conflict_key: Option<String>,
366 rows: Vec<Map<String, Value>>,
367 ) -> Result<PostgresFlushResult, AdapterError> {
368 let columns = union_keys(&rows)?;
369 if columns.is_empty() {
370 return Err(AdapterError::Config {
371 detail: "postgres egress row payload must contain at least one column".into(),
372 });
373 }
374
375 if let Some(conflict_key) = conflict_key.as_deref() {
376 let conflict_field = conflict_key.trim_matches('"');
377 for row in &rows {
378 if !row.contains_key(conflict_field) {
379 return Err(AdapterError::Config {
380 detail: format!(
381 "postgres upsert payload missing conflict_key field: {conflict_field}"
382 ),
383 });
384 }
385 }
386 }
387
388 let rows_json = Value::Array(rows.into_iter().map(Value::Object).collect());
389 let sql = build_insert_sql(&table, &columns, mode, conflict_key.as_deref())?;
390 let client = pool.get().await.map_err(|err| AdapterError::Postgres {
391 op: "postgres_pool_get",
392 detail: err.to_string(),
393 })?;
394 let statement =
395 client
396 .prepare_cached(sql.as_str())
397 .await
398 .map_err(|err| AdapterError::Postgres {
399 op: "postgres_prepare_cached",
400 detail: err.to_string(),
401 })?;
402 client
403 .execute(&statement, &[&Json(rows_json)])
404 .await
405 .map_err(|err| AdapterError::Postgres {
406 op: "postgres_execute",
407 detail: err.to_string(),
408 })
409 .map(|rows_affected| PostgresFlushResult { rows_affected })
410 }
411
412 async fn flush_copy(
413 pool: Pool,
414 table: String,
415 rows: Vec<Map<String, Value>>,
416 ) -> Result<PostgresFlushResult, AdapterError> {
417 let columns = union_keys(&rows)?;
418 if columns.is_empty() {
419 return Err(AdapterError::Config {
420 detail: "postgres copy payload must contain at least one column".into(),
421 });
422 }
423
424 let copy_sql = format!(
425 "COPY {} ({}) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
426 table,
427 columns.join(", ")
428 );
429 let csv = render_copy_csv(&rows, &columns)?;
430 let client = pool.get().await.map_err(|err| AdapterError::Postgres {
431 op: "postgres_pool_get",
432 detail: err.to_string(),
433 })?;
434 let sink =
435 client
436 .copy_in(copy_sql.as_str())
437 .await
438 .map_err(|err| AdapterError::Postgres {
439 op: "postgres_copy_in",
440 detail: err.to_string(),
441 })?;
442 pin!(sink);
443 sink.send(Bytes::from(csv))
444 .await
445 .map_err(|err| AdapterError::Postgres {
446 op: "postgres_copy_send",
447 detail: err.to_string(),
448 })?;
449 let rows_affected = sink.as_mut().finish().await.map_err(|err| AdapterError::Postgres {
450 op: "postgres_copy_close",
451 detail: err.to_string(),
452 })?;
453 Ok(PostgresFlushResult { rows_affected })
454 }
455}
456
457impl PostgresEgressClient for AsyncPostgresEgressClient {
458 fn flush_rows(&mut self, rows: &[Map<String, Value>]) -> Result<PostgresFlushResult, AdapterError> {
459 let rows = rows.to_vec();
460 let pool = self.pool.clone();
461 let table = self.table.clone();
462 match self.mode {
463 PostgresWriteMode::Insert | PostgresWriteMode::Upsert => {
464 self.runtime.block_on(Self::flush_insert_or_upsert(
465 pool,
466 table,
467 self.mode,
468 self.conflict_key.clone(),
469 rows,
470 ))
471 }
472 PostgresWriteMode::Copy => self.runtime.block_on(Self::flush_copy(pool, table, rows)),
473 }
474 }
475}
476
477fn build_runtime(op: &'static str) -> Result<Runtime, AdapterError> {
478 tokio::runtime::Builder::new_current_thread()
479 .enable_all()
480 .build()
481 .map_err(|err| AdapterError::Postgres {
482 op,
483 detail: err.to_string(),
484 })
485}
486
487fn build_pool(dsn: &str) -> Result<Pool, AdapterError> {
488 let config = dsn
489 .parse::<tokio_postgres::Config>()
490 .map_err(|err| AdapterError::Config {
491 detail: format!("invalid postgres dsn: {err}"),
492 })?;
493 let manager = Manager::from_config(
494 config,
495 NoTls,
496 ManagerConfig {
497 recycling_method: RecyclingMethod::Fast,
498 },
499 );
500 Pool::builder(manager)
501 .max_size(16)
502 .build()
503 .map_err(|err| AdapterError::Postgres {
504 op: "postgres_pool_build",
505 detail: err.to_string(),
506 })
507}
508
509fn enforce_payload_limit(
510 op: &'static str,
511 payload: &[u8],
512 max_row_bytes: usize,
513) -> Result<Vec<u8>, AdapterError> {
514 if payload.len() > max_row_bytes {
515 return Err(AdapterError::Postgres {
516 op,
517 detail: format!(
518 "postgres row too large len={} max={max_row_bytes}",
519 payload.len()
520 ),
521 });
522 }
523 Ok(payload.to_vec())
524}
525
526fn quote_relation(value: &str) -> Result<String, AdapterError> {
527 let segments = value
528 .split('.')
529 .map(quote_identifier)
530 .collect::<Result<Vec<_>, _>>()?;
531 if segments.is_empty() {
532 return Err(AdapterError::Config {
533 detail: "postgres table must not be empty".into(),
534 });
535 }
536 Ok(segments.join("."))
537}
538
539fn quote_identifier(value: &str) -> Result<String, AdapterError> {
540 let value = value.trim();
541 let mut chars = value.chars();
542 let Some(first) = chars.next() else {
543 return Err(AdapterError::Config {
544 detail: "postgres identifier must not be empty".into(),
545 });
546 };
547 if !(first == '_' || first.is_ascii_alphabetic())
548 || !chars.all(|ch| ch == '_' || ch.is_ascii_alphanumeric())
549 {
550 return Err(AdapterError::Config {
551 detail: format!("unsupported postgres identifier: {value}"),
552 });
553 }
554 Ok(format!("\"{value}\""))
555}
556
557fn union_keys(rows: &[Map<String, Value>]) -> Result<Vec<String>, AdapterError> {
558 let mut keys = rows
559 .iter()
560 .flat_map(|row| row.keys())
561 .map(|key| quote_identifier(key))
562 .collect::<Result<Vec<_>, _>>()?;
563 keys.sort();
564 keys.dedup();
565 Ok(keys)
566}
567
568fn build_insert_sql(
569 table: &str,
570 columns: &[String],
571 mode: PostgresWriteMode,
572 conflict_key: Option<&str>,
573) -> Result<String, AdapterError> {
574 let columns_csv = columns.join(", ");
575 let mut sql = format!(
576 "INSERT INTO {table} ({columns_csv}) SELECT {columns_csv} FROM jsonb_populate_recordset(NULL::{table}, $1::jsonb)"
577 );
578 if matches!(mode, PostgresWriteMode::Upsert) {
579 let conflict_key = conflict_key.ok_or_else(|| AdapterError::Config {
580 detail: "postgres upsert mode requires a conflict_key".into(),
581 })?;
582 let updates = columns
583 .iter()
584 .filter(|column| column.as_str() != conflict_key)
585 .map(|column| format!("{column} = EXCLUDED.{column}"))
586 .collect::<Vec<_>>();
587 if updates.is_empty() {
588 sql.push_str(&format!(" ON CONFLICT ({conflict_key}) DO NOTHING"));
589 } else {
590 sql.push_str(&format!(
591 " ON CONFLICT ({conflict_key}) DO UPDATE SET {}",
592 updates.join(", ")
593 ));
594 }
595 }
596 Ok(sql)
597}
598
599fn render_copy_csv(
600 rows: &[Map<String, Value>],
601 columns: &[String],
602) -> Result<Vec<u8>, AdapterError> {
603 let mut csv = String::new();
604 for row in rows {
605 for (index, column) in columns.iter().enumerate() {
606 if index > 0 {
607 csv.push(',');
608 }
609 let key = column.trim_matches('"');
610 push_csv_field(&mut csv, row.get(key))?;
611 }
612 csv.push('\n');
613 }
614 Ok(csv.into_bytes())
615}
616
617fn push_csv_field(out: &mut String, value: Option<&Value>) -> Result<(), AdapterError> {
618 match value {
619 None | Some(Value::Null) => out.push_str("\\N"),
620 Some(Value::String(text)) => push_csv_string(out, text),
621 Some(Value::Bool(flag)) => push_csv_string(out, if *flag { "true" } else { "false" }),
622 Some(Value::Number(number)) => push_csv_string(out, &number.to_string()),
623 Some(other) => {
624 let text = serde_json::to_string(other).map_err(|err| AdapterError::Postgres {
625 op: "postgres_copy_serialize",
626 detail: err.to_string(),
627 })?;
628 push_csv_string(out, &text);
629 }
630 }
631 Ok(())
632}
633
634fn push_csv_string(out: &mut String, value: &str) {
635 let must_quote = value == "\\N"
636 || value.contains(',')
637 || value.contains('"')
638 || value.contains('\n')
639 || value.contains('\r');
640 if must_quote {
641 out.push('"');
642 for ch in value.chars() {
643 if ch == '"' {
644 out.push('"');
645 }
646 out.push(ch);
647 }
648 out.push('"');
649 } else {
650 out.push_str(value);
651 }
652}
653
654fn parse_rows_payload(payload: &[u8]) -> Result<Vec<Map<String, Value>>, AdapterError> {
655 let value: Value = serde_json::from_slice(payload).map_err(|err| AdapterError::Postgres {
656 op: "postgres_payload_parse",
657 detail: err.to_string(),
658 })?;
659 match value {
660 Value::Object(object) => Ok(vec![normalize_row_object(object)?]),
661 Value::Array(items) => items
662 .into_iter()
663 .map(|item| match item {
664 Value::Object(object) => normalize_row_object(object),
665 _ => Err(AdapterError::Config {
666 detail: "postgres payload arrays must contain JSON objects".into(),
667 }),
668 })
669 .collect(),
670 _ => Err(AdapterError::Config {
671 detail: "postgres payload must be a JSON object or array of objects".into(),
672 }),
673 }
674}
675
676fn normalize_row_object(
677 mut object: Map<String, Value>,
678) -> Result<Map<String, Value>, AdapterError> {
679 if let (Some(columns), Some(values)) = (object.remove("columns"), object.remove("values")) {
680 return row_from_columns_payload(columns, values);
681 }
682 Ok(object)
683}
684
685fn row_from_columns_payload(
686 columns: Value,
687 values: Value,
688) -> Result<Map<String, Value>, AdapterError> {
689 let Value::Array(columns) = columns else {
690 return Err(AdapterError::Config {
691 detail: "postgres columns payload must provide an array of column names".into(),
692 });
693 };
694 let Value::Array(values) = values else {
695 return Err(AdapterError::Config {
696 detail: "postgres columns payload must provide an array of values".into(),
697 });
698 };
699 if columns.len() != values.len() {
700 return Err(AdapterError::Config {
701 detail: "postgres columns payload column/value lengths do not match".into(),
702 });
703 }
704
705 let mut row = Map::new();
706 for (column, value) in columns.into_iter().zip(values) {
707 let Value::String(column_name) = column else {
708 return Err(AdapterError::Config {
709 detail: "postgres columns payload column names must be strings".into(),
710 });
711 };
712 row.insert(column_name, value);
713 }
714 Ok(row)
715}
716
717pub struct PostgresIngress {
722 name: String,
723 client: Box<dyn PostgresIngressClient>,
724 poll_interval: Duration,
725 status: Arc<Mutex<PostgresIngressStatus>>,
726}
727
728impl PostgresIngress {
729 pub fn new(
731 name: impl Into<String>,
732 dsn: &str,
733 query: &str,
734 poll_interval: Duration,
735 max_row_bytes: usize,
736 row_format: Option<&str>,
737 ) -> Result<Self, AdapterError> {
738 let row_format = PostgresRowFormat::parse(row_format)?;
739 let status = Arc::new(Mutex::new(PostgresIngressStatus::new(row_format)));
740 let client = AsyncPostgresIngressClient::connect(
741 dsn,
742 query,
743 row_format,
744 max_row_bytes,
745 Arc::clone(&status),
746 )?;
747 Ok(Self {
748 name: name.into(),
749 client: Box::new(client),
750 poll_interval,
751 status,
752 })
753 }
754
755 #[cfg(test)]
756 pub(crate) fn with_client(
758 name: impl Into<String>,
759 client: impl PostgresIngressClient + 'static,
760 poll_interval: Duration,
761 ) -> Self {
762 Self {
763 name: name.into(),
764 client: Box::new(client),
765 poll_interval,
766 status: Arc::new(Mutex::new(PostgresIngressStatus::new(PostgresRowFormat::Json))),
767 }
768 }
769}
770
771impl Adapter for PostgresIngress {
772 fn name(&self) -> &str {
773 &self.name
774 }
775
776 fn transport_kind(&self) -> &str {
777 "postgres"
778 }
779
780 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
781 self.status.lock().unwrap().status_fields()
782 }
783}
784
785impl IngressAdapter for PostgresIngress {
786 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
787 let Some(payload) = self.client.read_next(self.poll_interval)? else {
788 return Ok(None);
789 };
790 if payload.len() > out.len() {
791 return Err(AdapterError::Postgres {
792 op: "postgres_read_next",
793 detail: format!("payload too large len={} buf={}", payload.len(), out.len()),
794 });
795 }
796 out[..payload.len()].copy_from_slice(&payload);
797 Ok(Some(payload.len()))
798 }
799}
800
801pub struct PostgresEgress {
806 name: String,
807 client: Box<dyn PostgresEgressClient>,
808 buffered_rows: Vec<Map<String, Value>>,
809 status: Arc<Mutex<PostgresEgressStatus>>,
810}
811
812impl PostgresEgress {
813 pub fn new(
815 name: impl Into<String>,
816 dsn: &str,
817 table: &str,
818 mode: &str,
819 conflict_key: Option<&str>,
820 ) -> Result<Self, AdapterError> {
821 let mode = PostgresWriteMode::parse(mode)?;
822 let client = AsyncPostgresEgressClient::connect(
823 dsn,
824 table,
825 mode,
826 conflict_key,
827 )?;
828 Ok(Self {
829 name: name.into(),
830 client: Box::new(client),
831 buffered_rows: Vec::new(),
832 status: Arc::new(Mutex::new(PostgresEgressStatus::new(mode))),
833 })
834 }
835
836 #[cfg(test)]
837 pub(crate) fn with_client(
839 name: impl Into<String>,
840 client: impl PostgresEgressClient + 'static,
841 ) -> Self {
842 Self {
843 name: name.into(),
844 client: Box::new(client),
845 buffered_rows: Vec::new(),
846 status: Arc::new(Mutex::new(PostgresEgressStatus::new(PostgresWriteMode::Insert))),
847 }
848 }
849}
850
851impl Adapter for PostgresEgress {
852 fn name(&self) -> &str {
853 &self.name
854 }
855
856 fn transport_kind(&self) -> &str {
857 "postgres"
858 }
859
860 fn status_fields(&self) -> serde_json::Map<String, serde_json::Value> {
861 self.status.lock().unwrap().status_fields()
862 }
863}
864
865impl EgressAdapter for PostgresEgress {
866 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
867 self.buffered_rows.extend(parse_rows_payload(msg)?);
868 let mut status = self.status.lock().unwrap();
869 status.write_state = "buffering".to_string();
870 status.buffered_rows = self.buffered_rows.len() as u64;
871 Ok(())
872 }
873
874 fn flush(&mut self) -> Result<(), AdapterError> {
875 if self.buffered_rows.is_empty() {
876 return Ok(());
877 }
878 {
879 let mut status = self.status.lock().unwrap();
880 status.write_state = "flushing".to_string();
881 status.last_flush_rows = self.buffered_rows.len() as u64;
882 }
883 let result = self.client.flush_rows(&self.buffered_rows)?;
884 self.buffered_rows.clear();
885 let mut status = self.status.lock().unwrap();
886 status.write_state = "idle".to_string();
887 status.buffered_rows = 0;
888 status.last_rows_affected = Some(result.rows_affected);
889 Ok(())
890 }
891}