1use std::io::{BufRead, Write};
2
3use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
4
/// Ingress adapter state for pulling newline-delimited messages out of `R`.
///
/// No bounds are placed on `R` here; the reading logic lives in the
/// `IngressAdapter` impl, which requires `R: BufRead`.
pub struct LineIngress<R> {
    /// Adapter name, handed back verbatim by `Adapter::name`.
    name: String,
    /// Wrapped input source that lines are read from.
    inner: R,
    /// Reusable buffer for the raw bytes of the line currently being read;
    /// it is cleared at the start of every read.
    scratch: Vec<u8>,
}
14
15impl<R> LineIngress<R> {
16 pub fn new(name: impl Into<String>, inner: R) -> Self {
18 Self {
19 name: name.into(),
20 inner,
21 scratch: Vec::with_capacity(1024),
22 }
23 }
24
25 fn trim_line(mut line: &[u8]) -> &[u8] {
26 if let Some(stripped) = line.strip_suffix(b"\n") {
27 line = stripped;
28 }
29 if let Some(stripped) = line.strip_suffix(b"\r") {
30 line = stripped;
31 }
32 line
33 }
34}
35
36impl<R> Adapter for LineIngress<R> {
37 fn name(&self) -> &str {
38 &self.name
39 }
40
41 fn transport_kind(&self) -> &str {
42 "line"
43 }
44}
45
46impl<R: BufRead> IngressAdapter for LineIngress<R> {
47 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
48 self.scratch.clear();
49
50 let n = self
51 .inner
52 .read_until(b'\n', &mut self.scratch)
53 .map_err(|e| AdapterError::Io {
54 op: "read_until",
55 source: e,
56 })?;
57
58 if n == 0 {
59 return Ok(None);
60 }
61
62 let line = Self::trim_line(&self.scratch);
63 let n = line.len().min(out.len());
64 out[..n].copy_from_slice(&line[..n]);
65 Ok(Some(n))
66 }
67}
68
/// Egress adapter state for emitting newline-terminated messages to `W`.
///
/// No bounds are placed on `W` here; the writing logic lives in the
/// `EgressAdapter` impl, which requires `W: Write`.
pub struct LineEgress<W> {
    /// Adapter name, handed back verbatim by `Adapter::name`.
    name: String,
    /// Wrapped output sink that messages are written to.
    inner: W,
}
76
77impl<W> LineEgress<W> {
78 pub fn new(name: impl Into<String>, inner: W) -> Self {
80 Self {
81 name: name.into(),
82 inner,
83 }
84 }
85}
86
87impl<W> Adapter for LineEgress<W> {
88 fn name(&self) -> &str {
89 &self.name
90 }
91
92 fn transport_kind(&self) -> &str {
93 "line"
94 }
95}
96
97impl<W: Write> EgressAdapter for LineEgress<W> {
98 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
99 self.inner.write_all(msg).map_err(|e| AdapterError::Io {
100 op: "write_all",
101 source: e,
102 })?;
103 self.inner.write_all(b"\n").map_err(|e| AdapterError::Io {
104 op: "write_newline",
105 source: e,
106 })?;
107 Ok(())
108 }
109
110 fn flush(&mut self) -> Result<(), AdapterError> {
111 self.inner.flush().map_err(|e| AdapterError::Io {
112 op: "flush",
113 source: e,
114 })
115 }
116}