indexbus_platform_http/
server.rs1use crate::errors::Result;
19
20use std::sync::atomic::{AtomicU8, Ordering};
21use std::sync::Arc;
22
/// Default route path for the health endpoint; used by `HttpServer::new`.
pub const DEFAULT_HEALTHZ_PATH: &str = "/healthz";

/// Default route path for the readiness endpoint; used by `HttpServer::new`.
pub const DEFAULT_READYZ_PATH: &str = "/readyz";
28
/// Readiness of the service as reported through a [`Readiness`] handle.
///
/// The explicit discriminants are the values stored in the underlying
/// `AtomicU8`, so they must stay in sync with [`ReadinessState::from_u8`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadinessState {
    /// Encoded as `0`.
    NotReady = 0,
    /// Encoded as `1`.
    Ready = 1,
    /// Encoded as `2`.
    ShuttingDown = 2,
}

impl ReadinessState {
    /// Decodes a raw byte back into a state.
    ///
    /// Any byte that is not the encoding of `Ready` or `ShuttingDown`
    /// (including unknown values) decodes to `NotReady`.
    fn from_u8(value: u8) -> Self {
        if value == Self::Ready as u8 {
            Self::Ready
        } else if value == Self::ShuttingDown as u8 {
            Self::ShuttingDown
        } else {
            Self::NotReady
        }
    }
}
49
50#[derive(Clone, Debug, Default)]
59pub struct Readiness {
60 state: Arc<AtomicU8>,
61}
62
63impl Readiness {
64 pub fn new() -> Self {
66 Self::default()
67 }
68
69 pub fn state(&self) -> ReadinessState {
71 ReadinessState::from_u8(self.state.load(Ordering::Acquire))
72 }
73
74 pub fn is_ready(&self) -> bool {
78 self.state() == ReadinessState::Ready
79 }
80
81 pub fn mark_not_ready(&self) {
83 self.state
84 .store(ReadinessState::NotReady as u8, Ordering::Release);
85 }
86
87 pub fn mark_ready(&self) {
89 self.state
90 .store(ReadinessState::Ready as u8, Ordering::Release);
91 }
92
93 pub fn mark_shutting_down(&self) {
95 self.state
96 .store(ReadinessState::ShuttingDown as u8, Ordering::Release);
97 }
98}
99
100#[derive(Clone, Debug, Default)]
110pub struct HttpServer {
111 readiness: Readiness,
112 healthz_path: &'static str,
113 readyz_path: &'static str,
114}
115
116impl HttpServer {
117 pub fn new() -> Self {
119 Self {
120 readiness: Readiness::new(),
121 healthz_path: DEFAULT_HEALTHZ_PATH,
122 readyz_path: DEFAULT_READYZ_PATH,
123 }
124 }
125
126 pub fn readiness(&self) -> Readiness {
128 self.readiness.clone()
129 }
130
131 pub fn healthz_path(&self) -> &'static str {
133 self.healthz_path
134 }
135
136 pub fn readyz_path(&self) -> &'static str {
138 self.readyz_path
139 }
140
141 pub fn healthcheck(&self) -> Result<()> {
148 Ok(())
149 }
150}
151
#[cfg(feature = "axum")]
impl HttpServer {
    /// Builds the axum router for this server by calling the free function
    /// [`axum_router`] with a clone of the readiness handle and the
    /// configured paths.
    pub fn axum_router(&self) -> axum::Router {
        let shared_readiness = self.readiness.clone();
        let (health_route, ready_route) = (self.healthz_path, self.readyz_path);
        axum_router(shared_readiness, health_route, ready_route)
    }
}
159
/// Builds an axum router with two `GET` routes: `healthz_path` served by
/// `axum_healthz` and `readyz_path` served by `axum_readyz`.
///
/// `readiness` is installed as the router state and is what the readiness
/// handler consults.
#[cfg(feature = "axum")]
pub fn axum_router(
    readiness: Readiness,
    healthz_path: &'static str,
    readyz_path: &'static str,
) -> axum::Router {
    let health_handler = axum::routing::get(axum_healthz);
    let ready_handler = axum::routing::get(axum_readyz);

    let routes = axum::Router::new()
        .route(healthz_path, health_handler)
        .route(readyz_path, ready_handler);

    routes.with_state(readiness)
}
180
/// Health handler: unconditionally answers `200 OK` with the body `"ok"`.
#[cfg(feature = "axum")]
async fn axum_healthz() -> (http::StatusCode, &'static str) {
    let status = http::StatusCode::OK;
    (status, "ok")
}
185
/// Readiness handler: answers `200 OK` / `"ready"` while the shared
/// [`Readiness`] reports ready, otherwise `503 Service Unavailable` /
/// `"not_ready"`.
#[cfg(feature = "axum")]
async fn axum_readyz(
    axum::extract::State(readiness): axum::extract::State<Readiness>,
) -> (http::StatusCode, &'static str) {
    // Guard clause: every non-ready state maps to the same 503 response.
    if !readiness.is_ready() {
        return (http::StatusCode::SERVICE_UNAVAILABLE, "not_ready");
    }
    (http::StatusCode::OK, "ready")
}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Tiny busy-polling executor for these tests: polls `fut` with a waker
    /// that does nothing, yielding the thread between polls, until the
    /// future completes.
    #[cfg(feature = "axum")]
    fn block_on<T>(mut fut: impl std::future::Future<Output = T>) -> T {
        use std::pin::Pin;
        use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

        fn raw_waker() -> RawWaker {
            RawWaker::new(std::ptr::null(), &VTABLE)
        }
        fn clone_waker(_: *const ()) -> RawWaker {
            raw_waker()
        }
        fn ignore(_: *const ()) {}
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone_waker, ignore, ignore, ignore);

        // SAFETY: every vtable entry ignores its data pointer, so the null
        // pointer is never dereferenced, and `clone_waker` hands back a raw
        // waker built on the same vtable.
        let waker = unsafe { Waker::from_raw(raw_waker()) };
        let mut cx = Context::from_waker(&waker);

        // SAFETY: `fut` is a local that is shadowed by its pinned reference
        // here and is never moved again before it is dropped.
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
        loop {
            if let Poll::Ready(val) = fut.as_mut().poll(&mut cx) {
                return val;
            }
            std::thread::yield_now();
        }
    }

    /// Sends a body-less `GET` for `uri` through `app` and returns the
    /// response status.
    #[cfg(feature = "axum")]
    fn get_status(app: axum::Router, uri: &str) -> http::StatusCode {
        use axum::body::Body;
        use http::Request;
        use tower::ServiceExt;

        let request = Request::builder()
            .method("GET")
            .uri(uri)
            .body(Body::empty())
            .unwrap();
        block_on(app.oneshot(request)).unwrap().status()
    }

    #[test]
    fn readiness_defaults_to_not_ready() {
        let probe = Readiness::new();
        assert_eq!(probe.state(), ReadinessState::NotReady);
        assert!(!probe.is_ready());
    }

    #[test]
    fn readiness_transitions() {
        let probe = Readiness::new();

        // (transition, state expected afterwards, whether it counts as ready)
        let steps: [(fn(&Readiness), ReadinessState, bool); 3] = [
            (Readiness::mark_ready, ReadinessState::Ready, true),
            (
                Readiness::mark_shutting_down,
                ReadinessState::ShuttingDown,
                false,
            ),
            (Readiness::mark_not_ready, ReadinessState::NotReady, false),
        ];

        for (transition, expected_state, expected_ready) in steps {
            transition(&probe);
            assert_eq!(probe.state(), expected_state);
            assert_eq!(probe.is_ready(), expected_ready);
        }
    }

    #[test]
    fn http_server_exposes_readiness() {
        let server = HttpServer::new();
        // The handle returned by `readiness()` shares state with the server.
        server.readiness().mark_ready();
        assert!(server.readiness().is_ready());
    }

    #[cfg(feature = "axum")]
    #[test]
    fn axum_routes_expose_healthz_and_readyz() {
        let server = HttpServer::new();
        let app = server.axum_router();

        assert_eq!(
            get_status(app.clone(), server.healthz_path()),
            http::StatusCode::OK
        );

        // Not marked ready yet, so the readiness route must refuse.
        assert_eq!(
            get_status(app.clone(), server.readyz_path()),
            http::StatusCode::SERVICE_UNAVAILABLE
        );

        server.readiness().mark_ready();
        assert_eq!(
            get_status(app, server.readyz_path()),
            http::StatusCode::OK
        );
    }

    #[cfg(feature = "axum")]
    #[test]
    fn axum_router_allows_custom_paths() {
        let readiness = Readiness::new();
        let app = axum_router(readiness.clone(), "/_h", "/_r");

        assert_eq!(get_status(app.clone(), "/_h"), http::StatusCode::OK);

        readiness.mark_ready();
        assert_eq!(get_status(app, "/_r"), http::StatusCode::OK);
    }
}