byteor_adapters/
redis_streams.rs

1use std::time::Duration;
2
3use redis::Value;
4
5use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
6
7/// Internal Redis Streams ingress client contract used by the adapter and crate-local tests.
8pub(crate) trait RedisStreamsIngressClient: Send {
9    fn read_next(&mut self, timeout: Duration) -> Result<Option<(String, Vec<u8>)>, AdapterError>;
10}
11
12/// Internal Redis Streams egress client contract used by the adapter and crate-local tests.
13pub(crate) trait RedisStreamsEgressClient: Send {
14    fn append(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
15}
16
17struct RedisIngressClient {
18    connection: redis::Connection,
19    stream: String,
20    last_id: String,
21}
22
23impl RedisIngressClient {
24    fn connect(
25        redis_url: &str,
26        stream: &str,
27        start_id: impl Into<String>,
28    ) -> Result<Self, AdapterError> {
29        let client = redis::Client::open(redis_url).map_err(|err| AdapterError::Redis {
30            op: "redis_client_open",
31            detail: err.to_string(),
32        })?;
33        let connection = client.get_connection().map_err(|err| AdapterError::Redis {
34            op: "redis_get_connection",
35            detail: err.to_string(),
36        })?;
37        Ok(Self {
38            connection,
39            stream: stream.into(),
40            last_id: start_id.into(),
41        })
42    }
43}
44
45impl RedisStreamsIngressClient for RedisIngressClient {
46    fn read_next(&mut self, timeout: Duration) -> Result<Option<(String, Vec<u8>)>, AdapterError> {
47        let timeout_ms = timeout.as_millis().min(usize::MAX as u128) as usize;
48        let reply: Value = redis::cmd("XREAD")
49            .arg("COUNT")
50            .arg(1)
51            .arg("BLOCK")
52            .arg(timeout_ms)
53            .arg("STREAMS")
54            .arg(&self.stream)
55            .arg(&self.last_id)
56            .query(&mut self.connection)
57            .map_err(|err| AdapterError::Redis {
58                op: "redis_xread",
59                detail: err.to_string(),
60            })?;
61        let Some((id, payload)) = extract_stream_payload(reply)? else {
62            return Ok(None);
63        };
64        self.last_id = id.clone();
65        Ok(Some((id, payload)))
66    }
67}
68
69struct RedisEgressClient {
70    connection: redis::Connection,
71    stream: String,
72}
73
74impl RedisEgressClient {
75    fn connect(redis_url: &str, stream: &str) -> Result<Self, AdapterError> {
76        let client = redis::Client::open(redis_url).map_err(|err| AdapterError::Redis {
77            op: "redis_client_open",
78            detail: err.to_string(),
79        })?;
80        let connection = client.get_connection().map_err(|err| AdapterError::Redis {
81            op: "redis_get_connection",
82            detail: err.to_string(),
83        })?;
84        Ok(Self {
85            connection,
86            stream: stream.into(),
87        })
88    }
89}
90
91impl RedisStreamsEgressClient for RedisEgressClient {
92    fn append(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
93        let _: String = redis::cmd("XADD")
94            .arg(&self.stream)
95            .arg("*")
96            .arg("payload")
97            .arg(payload)
98            .query(&mut self.connection)
99            .map_err(|err| AdapterError::Redis {
100                op: "redis_xadd",
101                detail: err.to_string(),
102            })?;
103        Ok(())
104    }
105}
106
107fn value_to_bytes(value: Value) -> Result<Vec<u8>, AdapterError> {
108    match value {
109        Value::BulkString(bytes) => Ok(bytes),
110        Value::SimpleString(text) => Ok(text.into_bytes()),
111        other => Err(AdapterError::Redis {
112            op: "redis_value_to_bytes",
113            detail: format!("unexpected redis value: {other:?}"),
114        }),
115    }
116}
117
118fn value_to_string(value: Value) -> Result<String, AdapterError> {
119    let bytes = value_to_bytes(value)?;
120    String::from_utf8(bytes).map_err(|err| AdapterError::Redis {
121        op: "redis_value_to_string",
122        detail: err.to_string(),
123    })
124}
125
126fn extract_stream_payload(value: Value) -> Result<Option<(String, Vec<u8>)>, AdapterError> {
127    let Value::Array(streams) = value else {
128        return match value {
129            Value::Nil => Ok(None),
130            other => Err(AdapterError::Redis {
131                op: "redis_extract_stream_payload",
132                detail: format!("unexpected xread reply: {other:?}"),
133            }),
134        };
135    };
136    let Some(stream_entry) = streams.into_iter().next() else {
137        return Ok(None);
138    };
139    let Value::Array(stream_fields) = stream_entry else {
140        return Err(AdapterError::Redis {
141            op: "redis_extract_stream_payload",
142            detail: "missing stream entry array".into(),
143        });
144    };
145    let mut stream_iter = stream_fields.into_iter();
146    let _stream_name = stream_iter.next();
147    let Some(messages_value) = stream_iter.next() else {
148        return Ok(None);
149    };
150    let Value::Array(messages) = messages_value else {
151        return Err(AdapterError::Redis {
152            op: "redis_extract_stream_payload",
153            detail: "missing messages array".into(),
154        });
155    };
156    let Some(message_value) = messages.into_iter().next() else {
157        return Ok(None);
158    };
159    let Value::Array(message_fields) = message_value else {
160        return Err(AdapterError::Redis {
161            op: "redis_extract_stream_payload",
162            detail: "missing message tuple".into(),
163        });
164    };
165    let mut message_iter = message_fields.into_iter();
166    let Some(id_value) = message_iter.next() else {
167        return Err(AdapterError::Redis {
168            op: "redis_extract_stream_payload",
169            detail: "missing message id".into(),
170        });
171    };
172    let id = value_to_string(id_value)?;
173    let Some(fields_value) = message_iter.next() else {
174        return Err(AdapterError::Redis {
175            op: "redis_extract_stream_payload",
176            detail: "missing message fields".into(),
177        });
178    };
179    let Value::Array(fields) = fields_value else {
180        return Err(AdapterError::Redis {
181            op: "redis_extract_stream_payload",
182            detail: "message fields must be an array".into(),
183        });
184    };
185    let mut fields_iter = fields.into_iter();
186    while let Some(field_name) = fields_iter.next() {
187        let Some(field_value) = fields_iter.next() else {
188            break;
189        };
190        if value_to_string(field_name)? == "payload" {
191            return Ok(Some((id, value_to_bytes(field_value)?)));
192        }
193    }
194    Err(AdapterError::Redis {
195        op: "redis_extract_stream_payload",
196        detail: "stream entry missing payload field".into(),
197    })
198}
199
200/// Minimal blocking Redis Streams ingress adapter.
201///
202/// This adapter reads from one Redis stream and exposes each `payload` field as one ingress
203/// message.
204pub struct RedisStreamsIngress {
205    name: String,
206    client: Box<dyn RedisStreamsIngressClient>,
207    read_timeout: Duration,
208    max_message_bytes: usize,
209}
210
211impl RedisStreamsIngress {
212    /// Create a blocking Redis Streams ingress adapter.
213    pub fn new(
214        name: impl Into<String>,
215        redis_url: &str,
216        stream: &str,
217        start_id: &str,
218        read_timeout: Duration,
219        max_message_bytes: usize,
220    ) -> Result<Self, AdapterError> {
221        let client = RedisIngressClient::connect(redis_url, stream, start_id)?;
222        Ok(Self {
223            name: name.into(),
224            client: Box::new(client),
225            read_timeout,
226            max_message_bytes,
227        })
228    }
229
230    #[cfg(test)]
231    /// Build a Redis Streams ingress adapter around a crate-local fake client.
232    pub(crate) fn with_client(
233        name: impl Into<String>,
234        client: impl RedisStreamsIngressClient + 'static,
235        read_timeout: Duration,
236        max_message_bytes: usize,
237    ) -> Self {
238        Self {
239            name: name.into(),
240            client: Box::new(client),
241            read_timeout,
242            max_message_bytes,
243        }
244    }
245}
246
247impl Adapter for RedisStreamsIngress {
248    fn name(&self) -> &str {
249        &self.name
250    }
251
252    fn transport_kind(&self) -> &str {
253        "redis_streams"
254    }
255}
256
257impl IngressAdapter for RedisStreamsIngress {
258    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
259        let Some((_id, payload)) = self.client.read_next(self.read_timeout)? else {
260            return Ok(None);
261        };
262        if payload.len() > self.max_message_bytes || payload.len() > out.len() {
263            return Err(AdapterError::Redis {
264                op: "redis_streams_read_next",
265                detail: format!(
266                    "payload too large len={} max={} buf={}",
267                    payload.len(),
268                    self.max_message_bytes,
269                    out.len()
270                ),
271            });
272        }
273        out[..payload.len()].copy_from_slice(&payload);
274        Ok(Some(payload.len()))
275    }
276}
277
278/// Minimal blocking Redis Streams egress adapter.
279///
280/// Each `write_msg()` appends one stream entry with a single `payload` field.
281pub struct RedisStreamsEgress {
282    name: String,
283    client: Box<dyn RedisStreamsEgressClient>,
284}
285
286impl RedisStreamsEgress {
287    /// Create a blocking Redis Streams egress adapter.
288    pub fn new(
289        name: impl Into<String>,
290        redis_url: &str,
291        stream: &str,
292    ) -> Result<Self, AdapterError> {
293        let client = RedisEgressClient::connect(redis_url, stream)?;
294        Ok(Self {
295            name: name.into(),
296            client: Box::new(client),
297        })
298    }
299
300    #[cfg(test)]
301    /// Build a Redis Streams egress adapter around a crate-local fake client.
302    pub(crate) fn with_client(
303        name: impl Into<String>,
304        client: impl RedisStreamsEgressClient + 'static,
305    ) -> Self {
306        Self {
307            name: name.into(),
308            client: Box::new(client),
309        }
310    }
311}
312
313impl Adapter for RedisStreamsEgress {
314    fn name(&self) -> &str {
315        &self.name
316    }
317
318    fn transport_kind(&self) -> &str {
319        "redis_streams"
320    }
321}
322
323impl EgressAdapter for RedisStreamsEgress {
324    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
325        self.client.append(msg)
326    }
327
328    fn flush(&mut self) -> Result<(), AdapterError> {
329        Ok(())
330    }
331}