byteor_adapters/
grpc.rs

1use std::net::{SocketAddr, TcpListener};
2use std::sync::mpsc::{self, Receiver, Sender};
3use std::thread::JoinHandle;
4use std::time::Duration;
5
6use tokio::runtime::Runtime;
7use tokio::sync::oneshot;
8use tokio_stream::wrappers::TcpListenerStream;
9use tonic::transport::{Endpoint, Server};
10use tonic::{Request, Response, Status};
11
12use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
13
14mod grpc_proto {
15    #![allow(unreachable_pub)]
16
17    tonic::include_proto!("byteor.adapters.v1");
18}
19
20#[derive(Clone)]
21struct GrpcIngressService {
22    tx: Sender<Vec<u8>>,
23    max_message_bytes: usize,
24}
25
26#[tonic::async_trait]
27impl grpc_proto::message_bridge_server::MessageBridge for GrpcIngressService {
28    async fn publish(
29        &self,
30        request: Request<grpc_proto::Message>,
31    ) -> Result<Response<grpc_proto::PublishAck>, Status> {
32        let payload = request.into_inner().payload;
33        if payload.len() > self.max_message_bytes {
34            return Err(Status::resource_exhausted(format!(
35                "payload too large len={} max={}",
36                payload.len(),
37                self.max_message_bytes
38            )));
39        }
40
41        self.tx
42            .send(payload)
43            .map_err(|_| Status::unavailable("grpc ingress receiver dropped"))?;
44
45        Ok(Response::new(grpc_proto::PublishAck { accepted: true }))
46    }
47}
48
49/// Minimal blocking gRPC ingress adapter.
50///
51/// This adapter exposes a unary `Publish(bytes)` service and surfaces each request payload as one
52/// ingress message.
53pub struct GrpcIngress {
54    name: String,
55    addr: SocketAddr,
56    rx: Receiver<Vec<u8>>,
57    shutdown: Option<oneshot::Sender<()>>,
58    thread: Option<JoinHandle<()>>,
59}
60
61impl GrpcIngress {
62    /// Bind a blocking gRPC ingress adapter.
63    pub fn bind(
64        name: impl Into<String>,
65        bind: &str,
66        max_message_bytes: usize,
67    ) -> Result<Self, AdapterError> {
68        let listener = TcpListener::bind(bind).map_err(|source| AdapterError::Io {
69            op: "grpc_bind",
70            source,
71        })?;
72        listener
73            .set_nonblocking(true)
74            .map_err(|source| AdapterError::Io {
75                op: "grpc_set_nonblocking",
76                source,
77            })?;
78        let addr = listener.local_addr().map_err(|source| AdapterError::Io {
79            op: "grpc_local_addr",
80            source,
81        })?;
82
83        let (tx, rx) = mpsc::channel();
84        let (shutdown_tx, shutdown_rx) = oneshot::channel();
85        let (ready_tx, ready_rx) = mpsc::channel();
86        let name = name.into();
87        let thread = std::thread::spawn(move || {
88            let runtime = tokio::runtime::Builder::new_multi_thread()
89                .enable_all()
90                .build()
91                .expect("grpc ingress runtime");
92
93            runtime.block_on(async move {
94                let listener = tokio::net::TcpListener::from_std(listener)
95                    .expect("grpc ingress tokio listener");
96                let incoming = TcpListenerStream::new(listener);
97                let service = GrpcIngressService {
98                    tx,
99                    max_message_bytes,
100                };
101                let _ = ready_tx.send(());
102
103                let _ = Server::builder()
104                    .add_service(grpc_proto::message_bridge_server::MessageBridgeServer::new(
105                        service,
106                    ))
107                    .serve_with_incoming_shutdown(incoming, async {
108                        let _ = shutdown_rx.await;
109                    })
110                    .await;
111            });
112        });
113
114                ready_rx
115                    .recv_timeout(Duration::from_secs(2))
116                    .map_err(|err| AdapterError::Grpc {
117                        op: "grpc_bind_ready",
118                        detail: err.to_string(),
119                    })?;
120
121        Ok(Self {
122            name,
123            addr,
124            rx,
125            shutdown: Some(shutdown_tx),
126            thread: Some(thread),
127        })
128    }
129
130    /// Return the bound socket address.
131    pub fn local_addr(&self) -> SocketAddr {
132        self.addr
133    }
134
135    /// Return a client endpoint URL for the adapter.
136    pub fn endpoint(&self) -> String {
137        format!("http://{}", self.addr)
138    }
139}
140
141impl Drop for GrpcIngress {
142    fn drop(&mut self) {
143        if let Some(shutdown) = self.shutdown.take() {
144            let _ = shutdown.send(());
145        }
146        if let Some(thread) = self.thread.take() {
147            let _ = thread.join();
148        }
149    }
150}
151
152impl Adapter for GrpcIngress {
153    fn name(&self) -> &str {
154        &self.name
155    }
156
157    fn transport_kind(&self) -> &str {
158        "grpc"
159    }
160}
161
162impl IngressAdapter for GrpcIngress {
163    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
164        let msg = match self.rx.recv() {
165            Ok(msg) => msg,
166            Err(_) => return Ok(None),
167        };
168        if msg.len() > out.len() {
169            return Err(AdapterError::Grpc {
170                op: "grpc_read_next",
171                detail: format!("payload too large len={} buf={}", msg.len(), out.len()),
172            });
173        }
174        out[..msg.len()].copy_from_slice(&msg);
175        Ok(Some(msg.len()))
176    }
177}
178
179/// Minimal blocking gRPC egress adapter.
180///
181/// Each `write_msg()` performs one unary `Publish(bytes)` call to the configured endpoint.
182pub struct GrpcEgress {
183    name: String,
184    endpoint: String,
185    timeout: Duration,
186    runtime: Runtime,
187}
188
189impl GrpcEgress {
190    /// Create a blocking gRPC egress adapter.
191    pub fn new(
192        name: impl Into<String>,
193        endpoint: impl Into<String>,
194        timeout: Duration,
195    ) -> Result<Self, AdapterError> {
196        let endpoint = endpoint.into();
197        if !(endpoint.starts_with("http://") || endpoint.starts_with("https://")) {
198            return Err(AdapterError::Config {
199                detail: "grpc egress endpoint must start with http:// or https://".into(),
200            });
201        }
202
203        let runtime = tokio::runtime::Builder::new_current_thread()
204            .enable_all()
205            .build()
206            .map_err(|err| AdapterError::Grpc {
207                op: "grpc_runtime_build",
208                detail: err.to_string(),
209            })?;
210
211        Ok(Self {
212            name: name.into(),
213            endpoint,
214            timeout,
215            runtime,
216        })
217    }
218}
219
220impl Adapter for GrpcEgress {
221    fn name(&self) -> &str {
222        &self.name
223    }
224
225    fn transport_kind(&self) -> &str {
226        "grpc"
227    }
228}
229
230impl EgressAdapter for GrpcEgress {
231    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
232        let endpoint = self.endpoint.clone();
233        let timeout = self.timeout;
234        let payload = msg.to_vec();
235
236        self.runtime.block_on(async move {
237            let channel = Endpoint::from_shared(endpoint.clone())
238                .map_err(|err| AdapterError::Grpc {
239                    op: "grpc_endpoint_parse",
240                    detail: err.to_string(),
241                })?
242                .timeout(timeout)
243                .connect()
244                .await
245                .map_err(|err| AdapterError::Grpc {
246                    op: "grpc_connect",
247                    detail: err.to_string(),
248                })?;
249
250            let mut client = grpc_proto::message_bridge_client::MessageBridgeClient::new(channel);
251            client
252                .publish(grpc_proto::Message { payload })
253                .await
254                .map_err(|err| AdapterError::Grpc {
255                    op: "grpc_publish",
256                    detail: err.to_string(),
257                })?;
258            Ok(())
259        })
260    }
261
262    fn flush(&mut self) -> Result<(), AdapterError> {
263        Ok(())
264    }
265}