1use std::net::{SocketAddr, TcpListener};
2use std::sync::mpsc::{self, Receiver, Sender};
3use std::thread::JoinHandle;
4use std::time::Duration;
5
6use tokio::runtime::Runtime;
7use tokio::sync::oneshot;
8use tokio_stream::wrappers::TcpListenerStream;
9use tonic::transport::{Endpoint, Server};
10use tonic::{Request, Response, Status};
11
12use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
13
14mod grpc_proto {
15 #![allow(unreachable_pub)]
16
17 tonic::include_proto!("byteor.adapters.v1");
18}
19
20#[derive(Clone)]
21struct GrpcIngressService {
22 tx: Sender<Vec<u8>>,
23 max_message_bytes: usize,
24}
25
26#[tonic::async_trait]
27impl grpc_proto::message_bridge_server::MessageBridge for GrpcIngressService {
28 async fn publish(
29 &self,
30 request: Request<grpc_proto::Message>,
31 ) -> Result<Response<grpc_proto::PublishAck>, Status> {
32 let payload = request.into_inner().payload;
33 if payload.len() > self.max_message_bytes {
34 return Err(Status::resource_exhausted(format!(
35 "payload too large len={} max={}",
36 payload.len(),
37 self.max_message_bytes
38 )));
39 }
40
41 self.tx
42 .send(payload)
43 .map_err(|_| Status::unavailable("grpc ingress receiver dropped"))?;
44
45 Ok(Response::new(grpc_proto::PublishAck { accepted: true }))
46 }
47}
48
49pub struct GrpcIngress {
54 name: String,
55 addr: SocketAddr,
56 rx: Receiver<Vec<u8>>,
57 shutdown: Option<oneshot::Sender<()>>,
58 thread: Option<JoinHandle<()>>,
59}
60
61impl GrpcIngress {
62 pub fn bind(
64 name: impl Into<String>,
65 bind: &str,
66 max_message_bytes: usize,
67 ) -> Result<Self, AdapterError> {
68 let listener = TcpListener::bind(bind).map_err(|source| AdapterError::Io {
69 op: "grpc_bind",
70 source,
71 })?;
72 listener
73 .set_nonblocking(true)
74 .map_err(|source| AdapterError::Io {
75 op: "grpc_set_nonblocking",
76 source,
77 })?;
78 let addr = listener.local_addr().map_err(|source| AdapterError::Io {
79 op: "grpc_local_addr",
80 source,
81 })?;
82
83 let (tx, rx) = mpsc::channel();
84 let (shutdown_tx, shutdown_rx) = oneshot::channel();
85 let (ready_tx, ready_rx) = mpsc::channel();
86 let name = name.into();
87 let thread = std::thread::spawn(move || {
88 let runtime = tokio::runtime::Builder::new_multi_thread()
89 .enable_all()
90 .build()
91 .expect("grpc ingress runtime");
92
93 runtime.block_on(async move {
94 let listener = tokio::net::TcpListener::from_std(listener)
95 .expect("grpc ingress tokio listener");
96 let incoming = TcpListenerStream::new(listener);
97 let service = GrpcIngressService {
98 tx,
99 max_message_bytes,
100 };
101 let _ = ready_tx.send(());
102
103 let _ = Server::builder()
104 .add_service(grpc_proto::message_bridge_server::MessageBridgeServer::new(
105 service,
106 ))
107 .serve_with_incoming_shutdown(incoming, async {
108 let _ = shutdown_rx.await;
109 })
110 .await;
111 });
112 });
113
114 ready_rx
115 .recv_timeout(Duration::from_secs(2))
116 .map_err(|err| AdapterError::Grpc {
117 op: "grpc_bind_ready",
118 detail: err.to_string(),
119 })?;
120
121 Ok(Self {
122 name,
123 addr,
124 rx,
125 shutdown: Some(shutdown_tx),
126 thread: Some(thread),
127 })
128 }
129
130 pub fn local_addr(&self) -> SocketAddr {
132 self.addr
133 }
134
135 pub fn endpoint(&self) -> String {
137 format!("http://{}", self.addr)
138 }
139}
140
141impl Drop for GrpcIngress {
142 fn drop(&mut self) {
143 if let Some(shutdown) = self.shutdown.take() {
144 let _ = shutdown.send(());
145 }
146 if let Some(thread) = self.thread.take() {
147 let _ = thread.join();
148 }
149 }
150}
151
152impl Adapter for GrpcIngress {
153 fn name(&self) -> &str {
154 &self.name
155 }
156
157 fn transport_kind(&self) -> &str {
158 "grpc"
159 }
160}
161
162impl IngressAdapter for GrpcIngress {
163 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
164 let msg = match self.rx.recv() {
165 Ok(msg) => msg,
166 Err(_) => return Ok(None),
167 };
168 if msg.len() > out.len() {
169 return Err(AdapterError::Grpc {
170 op: "grpc_read_next",
171 detail: format!("payload too large len={} buf={}", msg.len(), out.len()),
172 });
173 }
174 out[..msg.len()].copy_from_slice(&msg);
175 Ok(Some(msg.len()))
176 }
177}
178
179pub struct GrpcEgress {
183 name: String,
184 endpoint: String,
185 timeout: Duration,
186 runtime: Runtime,
187}
188
189impl GrpcEgress {
190 pub fn new(
192 name: impl Into<String>,
193 endpoint: impl Into<String>,
194 timeout: Duration,
195 ) -> Result<Self, AdapterError> {
196 let endpoint = endpoint.into();
197 if !(endpoint.starts_with("http://") || endpoint.starts_with("https://")) {
198 return Err(AdapterError::Config {
199 detail: "grpc egress endpoint must start with http:// or https://".into(),
200 });
201 }
202
203 let runtime = tokio::runtime::Builder::new_current_thread()
204 .enable_all()
205 .build()
206 .map_err(|err| AdapterError::Grpc {
207 op: "grpc_runtime_build",
208 detail: err.to_string(),
209 })?;
210
211 Ok(Self {
212 name: name.into(),
213 endpoint,
214 timeout,
215 runtime,
216 })
217 }
218}
219
220impl Adapter for GrpcEgress {
221 fn name(&self) -> &str {
222 &self.name
223 }
224
225 fn transport_kind(&self) -> &str {
226 "grpc"
227 }
228}
229
230impl EgressAdapter for GrpcEgress {
231 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
232 let endpoint = self.endpoint.clone();
233 let timeout = self.timeout;
234 let payload = msg.to_vec();
235
236 self.runtime.block_on(async move {
237 let channel = Endpoint::from_shared(endpoint.clone())
238 .map_err(|err| AdapterError::Grpc {
239 op: "grpc_endpoint_parse",
240 detail: err.to_string(),
241 })?
242 .timeout(timeout)
243 .connect()
244 .await
245 .map_err(|err| AdapterError::Grpc {
246 op: "grpc_connect",
247 detail: err.to_string(),
248 })?;
249
250 let mut client = grpc_proto::message_bridge_client::MessageBridgeClient::new(channel);
251 client
252 .publish(grpc_proto::Message { payload })
253 .await
254 .map_err(|err| AdapterError::Grpc {
255 op: "grpc_publish",
256 detail: err.to_string(),
257 })?;
258 Ok(())
259 })
260 }
261
262 fn flush(&mut self) -> Result<(), AdapterError> {
263 Ok(())
264 }
265}