byteor_pipeline_kernel/
step.rs

1//! Step primitives.
2
3use crate::lane::{
4    LaneRx, LaneRxBorrow, LaneRxSlot, LaneSlot, LaneTx, LaneTxSlot, LaneTxSlotError, LaneTxWith,
5    LaneTxWithResult,
6};
7use crate::lane::{LaneTxError, LaneTxWithError};
8
9use byteor_pipeline_spec::OnFullV1;
10use indexbus_core::WaitStrategy;
11
/// Decision returned by a stage transform for a single message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum TransformOutcome {
    /// Discard the message; nothing is published.
    Drop,
    /// Publish the input bytes unchanged.
    ForwardInput,
    /// Publish a prefix of the output buffer; the payload is the prefix length in bytes.
    ForwardOutput(usize),
}
23
24/// Receive → transform → publish primitive.
25pub fn recv_transform_publish_step<Rx, Tx, W, F>(
26    rx: &mut Rx,
27    tx: &mut Tx,
28    input: &mut [u8],
29    output: &mut [u8],
30    on_full: OnFullV1,
31    wait: &mut W,
32    mut transform: F,
33) -> core::result::Result<bool, LaneTxError>
34where
35    Rx: LaneRx,
36    Tx: LaneTx,
37    W: WaitStrategy,
38    F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
39{
40    let Some(n) = rx.recv(input) else {
41        return Ok(false);
42    };
43    let input = &input[..n];
44    match transform(input, output) {
45        TransformOutcome::Drop => {}
46        TransformOutcome::ForwardInput => {
47            publish_with_policy(tx, input, on_full, wait)?;
48        }
49        TransformOutcome::ForwardOutput(out_n) => {
50            let out_n = out_n.min(output.len());
51            publish_with_policy(tx, &output[..out_n], on_full, wait)?;
52        }
53    }
54
55    Ok(true)
56}
57
58/// Receive-with (borrowed) → transform → publish primitive.
59///
60/// This is the same logical primitive as [`recv_transform_publish_step`], but avoids copying the
61/// input bytes into an intermediate buffer.
62pub fn recv_with_transform_publish_step<Rx, Tx, F>(
63    rx: &mut Rx,
64    tx: &mut Tx,
65    output: &mut [u8],
66    on_full: OnFullV1,
67    wait: &mut impl WaitStrategy,
68    mut transform: F,
69) -> core::result::Result<bool, LaneTxError>
70where
71    Rx: LaneRxBorrow,
72    Tx: LaneTx,
73    F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
74{
75    let res = rx
76        .recv_with(|input| match transform(input, output) {
77            TransformOutcome::Drop => Ok(()),
78            TransformOutcome::ForwardInput => publish_with_policy(tx, input, on_full, wait),
79            TransformOutcome::ForwardOutput(out_n) => {
80                let out_n = out_n.min(output.len());
81                publish_with_policy(tx, &output[..out_n], on_full, wait)
82            }
83        })
84        .map_err(|_| LaneTxError::Failed)?;
85
86    match res {
87        None => Ok(false),
88        Some(r) => {
89            r?;
90            Ok(true)
91        }
92    }
93}
94
95/// Receive-with (borrowed) → transform → publish, up to `batch` messages.
96///
97/// Returns `Ok(true)` if at least one message was processed.
98pub fn recv_with_transform_publish_batch<Rx, Tx, F>(
99    rx: &mut Rx,
100    tx: &mut Tx,
101    output: &mut [u8],
102    on_full: OnFullV1,
103    wait: &mut impl WaitStrategy,
104    batch: usize,
105    mut transform: F,
106) -> core::result::Result<bool, LaneTxError>
107where
108    Rx: LaneRxBorrow,
109    Tx: LaneTx,
110    F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
111{
112    let batch = batch.max(1);
113    let mut did_any = false;
114
115    for _ in 0..batch {
116        let did = recv_with_transform_publish_step(rx, tx, output, on_full, wait, &mut transform)?;
117        if !did {
118            break;
119        }
120        did_any = true;
121    }
122
123    Ok(did_any)
124}
125
126/// Receive → (MapOk) transform → publish-with (slot write).
127///
128/// This avoids an intermediate output buffer copy by writing directly into the lane slot.
129pub fn recv_map_ok_publish_with_step<Rx, Tx>(
130    rx: &mut Rx,
131    tx: &mut Tx,
132    input: &mut [u8],
133    on_full: OnFullV1,
134    wait: &mut impl WaitStrategy,
135    map_ok: fn(&[u8], &mut [u8]) -> usize,
136) -> core::result::Result<bool, LaneTxError>
137where
138    Rx: LaneRx,
139    Tx: LaneTxWith,
140{
141    let Some(n) = rx.recv(input) else {
142        return Ok(false);
143    };
144    let input = &input[..n];
145    publish_with_policy_with(tx, on_full, wait, |slot| map_ok(input, slot))?;
146    Ok(true)
147}
148
149/// Receive-with (borrowed) → (MapOk) transform → publish-with (slot write).
150///
151/// This avoids both the input copy (borrowed receive) and the output copy (slot write).
152pub fn recv_with_map_ok_publish_with_step<Rx, Tx>(
153    rx: &mut Rx,
154    tx: &mut Tx,
155    on_full: OnFullV1,
156    wait: &mut impl WaitStrategy,
157    map_ok: fn(&[u8], &mut [u8]) -> usize,
158) -> core::result::Result<bool, LaneTxError>
159where
160    Rx: LaneRxBorrow,
161    Tx: LaneTxWith,
162{
163    let res = rx
164        .recv_with(|input| publish_with_policy_with(tx, on_full, wait, |slot| map_ok(input, slot)))
165        .map_err(|_| LaneTxError::Failed)?;
166    match res {
167        None => Ok(false),
168        Some(r) => {
169            r?;
170            Ok(true)
171        }
172    }
173}
174
175/// Receive-with (borrowed) → (Map) transform → publish-with (slot write), fallible.
176///
177/// This avoids both the input copy (borrowed receive) and the output copy (slot write), while
178/// allowing the stage to return an error without publishing.
179pub fn recv_with_map_publish_with_result_step<Rx, Tx, E>(
180    rx: &mut Rx,
181    tx: &mut Tx,
182    on_full: OnFullV1,
183    wait: &mut impl WaitStrategy,
184    map: impl FnMut(&[u8], &mut [u8]) -> core::result::Result<usize, E>,
185) -> core::result::Result<bool, LaneTxWithError<E>>
186where
187    Rx: LaneRxBorrow,
188    Tx: LaneTxWithResult,
189{
190    let mut map = map;
191    let res = rx
192        .recv_with(|input| {
193            publish_with_policy_with_result(tx, on_full, wait, |slot| map(input, slot))
194        })
195        .map_err(|_| LaneTxWithError::Failed)?;
196    match res {
197        None => Ok(false),
198        Some(r) => {
199            r?;
200            Ok(true)
201        }
202    }
203}
204
205/// Receive-with (borrowed) → forward → publish-with (slot write).
206///
207/// This avoids the intermediate input buffer copy by borrowing the input bytes and copying them
208/// directly into the publish slot.
209pub fn recv_with_forward_publish_with_step<Rx, Tx>(
210    rx: &mut Rx,
211    tx: &mut Tx,
212    on_full: OnFullV1,
213    wait: &mut impl WaitStrategy,
214) -> core::result::Result<bool, LaneTxError>
215where
216    Rx: LaneRxBorrow,
217    Tx: LaneTxWith,
218{
219    let res = rx
220        .recv_with(|input| {
221            publish_with_policy_with(tx, on_full, wait, |slot| {
222                let n = input.len().min(slot.len());
223                slot[..n].copy_from_slice(&input[..n]);
224                n
225            })
226        })
227        .map_err(|_| LaneTxError::Failed)?;
228    match res {
229        None => Ok(false),
230        Some(r) => {
231            r?;
232            Ok(true)
233        }
234    }
235}
236
237/// Receive-slot → forward → publish-slot.
238///
239/// If the TX is incompatible with the slot's pool, this falls back to copying the slot bytes
240/// into a normal `publish`.
241pub fn recv_slot_forward_publish_slot_step<Rx, Tx>(
242    rx: &mut Rx,
243    tx: &mut Tx,
244    on_full: OnFullV1,
245    wait: &mut impl WaitStrategy,
246) -> core::result::Result<bool, LaneTxError>
247where
248    Rx: LaneRxSlot,
249    Tx: LaneTxSlot<Rx::Slot> + LaneTx,
250{
251    let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
252        return Ok(false);
253    };
254
255    publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
256    Ok(true)
257}
258
259/// Receive-slot → filter → (forward via slot if true).
260///
261/// The `filter` closure returns `true` to forward or `false` to drop.
262pub fn recv_slot_filter_forward_publish_slot_step<Rx, Tx, F>(
263    rx: &mut Rx,
264    tx: &mut Tx,
265    on_full: OnFullV1,
266    wait: &mut impl WaitStrategy,
267    mut filter: F,
268) -> core::result::Result<bool, LaneTxError>
269where
270    Rx: LaneRxSlot,
271    Tx: LaneTxSlot<Rx::Slot> + LaneTx,
272    Rx::Slot: LaneSlot,
273    F: FnMut(&[u8]) -> bool,
274{
275    let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
276        return Ok(false);
277    };
278
279    if !filter(slot.as_bytes()) {
280        // Dropping releases the slot back to the pool.
281        return Ok(true);
282    }
283
284    publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
285    Ok(true)
286}
287
288/// Receive-slot → inspect → (forward via slot).
289///
290/// The `inspect` closure returns `true` to forward or `false` to drop.
291pub fn recv_slot_inspect_forward_publish_slot_step<Rx, Tx, F>(
292    rx: &mut Rx,
293    tx: &mut Tx,
294    on_full: OnFullV1,
295    wait: &mut impl WaitStrategy,
296    mut inspect: F,
297) -> core::result::Result<bool, LaneTxError>
298where
299    Rx: LaneRxSlot,
300    Tx: LaneTxSlot<Rx::Slot> + LaneTx,
301    Rx::Slot: LaneSlot,
302    F: FnMut(&[u8]) -> bool,
303{
304    let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
305        return Ok(false);
306    };
307
308    if !inspect(slot.as_bytes()) {
309        return Ok(true);
310    }
311
312    publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
313    Ok(true)
314}
315
316/// Receive-slot → transform → (forward via slot on `ForwardInput`).
317///
318/// This enables a transform stage to do true zero-copy forwarding for no-op cases.
319///
320/// - `Drop` drops the slot (releasing it back to the pool).
321/// - `ForwardInput` forwards the input slot to `tx` when compatible, otherwise falls back to copying.
322/// - `ForwardOutput(n)` publishes the output bytes (copying) and drops the input slot.
323pub fn recv_slot_transform_forward_publish_slot_step<Rx, Tx, F>(
324    rx: &mut Rx,
325    tx: &mut Tx,
326    output: &mut [u8],
327    on_full: OnFullV1,
328    wait: &mut impl WaitStrategy,
329    mut transform: F,
330) -> core::result::Result<bool, LaneTxError>
331where
332    Rx: LaneRxSlot,
333    Tx: LaneTxSlot<Rx::Slot> + LaneTx,
334    Rx::Slot: LaneSlot,
335    F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
336{
337    let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
338        return Ok(false);
339    };
340
341    match transform(slot.as_bytes(), output) {
342        TransformOutcome::Drop => {
343            // Dropping releases the slot back to the pool.
344        }
345        TransformOutcome::ForwardInput => {
346            publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
347        }
348        TransformOutcome::ForwardOutput(out_n) => {
349            let out_n = out_n.min(output.len());
350            publish_with_policy(tx, &output[..out_n], on_full, wait)?;
351            // Dropping releases the slot back to the pool.
352        }
353    }
354
355    Ok(true)
356}
357
358/// Publish an owned slot with `on_full` backpressure semantics.
359///
360/// On `Incompatible`, falls back to copying the slot bytes into `LaneTx::publish`.
361#[inline]
362pub fn publish_slot_with_policy_or_copy<Tx, S>(
363    tx: &mut Tx,
364    mut slot: S,
365    on_full: OnFullV1,
366    wait: &mut impl WaitStrategy,
367) -> core::result::Result<(), LaneTxError>
368where
369    Tx: LaneTxSlot<S> + LaneTx,
370    S: LaneSlot,
371{
372    loop {
373        match tx.publish_slot(slot) {
374            Ok(()) => {
375                wait.reset();
376                return Ok(());
377            }
378            Err(LaneTxSlotError::Full(s)) => match on_full {
379                OnFullV1::Drop => return Ok(()),
380                OnFullV1::Block => {
381                    slot = s;
382                    wait.wait();
383                    continue;
384                }
385            },
386            Err(LaneTxSlotError::Incompatible(s)) => {
387                // Fall back to copying into the destination.
388                publish_with_policy_wait(tx, s.as_bytes(), on_full, wait)?;
389                return Ok(());
390            }
391            Err(LaneTxSlotError::Failed(_s)) => return Err(LaneTxError::Failed),
392        }
393    }
394}
395
396/// Receive → transform → publish, up to `batch` messages.
397///
398/// Returns `Ok(true)` if at least one message was processed.
399#[allow(clippy::too_many_arguments)]
400pub fn recv_transform_publish_batch<Rx, Tx, F>(
401    rx: &mut Rx,
402    tx: &mut Tx,
403    input: &mut [u8],
404    output: &mut [u8],
405    on_full: OnFullV1,
406    wait: &mut impl WaitStrategy,
407    batch: usize,
408    mut transform: F,
409) -> core::result::Result<bool, LaneTxError>
410where
411    Rx: LaneRx,
412    Tx: LaneTx,
413    F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
414{
415    let batch = batch.max(1);
416    let mut did_any = false;
417
418    for _ in 0..batch {
419        let Some(n) = rx.recv(input) else {
420            break;
421        };
422        did_any = true;
423        let input = &input[..n];
424        match transform(input, output) {
425            TransformOutcome::Drop => {}
426            TransformOutcome::ForwardInput => {
427                publish_with_policy_wait(tx, input, on_full, wait)?;
428            }
429            TransformOutcome::ForwardOutput(out_n) => {
430                let out_n = out_n.min(output.len());
431                publish_with_policy_wait(tx, &output[..out_n], on_full, wait)?;
432            }
433        }
434    }
435
436    Ok(did_any)
437}
438
439/// Receive from one of N RX lanes (round-robin) → publish.
440///
441/// This is a building block for fan-in roles (merge/join) without requiring allocs in the hot
442/// loop. The caller owns the round-robin cursor.
443pub fn recv_select_publish_step<Tx>(
444    rxs: &mut [&mut dyn LaneRx],
445    rr: &mut usize,
446    tx: &mut Tx,
447    buf: &mut [u8],
448    on_full: OnFullV1,
449    wait: &mut impl WaitStrategy,
450) -> core::result::Result<bool, LaneTxError>
451where
452    Tx: LaneTx,
453{
454    if rxs.is_empty() {
455        return Ok(false);
456    }
457
458    let n = rxs.len();
459    let start = (*rr) % n;
460
461    for offset in 0..n {
462        let idx = (start + offset) % n;
463        let Some(msg_n) = rxs[idx].recv(buf) else {
464            continue;
465        };
466        publish_with_policy(tx, &buf[..msg_n], on_full, wait)?;
467        *rr = (idx + 1) % n;
468        return Ok(true);
469    }
470
471    // No work: still advance the cursor so that the first-lane check doesn't stay pinned.
472    *rr = (start + 1) % n;
473    Ok(false)
474}
475
476/// Receive from one of N RX lanes (round-robin) → publish, up to `batch` messages.
477///
478/// Returns `Ok(true)` if at least one message was processed.
479pub fn recv_select_publish_batch<Tx>(
480    rxs: &mut [&mut dyn LaneRx],
481    rr: &mut usize,
482    tx: &mut Tx,
483    buf: &mut [u8],
484    on_full: OnFullV1,
485    wait: &mut impl WaitStrategy,
486    batch: usize,
487) -> core::result::Result<bool, LaneTxError>
488where
489    Tx: LaneTx,
490{
491    let batch = batch.max(1);
492    let mut did_any = false;
493
494    for _ in 0..batch {
495        let did = recv_select_publish_step(rxs, rr, tx, buf, on_full, wait)?;
496        if !did {
497            break;
498        }
499        did_any = true;
500    }
501
502    Ok(did_any)
503}
504
505#[inline]
506/// Publish a message to a lane, applying `on_full` backpressure semantics with an explicit wait
507/// strategy.
508///
509/// - On success, resets `wait`.
510/// - On `Full` + `OnFullV1::Block`, calls `wait.wait()` and retries.
511/// - On `Full` + `OnFullV1::Drop`, returns `Ok(())` (drop the message).
512pub fn publish_with_policy<Tx: LaneTx>(
513    tx: &mut Tx,
514    msg: &[u8],
515    on_full: OnFullV1,
516    wait: &mut impl WaitStrategy,
517) -> core::result::Result<(), LaneTxError> {
518    publish_with_policy_wait(tx, msg, on_full, wait)
519}
520
521#[inline]
522fn publish_with_policy_with<Tx: LaneTxWith>(
523    tx: &mut Tx,
524    on_full: OnFullV1,
525    wait: &mut impl WaitStrategy,
526    mut fill: impl FnMut(&mut [u8]) -> usize,
527) -> core::result::Result<(), LaneTxError> {
528    loop {
529        match tx.publish_with(|slot| fill(slot).min(slot.len())) {
530            Ok(()) => {
531                wait.reset();
532                return Ok(());
533            }
534            Err(LaneTxError::Full) => match on_full {
535                OnFullV1::Drop => return Ok(()),
536                OnFullV1::Block => {
537                    wait.wait();
538                    continue;
539                }
540            },
541            Err(e) => return Err(e),
542        }
543    }
544}
545
546#[inline]
547fn publish_with_policy_with_result<Tx: LaneTxWithResult, E>(
548    tx: &mut Tx,
549    on_full: OnFullV1,
550    wait: &mut impl WaitStrategy,
551    mut fill: impl FnMut(&mut [u8]) -> core::result::Result<usize, E>,
552) -> core::result::Result<(), LaneTxWithError<E>> {
553    loop {
554        match tx.publish_with_result(|slot| fill(slot).map(|n| n.min(slot.len()))) {
555            Ok(()) => {
556                wait.reset();
557                return Ok(());
558            }
559            Err(LaneTxWithError::Full) => match on_full {
560                OnFullV1::Drop => return Ok(()),
561                OnFullV1::Block => {
562                    wait.wait();
563                    continue;
564                }
565            },
566            Err(e @ LaneTxWithError::Encode(_)) => return Err(e),
567            Err(e @ LaneTxWithError::Failed) => return Err(e),
568        }
569    }
570}
571
572#[inline]
573fn publish_with_policy_wait<Tx: LaneTx, W: WaitStrategy>(
574    tx: &mut Tx,
575    msg: &[u8],
576    on_full: OnFullV1,
577    wait: &mut W,
578) -> core::result::Result<(), LaneTxError> {
579    loop {
580        match tx.publish(msg) {
581            Ok(()) => {
582                wait.reset();
583                return Ok(());
584            }
585            Err(LaneTxError::Full) => match on_full {
586                OnFullV1::Drop => return Ok(()),
587                OnFullV1::Block => {
588                    wait.wait();
589                    continue;
590                }
591            },
592            Err(e) => return Err(e),
593        }
594    }
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use indexbus_core::SpinWait;
601
    /// Test RX lane backed by an in-memory queue of messages.
    struct Rx {
        // Pending messages; the trait impls below pop from the front.
        msgs: std::collections::VecDeque<Vec<u8>>,
    }
605
606    impl LaneRx for Rx {
607        fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
608            let msg = self.msgs.pop_front()?;
609            let n = msg.len().min(buf.len());
610            buf[..n].copy_from_slice(&msg[..n]);
611            Some(n)
612        }
613    }
614
615    impl LaneRxBorrow for Rx {
616        fn recv_with<F, R>(
617            &mut self,
618            f: F,
619        ) -> core::result::Result<Option<R>, crate::lane::LaneRxError>
620        where
621            F: FnOnce(&[u8]) -> R,
622        {
623            let Some(msg) = self.msgs.pop_front() else {
624                return Ok(None);
625            };
626            Ok(Some(f(&msg)))
627        }
628    }
629
    /// Test TX lane that records every published message.
    #[derive(Default)]
    struct Tx {
        // Published messages, in publish order.
        out: Vec<Vec<u8>>,
    }
634
635    impl LaneTx for Tx {
636        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
637            self.out.push(msg.to_vec());
638            Ok(())
639        }
640    }
641
642    #[test]
643    fn recv_select_publish_round_robin() {
644        let mut rx0 = Rx {
645            msgs: [b"a0".to_vec(), b"a1".to_vec()].into(),
646        };
647        let mut rx1 = Rx {
648            msgs: [b"b0".to_vec(), b"b1".to_vec()].into(),
649        };
650        let mut tx = Tx::default();
651        let mut rr = 0usize;
652        let mut buf = [0u8; 16];
653
654        let mut rxs: [&mut dyn LaneRx; 2] = [&mut rx0, &mut rx1];
655        let mut wait = SpinWait::default();
656
657        // Consume 4 messages.
658        for _ in 0..4 {
659            let did = recv_select_publish_step(
660                &mut rxs,
661                &mut rr,
662                &mut tx,
663                &mut buf,
664                OnFullV1::Block,
665                &mut wait,
666            )
667            .unwrap();
668            assert!(did);
669        }
670
671        assert_eq!(
672            tx.out,
673            vec![
674                b"a0".to_vec(),
675                b"b0".to_vec(),
676                b"a1".to_vec(),
677                b"b1".to_vec()
678            ]
679        );
680    }
681
682    #[test]
683    fn recv_transform_publish_batch_forwards_all_available_up_to_batch() {
684        fn map_ok(input: &[u8], output: &mut [u8]) -> TransformOutcome {
685            let n = input.len().min(output.len());
686            output[..n].copy_from_slice(&input[..n]);
687            TransformOutcome::ForwardOutput(n)
688        }
689
690        let mut rx = Rx {
691            msgs: [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()].into(),
692        };
693        let mut tx = Tx::default();
694        let mut in_buf = [0u8; 16];
695        let mut out_buf = [0u8; 16];
696        let mut wait = SpinWait::default();
697
698        let did = recv_transform_publish_batch(
699            &mut rx,
700            &mut tx,
701            &mut in_buf,
702            &mut out_buf,
703            OnFullV1::Block,
704            &mut wait,
705            32,
706            map_ok,
707        )
708        .unwrap();
709        assert!(did);
710        assert_eq!(tx.out, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
711
712        let did2 = recv_transform_publish_batch(
713            &mut rx,
714            &mut tx,
715            &mut in_buf,
716            &mut out_buf,
717            OnFullV1::Block,
718            &mut wait,
719            32,
720            map_ok,
721        )
722        .unwrap();
723        assert!(!did2);
724    }
725
726    #[test]
727    fn recv_select_publish_batch_consumes_multiple() {
728        let mut rx0 = Rx {
729            msgs: [b"a0".to_vec(), b"a1".to_vec()].into(),
730        };
731        let mut rx1 = Rx {
732            msgs: [b"b0".to_vec(), b"b1".to_vec()].into(),
733        };
734        let mut tx = Tx::default();
735        let mut rr = 0usize;
736        let mut buf = [0u8; 16];
737        let mut rxs: [&mut dyn LaneRx; 2] = [&mut rx0, &mut rx1];
738        let mut wait = SpinWait::default();
739
740        let did = recv_select_publish_batch(
741            &mut rxs,
742            &mut rr,
743            &mut tx,
744            &mut buf,
745            OnFullV1::Block,
746            &mut wait,
747            4,
748        )
749        .unwrap();
750        assert!(did);
751
752        assert_eq!(
753            tx.out,
754            vec![
755                b"a0".to_vec(),
756                b"b0".to_vec(),
757                b"a1".to_vec(),
758                b"b1".to_vec()
759            ]
760        );
761    }
762
763    #[test]
764    fn recv_with_transform_publish_step_borrows_input() {
765        fn map_ok(input: &[u8], output: &mut [u8]) -> TransformOutcome {
766            let n = input.len().min(output.len());
767            output[..n].copy_from_slice(&input[..n]);
768            TransformOutcome::ForwardOutput(n)
769        }
770
771        let mut rx = Rx {
772            msgs: [b"abc".to_vec()].into(),
773        };
774        let mut tx = Tx::default();
775        let mut out_buf = [0u8; 16];
776        let mut wait = SpinWait::default();
777
778        let did = recv_with_transform_publish_step(
779            &mut rx,
780            &mut tx,
781            &mut out_buf,
782            OnFullV1::Block,
783            &mut wait,
784            map_ok,
785        )
786        .unwrap();
787        assert!(did);
788        assert_eq!(tx.out, vec![b"abc".to_vec()]);
789    }
790
791    #[test]
792    fn recv_with_map_publish_with_result_step_does_not_publish_on_error() {
793        #[derive(Default)]
794        struct TxWithResult {
795            out: Vec<Vec<u8>>,
796        }
797
798        impl LaneTxWithResult for TxWithResult {
799            fn publish_with_result<F, E>(
800                &mut self,
801                f: F,
802            ) -> core::result::Result<(), crate::lane::LaneTxWithError<E>>
803            where
804                F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
805            {
806                let mut slot = [0u8; 16];
807                match f(&mut slot) {
808                    Ok(n) => {
809                        self.out.push(slot[..n.min(slot.len())].to_vec());
810                        Ok(())
811                    }
812                    Err(e) => Err(crate::lane::LaneTxWithError::Encode(e)),
813                }
814            }
815        }
816
817        let mut rx = Rx {
818            msgs: [b"abc".to_vec()].into(),
819        };
820        let mut tx = TxWithResult::default();
821        let mut wait = SpinWait::default();
822
823        let err = recv_with_map_publish_with_result_step(
824            &mut rx,
825            &mut tx,
826            OnFullV1::Block,
827            &mut wait,
828            |_input, _slot| Err("map failed"),
829        )
830        .unwrap_err();
831
832        assert!(tx.out.is_empty());
833        assert_eq!(err, crate::lane::LaneTxWithError::Encode("map failed"));
834    }
835
836    #[test]
837    fn recv_slot_transform_forward_publish_slot_forwards_slot_on_forward_input() {
838        #[derive(Clone, Debug, PartialEq, Eq)]
839        struct Slot(Vec<u8>);
840
841        impl LaneSlot for Slot {
842            fn as_bytes(&self) -> &[u8] {
843                &self.0
844            }
845        }
846
847        struct RxSlot {
848            slots: std::collections::VecDeque<Slot>,
849        }
850
851        impl LaneRxSlot for RxSlot {
852            type Slot = Slot;
853
854            fn try_recv_slot(
855                &mut self,
856            ) -> core::result::Result<Option<Self::Slot>, crate::lane::LaneRxError> {
857                Ok(self.slots.pop_front())
858            }
859        }
860
861        #[derive(Default)]
862        struct TxSlot {
863            forwarded: Vec<Vec<u8>>,
864            published: Vec<Vec<u8>>,
865        }
866
867        impl LaneTx for TxSlot {
868            fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
869                self.published.push(msg.to_vec());
870                Ok(())
871            }
872        }
873
874        impl LaneTxSlot<Slot> for TxSlot {
875            fn publish_slot(
876                &mut self,
877                slot: Slot,
878            ) -> core::result::Result<(), LaneTxSlotError<Slot>> {
879                self.forwarded.push(slot.0);
880                Ok(())
881            }
882        }
883
884        let mut rx = RxSlot {
885            slots: [Slot(b"abc".to_vec())].into(),
886        };
887        let mut tx = TxSlot::default();
888        let mut out_buf = [0u8; 16];
889        let mut wait = SpinWait::default();
890
891        let did = recv_slot_transform_forward_publish_slot_step(
892            &mut rx,
893            &mut tx,
894            &mut out_buf,
895            OnFullV1::Block,
896            &mut wait,
897            |_input, _output| TransformOutcome::ForwardInput,
898        )
899        .unwrap();
900        assert!(did);
901        assert_eq!(tx.forwarded, vec![b"abc".to_vec()]);
902        assert!(tx.published.is_empty());
903    }
904
905    #[test]
906    fn recv_slot_transform_forward_publish_slot_publishes_output_on_forward_output() {
907        #[derive(Clone, Debug, PartialEq, Eq)]
908        struct Slot(Vec<u8>);
909
910        impl LaneSlot for Slot {
911            fn as_bytes(&self) -> &[u8] {
912                &self.0
913            }
914        }
915
916        struct RxSlot {
917            slots: std::collections::VecDeque<Slot>,
918        }
919
920        impl LaneRxSlot for RxSlot {
921            type Slot = Slot;
922
923            fn try_recv_slot(
924                &mut self,
925            ) -> core::result::Result<Option<Self::Slot>, crate::lane::LaneRxError> {
926                Ok(self.slots.pop_front())
927            }
928        }
929
930        #[derive(Default)]
931        struct TxSlot {
932            forwarded: Vec<Vec<u8>>,
933            published: Vec<Vec<u8>>,
934        }
935
936        impl LaneTx for TxSlot {
937            fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
938                self.published.push(msg.to_vec());
939                Ok(())
940            }
941        }
942
943        impl LaneTxSlot<Slot> for TxSlot {
944            fn publish_slot(
945                &mut self,
946                slot: Slot,
947            ) -> core::result::Result<(), LaneTxSlotError<Slot>> {
948                self.forwarded.push(slot.0);
949                Ok(())
950            }
951        }
952
953        let mut rx = RxSlot {
954            slots: [Slot(b"abc".to_vec())].into(),
955        };
956        let mut tx = TxSlot::default();
957        let mut out_buf = [0u8; 16];
958        let mut wait = SpinWait::default();
959
960        let did = recv_slot_transform_forward_publish_slot_step(
961            &mut rx,
962            &mut tx,
963            &mut out_buf,
964            OnFullV1::Block,
965            &mut wait,
966            |_input, output| {
967                output[0] = b'x';
968                TransformOutcome::ForwardOutput(1)
969            },
970        )
971        .unwrap();
972        assert!(did);
973        assert!(tx.forwarded.is_empty());
974        assert_eq!(tx.published, vec![b"x".to_vec()]);
975    }
976
    /// Wait strategy that only counts how often it is reset and waited on.
    #[derive(Default)]
    struct CountingWait {
        // Number of `reset()` calls observed.
        resets: u64,
        // Number of `wait()` calls observed.
        waits: u64,
    }
982
983    impl WaitStrategy for CountingWait {
984        fn reset(&mut self) {
985            self.resets += 1;
986        }
987
988        fn wait(&mut self) {
989            self.waits += 1;
990        }
991    }
992
    /// Test TX lane that reports `Full` on its first publish call and accepts later ones.
    struct FullOnceTx {
        // Number of `publish` calls so far.
        calls: u32,
        // Messages accepted after the first (rejected) call.
        published: Vec<Vec<u8>>,
    }
997
998    impl LaneTx for FullOnceTx {
999        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
1000            self.calls = self.calls.saturating_add(1);
1001            if self.calls == 1 {
1002                return Err(LaneTxError::Full);
1003            }
1004            self.published.push(msg.to_vec());
1005            Ok(())
1006        }
1007    }
1008
1009    #[test]
1010    fn publish_with_policy_drop_does_not_wait_on_full() {
1011        let mut tx = FullOnceTx {
1012            calls: 0,
1013            published: Vec::new(),
1014        };
1015        let mut wait = CountingWait::default();
1016
1017        publish_with_policy(&mut tx, b"abc", OnFullV1::Drop, &mut wait).unwrap();
1018
1019        // Full + Drop: message is dropped (never published) and no waiting occurs.
1020        assert!(tx.published.is_empty());
1021        assert_eq!(wait.waits, 0);
1022        assert_eq!(wait.resets, 0);
1023    }
1024
1025    #[test]
1026    fn publish_with_policy_block_waits_and_retries_on_full() {
1027        let mut tx = FullOnceTx {
1028            calls: 0,
1029            published: Vec::new(),
1030        };
1031        let mut wait = CountingWait::default();
1032
1033        publish_with_policy(&mut tx, b"abc", OnFullV1::Block, &mut wait).unwrap();
1034
1035        // Full + Block: wait once, retry, then publish.
1036        assert_eq!(tx.published, vec![b"abc".to_vec()]);
1037        assert_eq!(wait.waits, 1);
1038        assert_eq!(wait.resets, 1);
1039    }
1040}