1use std::time::Duration;
2
3use futures_util::StreamExt;
4use tokio::runtime::Runtime;
5
6use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
7
8pub(crate) trait NatsIngressClient: Send {
10 fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError>;
11}
12
13pub(crate) trait NatsEgressClient: Send {
15 fn publish(&mut self, payload: &[u8]) -> Result<(), AdapterError>;
16 fn flush(&mut self) -> Result<(), AdapterError>;
17}
18
19struct AsyncNatsIngressClient {
20 runtime: Runtime,
21 subscriber: async_nats::Subscriber,
22}
23
24impl AsyncNatsIngressClient {
25 fn connect(
26 server_url: &str,
27 subject: &str,
28 queue_group: Option<&str>,
29 ) -> Result<Self, AdapterError> {
30 let runtime = tokio::runtime::Builder::new_current_thread()
31 .enable_all()
32 .build()
33 .map_err(|err| AdapterError::Nats {
34 op: "nats_runtime_build",
35 detail: err.to_string(),
36 })?;
37 let (client, subscriber) = runtime.block_on(async {
38 let client =
39 async_nats::connect(server_url)
40 .await
41 .map_err(|err| AdapterError::Nats {
42 op: "nats_connect",
43 detail: err.to_string(),
44 })?;
45 let subscriber = match queue_group {
46 Some(group) => client
47 .queue_subscribe(subject.to_string(), group.to_string())
48 .await
49 .map_err(|err| AdapterError::Nats {
50 op: "nats_queue_subscribe",
51 detail: err.to_string(),
52 })?,
53 None => client.subscribe(subject.to_string()).await.map_err(|err| {
54 AdapterError::Nats {
55 op: "nats_subscribe",
56 detail: err.to_string(),
57 }
58 })?,
59 };
60 Ok::<_, AdapterError>((client, subscriber))
61 })?;
62 drop(client);
63 Ok(Self {
64 runtime,
65 subscriber,
66 })
67 }
68}
69
70impl NatsIngressClient for AsyncNatsIngressClient {
71 fn read_next(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, AdapterError> {
72 let subscriber = &mut self.subscriber;
73 self.runtime.block_on(async move {
74 match tokio::time::timeout(timeout, subscriber.next()).await {
75 Ok(Some(message)) => Ok(Some(message.payload.to_vec())),
76 Ok(None) => Ok(None),
77 Err(_) => Ok(None),
78 }
79 })
80 }
81}
82
83struct AsyncNatsEgressClient {
84 runtime: Runtime,
85 client: async_nats::Client,
86 subject: String,
87}
88
89impl AsyncNatsEgressClient {
90 fn connect(server_url: &str, subject: impl Into<String>) -> Result<Self, AdapterError> {
91 let runtime = tokio::runtime::Builder::new_current_thread()
92 .enable_all()
93 .build()
94 .map_err(|err| AdapterError::Nats {
95 op: "nats_runtime_build",
96 detail: err.to_string(),
97 })?;
98 let client = runtime.block_on(async {
99 async_nats::connect(server_url)
100 .await
101 .map_err(|err| AdapterError::Nats {
102 op: "nats_connect",
103 detail: err.to_string(),
104 })
105 })?;
106 Ok(Self {
107 runtime,
108 client,
109 subject: subject.into(),
110 })
111 }
112}
113
114impl NatsEgressClient for AsyncNatsEgressClient {
115 fn publish(&mut self, payload: &[u8]) -> Result<(), AdapterError> {
116 let client = self.client.clone();
117 let subject = self.subject.clone();
118 let payload = payload.to_vec();
119 self.runtime.block_on(async move {
120 client
121 .publish(subject, payload.into())
122 .await
123 .map_err(|err| AdapterError::Nats {
124 op: "nats_publish",
125 detail: err.to_string(),
126 })
127 })
128 }
129
130 fn flush(&mut self) -> Result<(), AdapterError> {
131 let client = self.client.clone();
132 self.runtime.block_on(async move {
133 client.flush().await.map_err(|err| AdapterError::Nats {
134 op: "nats_flush",
135 detail: err.to_string(),
136 })
137 })
138 }
139}
140
141pub struct NatsIngress {
146 name: String,
147 client: Box<dyn NatsIngressClient>,
148 read_timeout: Duration,
149 max_message_bytes: usize,
150}
151
152impl NatsIngress {
153 pub fn new(
155 name: impl Into<String>,
156 server_url: &str,
157 subject: &str,
158 queue_group: Option<&str>,
159 read_timeout: Duration,
160 max_message_bytes: usize,
161 ) -> Result<Self, AdapterError> {
162 let client = AsyncNatsIngressClient::connect(server_url, subject, queue_group)?;
163 Ok(Self {
164 name: name.into(),
165 client: Box::new(client),
166 read_timeout,
167 max_message_bytes,
168 })
169 }
170
171 #[cfg(test)]
172 pub(crate) fn with_client(
174 name: impl Into<String>,
175 client: impl NatsIngressClient + 'static,
176 read_timeout: Duration,
177 max_message_bytes: usize,
178 ) -> Self {
179 Self {
180 name: name.into(),
181 client: Box::new(client),
182 read_timeout,
183 max_message_bytes,
184 }
185 }
186}
187
188impl Adapter for NatsIngress {
189 fn name(&self) -> &str {
190 &self.name
191 }
192
193 fn transport_kind(&self) -> &str {
194 "nats"
195 }
196}
197
198impl IngressAdapter for NatsIngress {
199 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
200 let Some(payload) = self.client.read_next(self.read_timeout)? else {
201 return Ok(None);
202 };
203 if payload.len() > self.max_message_bytes || payload.len() > out.len() {
204 return Err(AdapterError::Nats {
205 op: "nats_read_next",
206 detail: format!(
207 "payload too large len={} max={} buf={}",
208 payload.len(),
209 self.max_message_bytes,
210 out.len()
211 ),
212 });
213 }
214 out[..payload.len()].copy_from_slice(&payload);
215 Ok(Some(payload.len()))
216 }
217}
218
219pub struct NatsEgress {
223 name: String,
224 client: Box<dyn NatsEgressClient>,
225}
226
227impl NatsEgress {
228 pub fn new(
230 name: impl Into<String>,
231 server_url: &str,
232 subject: &str,
233 ) -> Result<Self, AdapterError> {
234 let client = AsyncNatsEgressClient::connect(server_url, subject)?;
235 Ok(Self {
236 name: name.into(),
237 client: Box::new(client),
238 })
239 }
240
241 #[cfg(test)]
242 pub(crate) fn with_client(
244 name: impl Into<String>,
245 client: impl NatsEgressClient + 'static,
246 ) -> Self {
247 Self {
248 name: name.into(),
249 client: Box::new(client),
250 }
251 }
252}
253
254impl Adapter for NatsEgress {
255 fn name(&self) -> &str {
256 &self.name
257 }
258
259 fn transport_kind(&self) -> &str {
260 "nats"
261 }
262}
263
264impl EgressAdapter for NatsEgress {
265 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
266 self.client.publish(msg)
267 }
268
269 fn flush(&mut self) -> Result<(), AdapterError> {
270 self.client.flush()
271 }
272}