1use std::time::Duration;
2
3use redis::Value;
4
5use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
6
7pub(crate) trait RedisStreamsIngressClient: Send {
9 fn read_next(&mut self, timeout: Duration) -> Result<Option<(String, Vec<u8>)>, AdapterError>;
10}
11
12pub(crate) trait RedisStreamsEgressClient: Send {
14 fn append(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
15}
16
17struct RedisIngressClient {
18 connection: redis::Connection,
19 stream: String,
20 last_id: String,
21}
22
23impl RedisIngressClient {
24 fn connect(
25 redis_url: &str,
26 stream: &str,
27 start_id: impl Into<String>,
28 ) -> Result<Self, AdapterError> {
29 let client = redis::Client::open(redis_url).map_err(|err| AdapterError::Redis {
30 op: "redis_client_open",
31 detail: err.to_string(),
32 })?;
33 let connection = client.get_connection().map_err(|err| AdapterError::Redis {
34 op: "redis_get_connection",
35 detail: err.to_string(),
36 })?;
37 Ok(Self {
38 connection,
39 stream: stream.into(),
40 last_id: start_id.into(),
41 })
42 }
43}
44
45impl RedisStreamsIngressClient for RedisIngressClient {
46 fn read_next(&mut self, timeout: Duration) -> Result<Option<(String, Vec<u8>)>, AdapterError> {
47 let timeout_ms = timeout.as_millis().min(usize::MAX as u128) as usize;
48 let reply: Value = redis::cmd("XREAD")
49 .arg("COUNT")
50 .arg(1)
51 .arg("BLOCK")
52 .arg(timeout_ms)
53 .arg("STREAMS")
54 .arg(&self.stream)
55 .arg(&self.last_id)
56 .query(&mut self.connection)
57 .map_err(|err| AdapterError::Redis {
58 op: "redis_xread",
59 detail: err.to_string(),
60 })?;
61 let Some((id, payload)) = extract_stream_payload(reply)? else {
62 return Ok(None);
63 };
64 self.last_id = id.clone();
65 Ok(Some((id, payload)))
66 }
67}
68
69struct RedisEgressClient {
70 connection: redis::Connection,
71 stream: String,
72}
73
74impl RedisEgressClient {
75 fn connect(redis_url: &str, stream: &str) -> Result<Self, AdapterError> {
76 let client = redis::Client::open(redis_url).map_err(|err| AdapterError::Redis {
77 op: "redis_client_open",
78 detail: err.to_string(),
79 })?;
80 let connection = client.get_connection().map_err(|err| AdapterError::Redis {
81 op: "redis_get_connection",
82 detail: err.to_string(),
83 })?;
84 Ok(Self {
85 connection,
86 stream: stream.into(),
87 })
88 }
89}
90
91impl RedisStreamsEgressClient for RedisEgressClient {
92 fn append(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
93 let _: String = redis::cmd("XADD")
94 .arg(&self.stream)
95 .arg("*")
96 .arg("payload")
97 .arg(payload)
98 .query(&mut self.connection)
99 .map_err(|err| AdapterError::Redis {
100 op: "redis_xadd",
101 detail: err.to_string(),
102 })?;
103 Ok(())
104 }
105}
106
107fn value_to_bytes(value: Value) -> Result<Vec<u8>, AdapterError> {
108 match value {
109 Value::BulkString(bytes) => Ok(bytes),
110 Value::SimpleString(text) => Ok(text.into_bytes()),
111 other => Err(AdapterError::Redis {
112 op: "redis_value_to_bytes",
113 detail: format!("unexpected redis value: {other:?}"),
114 }),
115 }
116}
117
118fn value_to_string(value: Value) -> Result<String, AdapterError> {
119 let bytes = value_to_bytes(value)?;
120 String::from_utf8(bytes).map_err(|err| AdapterError::Redis {
121 op: "redis_value_to_string",
122 detail: err.to_string(),
123 })
124}
125
126fn extract_stream_payload(value: Value) -> Result<Option<(String, Vec<u8>)>, AdapterError> {
127 let Value::Array(streams) = value else {
128 return match value {
129 Value::Nil => Ok(None),
130 other => Err(AdapterError::Redis {
131 op: "redis_extract_stream_payload",
132 detail: format!("unexpected xread reply: {other:?}"),
133 }),
134 };
135 };
136 let Some(stream_entry) = streams.into_iter().next() else {
137 return Ok(None);
138 };
139 let Value::Array(stream_fields) = stream_entry else {
140 return Err(AdapterError::Redis {
141 op: "redis_extract_stream_payload",
142 detail: "missing stream entry array".into(),
143 });
144 };
145 let mut stream_iter = stream_fields.into_iter();
146 let _stream_name = stream_iter.next();
147 let Some(messages_value) = stream_iter.next() else {
148 return Ok(None);
149 };
150 let Value::Array(messages) = messages_value else {
151 return Err(AdapterError::Redis {
152 op: "redis_extract_stream_payload",
153 detail: "missing messages array".into(),
154 });
155 };
156 let Some(message_value) = messages.into_iter().next() else {
157 return Ok(None);
158 };
159 let Value::Array(message_fields) = message_value else {
160 return Err(AdapterError::Redis {
161 op: "redis_extract_stream_payload",
162 detail: "missing message tuple".into(),
163 });
164 };
165 let mut message_iter = message_fields.into_iter();
166 let Some(id_value) = message_iter.next() else {
167 return Err(AdapterError::Redis {
168 op: "redis_extract_stream_payload",
169 detail: "missing message id".into(),
170 });
171 };
172 let id = value_to_string(id_value)?;
173 let Some(fields_value) = message_iter.next() else {
174 return Err(AdapterError::Redis {
175 op: "redis_extract_stream_payload",
176 detail: "missing message fields".into(),
177 });
178 };
179 let Value::Array(fields) = fields_value else {
180 return Err(AdapterError::Redis {
181 op: "redis_extract_stream_payload",
182 detail: "message fields must be an array".into(),
183 });
184 };
185 let mut fields_iter = fields.into_iter();
186 while let Some(field_name) = fields_iter.next() {
187 let Some(field_value) = fields_iter.next() else {
188 break;
189 };
190 if value_to_string(field_name)? == "payload" {
191 return Ok(Some((id, value_to_bytes(field_value)?)));
192 }
193 }
194 Err(AdapterError::Redis {
195 op: "redis_extract_stream_payload",
196 detail: "stream entry missing payload field".into(),
197 })
198}
199
200pub struct RedisStreamsIngress {
205 name: String,
206 client: Box<dyn RedisStreamsIngressClient>,
207 read_timeout: Duration,
208 max_message_bytes: usize,
209}
210
211impl RedisStreamsIngress {
212 pub fn new(
214 name: impl Into<String>,
215 redis_url: &str,
216 stream: &str,
217 start_id: &str,
218 read_timeout: Duration,
219 max_message_bytes: usize,
220 ) -> Result<Self, AdapterError> {
221 let client = RedisIngressClient::connect(redis_url, stream, start_id)?;
222 Ok(Self {
223 name: name.into(),
224 client: Box::new(client),
225 read_timeout,
226 max_message_bytes,
227 })
228 }
229
230 #[cfg(test)]
231 pub(crate) fn with_client(
233 name: impl Into<String>,
234 client: impl RedisStreamsIngressClient + 'static,
235 read_timeout: Duration,
236 max_message_bytes: usize,
237 ) -> Self {
238 Self {
239 name: name.into(),
240 client: Box::new(client),
241 read_timeout,
242 max_message_bytes,
243 }
244 }
245}
246
247impl Adapter for RedisStreamsIngress {
248 fn name(&self) -> &str {
249 &self.name
250 }
251
252 fn transport_kind(&self) -> &str {
253 "redis_streams"
254 }
255}
256
257impl IngressAdapter for RedisStreamsIngress {
258 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
259 let Some((_id, payload)) = self.client.read_next(self.read_timeout)? else {
260 return Ok(None);
261 };
262 if payload.len() > self.max_message_bytes || payload.len() > out.len() {
263 return Err(AdapterError::Redis {
264 op: "redis_streams_read_next",
265 detail: format!(
266 "payload too large len={} max={} buf={}",
267 payload.len(),
268 self.max_message_bytes,
269 out.len()
270 ),
271 });
272 }
273 out[..payload.len()].copy_from_slice(&payload);
274 Ok(Some(payload.len()))
275 }
276}
277
278pub struct RedisStreamsEgress {
282 name: String,
283 client: Box<dyn RedisStreamsEgressClient>,
284}
285
286impl RedisStreamsEgress {
287 pub fn new(
289 name: impl Into<String>,
290 redis_url: &str,
291 stream: &str,
292 ) -> Result<Self, AdapterError> {
293 let client = RedisEgressClient::connect(redis_url, stream)?;
294 Ok(Self {
295 name: name.into(),
296 client: Box::new(client),
297 })
298 }
299
300 #[cfg(test)]
301 pub(crate) fn with_client(
303 name: impl Into<String>,
304 client: impl RedisStreamsEgressClient + 'static,
305 ) -> Self {
306 Self {
307 name: name.into(),
308 client: Box::new(client),
309 }
310 }
311}
312
313impl Adapter for RedisStreamsEgress {
314 fn name(&self) -> &str {
315 &self.name
316 }
317
318 fn transport_kind(&self) -> &str {
319 "redis_streams"
320 }
321}
322
323impl EgressAdapter for RedisStreamsEgress {
324 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
325 self.client.append(msg)
326 }
327
328 fn flush(&mut self) -> Result<(), AdapterError> {
329 Ok(())
330 }
331}