1use crate::lane::{
4 LaneRx, LaneRxBorrow, LaneRxSlot, LaneSlot, LaneTx, LaneTxSlot, LaneTxSlotError, LaneTxWith,
5 LaneTxWithResult,
6};
7use crate::lane::{LaneTxError, LaneTxWithError};
8
9use byteor_pipeline_spec::OnFullV1;
10use indexbus_core::WaitStrategy;
11
/// Decision returned by a transform callback for one received message.
///
/// The step functions in this file interpret the variants as documented
/// below.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum TransformOutcome {
    /// Publish nothing for this message.
    Drop,
    /// Publish the received input bytes as they are.
    ForwardInput,
    /// Publish the leading bytes of the output buffer; the payload is the
    /// requested length (the step functions clamp it to the buffer length).
    ForwardOutput(usize),
}
23
24pub fn recv_transform_publish_step<Rx, Tx, W, F>(
26 rx: &mut Rx,
27 tx: &mut Tx,
28 input: &mut [u8],
29 output: &mut [u8],
30 on_full: OnFullV1,
31 wait: &mut W,
32 mut transform: F,
33) -> core::result::Result<bool, LaneTxError>
34where
35 Rx: LaneRx,
36 Tx: LaneTx,
37 W: WaitStrategy,
38 F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
39{
40 let Some(n) = rx.recv(input) else {
41 return Ok(false);
42 };
43 let input = &input[..n];
44 match transform(input, output) {
45 TransformOutcome::Drop => {}
46 TransformOutcome::ForwardInput => {
47 publish_with_policy(tx, input, on_full, wait)?;
48 }
49 TransformOutcome::ForwardOutput(out_n) => {
50 let out_n = out_n.min(output.len());
51 publish_with_policy(tx, &output[..out_n], on_full, wait)?;
52 }
53 }
54
55 Ok(true)
56}
57
58pub fn recv_with_transform_publish_step<Rx, Tx, F>(
63 rx: &mut Rx,
64 tx: &mut Tx,
65 output: &mut [u8],
66 on_full: OnFullV1,
67 wait: &mut impl WaitStrategy,
68 mut transform: F,
69) -> core::result::Result<bool, LaneTxError>
70where
71 Rx: LaneRxBorrow,
72 Tx: LaneTx,
73 F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
74{
75 let res = rx
76 .recv_with(|input| match transform(input, output) {
77 TransformOutcome::Drop => Ok(()),
78 TransformOutcome::ForwardInput => publish_with_policy(tx, input, on_full, wait),
79 TransformOutcome::ForwardOutput(out_n) => {
80 let out_n = out_n.min(output.len());
81 publish_with_policy(tx, &output[..out_n], on_full, wait)
82 }
83 })
84 .map_err(|_| LaneTxError::Failed)?;
85
86 match res {
87 None => Ok(false),
88 Some(r) => {
89 r?;
90 Ok(true)
91 }
92 }
93}
94
95pub fn recv_with_transform_publish_batch<Rx, Tx, F>(
99 rx: &mut Rx,
100 tx: &mut Tx,
101 output: &mut [u8],
102 on_full: OnFullV1,
103 wait: &mut impl WaitStrategy,
104 batch: usize,
105 mut transform: F,
106) -> core::result::Result<bool, LaneTxError>
107where
108 Rx: LaneRxBorrow,
109 Tx: LaneTx,
110 F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
111{
112 let batch = batch.max(1);
113 let mut did_any = false;
114
115 for _ in 0..batch {
116 let did = recv_with_transform_publish_step(rx, tx, output, on_full, wait, &mut transform)?;
117 if !did {
118 break;
119 }
120 did_any = true;
121 }
122
123 Ok(did_any)
124}
125
126pub fn recv_map_ok_publish_with_step<Rx, Tx>(
130 rx: &mut Rx,
131 tx: &mut Tx,
132 input: &mut [u8],
133 on_full: OnFullV1,
134 wait: &mut impl WaitStrategy,
135 map_ok: fn(&[u8], &mut [u8]) -> usize,
136) -> core::result::Result<bool, LaneTxError>
137where
138 Rx: LaneRx,
139 Tx: LaneTxWith,
140{
141 let Some(n) = rx.recv(input) else {
142 return Ok(false);
143 };
144 let input = &input[..n];
145 publish_with_policy_with(tx, on_full, wait, |slot| map_ok(input, slot))?;
146 Ok(true)
147}
148
149pub fn recv_with_map_ok_publish_with_step<Rx, Tx>(
153 rx: &mut Rx,
154 tx: &mut Tx,
155 on_full: OnFullV1,
156 wait: &mut impl WaitStrategy,
157 map_ok: fn(&[u8], &mut [u8]) -> usize,
158) -> core::result::Result<bool, LaneTxError>
159where
160 Rx: LaneRxBorrow,
161 Tx: LaneTxWith,
162{
163 let res = rx
164 .recv_with(|input| publish_with_policy_with(tx, on_full, wait, |slot| map_ok(input, slot)))
165 .map_err(|_| LaneTxError::Failed)?;
166 match res {
167 None => Ok(false),
168 Some(r) => {
169 r?;
170 Ok(true)
171 }
172 }
173}
174
175pub fn recv_with_map_publish_with_result_step<Rx, Tx, E>(
180 rx: &mut Rx,
181 tx: &mut Tx,
182 on_full: OnFullV1,
183 wait: &mut impl WaitStrategy,
184 map: impl FnMut(&[u8], &mut [u8]) -> core::result::Result<usize, E>,
185) -> core::result::Result<bool, LaneTxWithError<E>>
186where
187 Rx: LaneRxBorrow,
188 Tx: LaneTxWithResult,
189{
190 let mut map = map;
191 let res = rx
192 .recv_with(|input| {
193 publish_with_policy_with_result(tx, on_full, wait, |slot| map(input, slot))
194 })
195 .map_err(|_| LaneTxWithError::Failed)?;
196 match res {
197 None => Ok(false),
198 Some(r) => {
199 r?;
200 Ok(true)
201 }
202 }
203}
204
205pub fn recv_with_forward_publish_with_step<Rx, Tx>(
210 rx: &mut Rx,
211 tx: &mut Tx,
212 on_full: OnFullV1,
213 wait: &mut impl WaitStrategy,
214) -> core::result::Result<bool, LaneTxError>
215where
216 Rx: LaneRxBorrow,
217 Tx: LaneTxWith,
218{
219 let res = rx
220 .recv_with(|input| {
221 publish_with_policy_with(tx, on_full, wait, |slot| {
222 let n = input.len().min(slot.len());
223 slot[..n].copy_from_slice(&input[..n]);
224 n
225 })
226 })
227 .map_err(|_| LaneTxError::Failed)?;
228 match res {
229 None => Ok(false),
230 Some(r) => {
231 r?;
232 Ok(true)
233 }
234 }
235}
236
237pub fn recv_slot_forward_publish_slot_step<Rx, Tx>(
242 rx: &mut Rx,
243 tx: &mut Tx,
244 on_full: OnFullV1,
245 wait: &mut impl WaitStrategy,
246) -> core::result::Result<bool, LaneTxError>
247where
248 Rx: LaneRxSlot,
249 Tx: LaneTxSlot<Rx::Slot> + LaneTx,
250{
251 let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
252 return Ok(false);
253 };
254
255 publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
256 Ok(true)
257}
258
259pub fn recv_slot_filter_forward_publish_slot_step<Rx, Tx, F>(
263 rx: &mut Rx,
264 tx: &mut Tx,
265 on_full: OnFullV1,
266 wait: &mut impl WaitStrategy,
267 mut filter: F,
268) -> core::result::Result<bool, LaneTxError>
269where
270 Rx: LaneRxSlot,
271 Tx: LaneTxSlot<Rx::Slot> + LaneTx,
272 Rx::Slot: LaneSlot,
273 F: FnMut(&[u8]) -> bool,
274{
275 let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
276 return Ok(false);
277 };
278
279 if !filter(slot.as_bytes()) {
280 return Ok(true);
282 }
283
284 publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
285 Ok(true)
286}
287
288pub fn recv_slot_inspect_forward_publish_slot_step<Rx, Tx, F>(
292 rx: &mut Rx,
293 tx: &mut Tx,
294 on_full: OnFullV1,
295 wait: &mut impl WaitStrategy,
296 mut inspect: F,
297) -> core::result::Result<bool, LaneTxError>
298where
299 Rx: LaneRxSlot,
300 Tx: LaneTxSlot<Rx::Slot> + LaneTx,
301 Rx::Slot: LaneSlot,
302 F: FnMut(&[u8]) -> bool,
303{
304 let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
305 return Ok(false);
306 };
307
308 if !inspect(slot.as_bytes()) {
309 return Ok(true);
310 }
311
312 publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
313 Ok(true)
314}
315
316pub fn recv_slot_transform_forward_publish_slot_step<Rx, Tx, F>(
324 rx: &mut Rx,
325 tx: &mut Tx,
326 output: &mut [u8],
327 on_full: OnFullV1,
328 wait: &mut impl WaitStrategy,
329 mut transform: F,
330) -> core::result::Result<bool, LaneTxError>
331where
332 Rx: LaneRxSlot,
333 Tx: LaneTxSlot<Rx::Slot> + LaneTx,
334 Rx::Slot: LaneSlot,
335 F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
336{
337 let Some(slot) = rx.try_recv_slot().map_err(|_| LaneTxError::Failed)? else {
338 return Ok(false);
339 };
340
341 match transform(slot.as_bytes(), output) {
342 TransformOutcome::Drop => {
343 }
345 TransformOutcome::ForwardInput => {
346 publish_slot_with_policy_or_copy(tx, slot, on_full, wait)?;
347 }
348 TransformOutcome::ForwardOutput(out_n) => {
349 let out_n = out_n.min(output.len());
350 publish_with_policy(tx, &output[..out_n], on_full, wait)?;
351 }
353 }
354
355 Ok(true)
356}
357
358#[inline]
362pub fn publish_slot_with_policy_or_copy<Tx, S>(
363 tx: &mut Tx,
364 mut slot: S,
365 on_full: OnFullV1,
366 wait: &mut impl WaitStrategy,
367) -> core::result::Result<(), LaneTxError>
368where
369 Tx: LaneTxSlot<S> + LaneTx,
370 S: LaneSlot,
371{
372 loop {
373 match tx.publish_slot(slot) {
374 Ok(()) => {
375 wait.reset();
376 return Ok(());
377 }
378 Err(LaneTxSlotError::Full(s)) => match on_full {
379 OnFullV1::Drop => return Ok(()),
380 OnFullV1::Block => {
381 slot = s;
382 wait.wait();
383 continue;
384 }
385 },
386 Err(LaneTxSlotError::Incompatible(s)) => {
387 publish_with_policy_wait(tx, s.as_bytes(), on_full, wait)?;
389 return Ok(());
390 }
391 Err(LaneTxSlotError::Failed(_s)) => return Err(LaneTxError::Failed),
392 }
393 }
394}
395
396#[allow(clippy::too_many_arguments)]
400pub fn recv_transform_publish_batch<Rx, Tx, F>(
401 rx: &mut Rx,
402 tx: &mut Tx,
403 input: &mut [u8],
404 output: &mut [u8],
405 on_full: OnFullV1,
406 wait: &mut impl WaitStrategy,
407 batch: usize,
408 mut transform: F,
409) -> core::result::Result<bool, LaneTxError>
410where
411 Rx: LaneRx,
412 Tx: LaneTx,
413 F: FnMut(&[u8], &mut [u8]) -> TransformOutcome,
414{
415 let batch = batch.max(1);
416 let mut did_any = false;
417
418 for _ in 0..batch {
419 let Some(n) = rx.recv(input) else {
420 break;
421 };
422 did_any = true;
423 let input = &input[..n];
424 match transform(input, output) {
425 TransformOutcome::Drop => {}
426 TransformOutcome::ForwardInput => {
427 publish_with_policy_wait(tx, input, on_full, wait)?;
428 }
429 TransformOutcome::ForwardOutput(out_n) => {
430 let out_n = out_n.min(output.len());
431 publish_with_policy_wait(tx, &output[..out_n], on_full, wait)?;
432 }
433 }
434 }
435
436 Ok(did_any)
437}
438
439pub fn recv_select_publish_step<Tx>(
444 rxs: &mut [&mut dyn LaneRx],
445 rr: &mut usize,
446 tx: &mut Tx,
447 buf: &mut [u8],
448 on_full: OnFullV1,
449 wait: &mut impl WaitStrategy,
450) -> core::result::Result<bool, LaneTxError>
451where
452 Tx: LaneTx,
453{
454 if rxs.is_empty() {
455 return Ok(false);
456 }
457
458 let n = rxs.len();
459 let start = (*rr) % n;
460
461 for offset in 0..n {
462 let idx = (start + offset) % n;
463 let Some(msg_n) = rxs[idx].recv(buf) else {
464 continue;
465 };
466 publish_with_policy(tx, &buf[..msg_n], on_full, wait)?;
467 *rr = (idx + 1) % n;
468 return Ok(true);
469 }
470
471 *rr = (start + 1) % n;
473 Ok(false)
474}
475
476pub fn recv_select_publish_batch<Tx>(
480 rxs: &mut [&mut dyn LaneRx],
481 rr: &mut usize,
482 tx: &mut Tx,
483 buf: &mut [u8],
484 on_full: OnFullV1,
485 wait: &mut impl WaitStrategy,
486 batch: usize,
487) -> core::result::Result<bool, LaneTxError>
488where
489 Tx: LaneTx,
490{
491 let batch = batch.max(1);
492 let mut did_any = false;
493
494 for _ in 0..batch {
495 let did = recv_select_publish_step(rxs, rr, tx, buf, on_full, wait)?;
496 if !did {
497 break;
498 }
499 did_any = true;
500 }
501
502 Ok(did_any)
503}
504
505#[inline]
506pub fn publish_with_policy<Tx: LaneTx>(
513 tx: &mut Tx,
514 msg: &[u8],
515 on_full: OnFullV1,
516 wait: &mut impl WaitStrategy,
517) -> core::result::Result<(), LaneTxError> {
518 publish_with_policy_wait(tx, msg, on_full, wait)
519}
520
521#[inline]
522fn publish_with_policy_with<Tx: LaneTxWith>(
523 tx: &mut Tx,
524 on_full: OnFullV1,
525 wait: &mut impl WaitStrategy,
526 mut fill: impl FnMut(&mut [u8]) -> usize,
527) -> core::result::Result<(), LaneTxError> {
528 loop {
529 match tx.publish_with(|slot| fill(slot).min(slot.len())) {
530 Ok(()) => {
531 wait.reset();
532 return Ok(());
533 }
534 Err(LaneTxError::Full) => match on_full {
535 OnFullV1::Drop => return Ok(()),
536 OnFullV1::Block => {
537 wait.wait();
538 continue;
539 }
540 },
541 Err(e) => return Err(e),
542 }
543 }
544}
545
546#[inline]
547fn publish_with_policy_with_result<Tx: LaneTxWithResult, E>(
548 tx: &mut Tx,
549 on_full: OnFullV1,
550 wait: &mut impl WaitStrategy,
551 mut fill: impl FnMut(&mut [u8]) -> core::result::Result<usize, E>,
552) -> core::result::Result<(), LaneTxWithError<E>> {
553 loop {
554 match tx.publish_with_result(|slot| fill(slot).map(|n| n.min(slot.len()))) {
555 Ok(()) => {
556 wait.reset();
557 return Ok(());
558 }
559 Err(LaneTxWithError::Full) => match on_full {
560 OnFullV1::Drop => return Ok(()),
561 OnFullV1::Block => {
562 wait.wait();
563 continue;
564 }
565 },
566 Err(e @ LaneTxWithError::Encode(_)) => return Err(e),
567 Err(e @ LaneTxWithError::Failed) => return Err(e),
568 }
569 }
570}
571
572#[inline]
573fn publish_with_policy_wait<Tx: LaneTx, W: WaitStrategy>(
574 tx: &mut Tx,
575 msg: &[u8],
576 on_full: OnFullV1,
577 wait: &mut W,
578) -> core::result::Result<(), LaneTxError> {
579 loop {
580 match tx.publish(msg) {
581 Ok(()) => {
582 wait.reset();
583 return Ok(());
584 }
585 Err(LaneTxError::Full) => match on_full {
586 OnFullV1::Drop => return Ok(()),
587 OnFullV1::Block => {
588 wait.wait();
589 continue;
590 }
591 },
592 Err(e) => return Err(e),
593 }
594 }
595}
596
#[cfg(test)]
mod tests {
    use super::*;
    use indexbus_core::SpinWait;
    use std::collections::VecDeque;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Receive lane backed by an in-memory queue of messages.
    struct Rx {
        msgs: VecDeque<Vec<u8>>,
    }

    impl LaneRx for Rx {
        fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
            let msg = self.msgs.pop_front()?;
            let n = msg.len().min(buf.len());
            buf[..n].copy_from_slice(&msg[..n]);
            Some(n)
        }
    }

    impl LaneRxBorrow for Rx {
        fn recv_with<F, R>(
            &mut self,
            f: F,
        ) -> core::result::Result<Option<R>, crate::lane::LaneRxError>
        where
            F: FnOnce(&[u8]) -> R,
        {
            let Some(msg) = self.msgs.pop_front() else {
                return Ok(None);
            };
            Ok(Some(f(&msg)))
        }
    }

    /// Transmit lane that records every published message and never fails.
    #[derive(Default)]
    struct Tx {
        out: Vec<Vec<u8>>,
    }

    impl LaneTx for Tx {
        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
            self.out.push(msg.to_vec());
            Ok(())
        }
    }

    /// Owned slot used by the slot-forwarding tests.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestSlot(Vec<u8>);

    impl LaneSlot for TestSlot {
        fn as_bytes(&self) -> &[u8] {
            &self.0
        }
    }

    /// Receive lane that hands out queued [`TestSlot`]s.
    struct SlotRx {
        slots: VecDeque<TestSlot>,
    }

    impl LaneRxSlot for SlotRx {
        type Slot = TestSlot;

        fn try_recv_slot(
            &mut self,
        ) -> core::result::Result<Option<Self::Slot>, crate::lane::LaneRxError> {
            Ok(self.slots.pop_front())
        }
    }

    /// Transmit lane that records slot forwards and byte publishes
    /// separately, so tests can tell which path was taken.
    #[derive(Default)]
    struct SlotTx {
        forwarded: Vec<Vec<u8>>,
        published: Vec<Vec<u8>>,
    }

    impl LaneTx for SlotTx {
        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
            self.published.push(msg.to_vec());
            Ok(())
        }
    }

    impl LaneTxSlot<TestSlot> for SlotTx {
        fn publish_slot(
            &mut self,
            slot: TestSlot,
        ) -> core::result::Result<(), LaneTxSlotError<TestSlot>> {
            self.forwarded.push(slot.0);
            Ok(())
        }
    }

    /// Transform that copies the input into the output buffer and asks for
    /// the copy to be forwarded.
    fn copy_to_output(input: &[u8], output: &mut [u8]) -> TransformOutcome {
        let n = input.len().min(output.len());
        output[..n].copy_from_slice(&input[..n]);
        TransformOutcome::ForwardOutput(n)
    }

    /// Wait strategy that only counts how often it was reset / waited on.
    #[derive(Default)]
    struct CountingWait {
        resets: u64,
        waits: u64,
    }

    impl WaitStrategy for CountingWait {
        fn reset(&mut self) {
            self.resets += 1;
        }

        fn wait(&mut self) {
            self.waits += 1;
        }
    }

    /// Transmit lane that reports `Full` on the first publish call and
    /// accepts every later one.
    struct FullOnceTx {
        calls: u32,
        published: Vec<Vec<u8>>,
    }

    impl LaneTx for FullOnceTx {
        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
            self.calls = self.calls.saturating_add(1);
            if self.calls == 1 {
                return Err(LaneTxError::Full);
            }
            self.published.push(msg.to_vec());
            Ok(())
        }
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    #[test]
    fn recv_select_publish_round_robin() {
        let mut rx0 = Rx {
            msgs: [b"a0".to_vec(), b"a1".to_vec()].into(),
        };
        let mut rx1 = Rx {
            msgs: [b"b0".to_vec(), b"b1".to_vec()].into(),
        };
        let mut tx = Tx::default();
        let mut rr = 0usize;
        let mut buf = [0u8; 16];

        let mut rxs: [&mut dyn LaneRx; 2] = [&mut rx0, &mut rx1];
        let mut wait = SpinWait::default();

        for _ in 0..4 {
            let did = recv_select_publish_step(
                &mut rxs,
                &mut rr,
                &mut tx,
                &mut buf,
                OnFullV1::Block,
                &mut wait,
            )
            .unwrap();
            assert!(did);
        }

        assert_eq!(
            tx.out,
            vec![
                b"a0".to_vec(),
                b"b0".to_vec(),
                b"a1".to_vec(),
                b"b1".to_vec()
            ]
        );
    }

    #[test]
    fn recv_transform_publish_batch_forwards_all_available_up_to_batch() {
        let mut rx = Rx {
            msgs: [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()].into(),
        };
        let mut tx = Tx::default();
        let mut in_buf = [0u8; 16];
        let mut out_buf = [0u8; 16];
        let mut wait = SpinWait::default();

        let did = recv_transform_publish_batch(
            &mut rx,
            &mut tx,
            &mut in_buf,
            &mut out_buf,
            OnFullV1::Block,
            &mut wait,
            32,
            copy_to_output,
        )
        .unwrap();
        assert!(did);
        assert_eq!(tx.out, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);

        // The queue is drained now, so a second batch must report no progress.
        let did2 = recv_transform_publish_batch(
            &mut rx,
            &mut tx,
            &mut in_buf,
            &mut out_buf,
            OnFullV1::Block,
            &mut wait,
            32,
            copy_to_output,
        )
        .unwrap();
        assert!(!did2);
    }

    #[test]
    fn recv_select_publish_batch_consumes_multiple() {
        let mut rx0 = Rx {
            msgs: [b"a0".to_vec(), b"a1".to_vec()].into(),
        };
        let mut rx1 = Rx {
            msgs: [b"b0".to_vec(), b"b1".to_vec()].into(),
        };
        let mut tx = Tx::default();
        let mut rr = 0usize;
        let mut buf = [0u8; 16];
        let mut rxs: [&mut dyn LaneRx; 2] = [&mut rx0, &mut rx1];
        let mut wait = SpinWait::default();

        let did = recv_select_publish_batch(
            &mut rxs,
            &mut rr,
            &mut tx,
            &mut buf,
            OnFullV1::Block,
            &mut wait,
            4,
        )
        .unwrap();
        assert!(did);

        assert_eq!(
            tx.out,
            vec![
                b"a0".to_vec(),
                b"b0".to_vec(),
                b"a1".to_vec(),
                b"b1".to_vec()
            ]
        );
    }

    #[test]
    fn recv_with_transform_publish_step_borrows_input() {
        let mut rx = Rx {
            msgs: [b"abc".to_vec()].into(),
        };
        let mut tx = Tx::default();
        let mut out_buf = [0u8; 16];
        let mut wait = SpinWait::default();

        let did = recv_with_transform_publish_step(
            &mut rx,
            &mut tx,
            &mut out_buf,
            OnFullV1::Block,
            &mut wait,
            copy_to_output,
        )
        .unwrap();
        assert!(did);
        assert_eq!(tx.out, vec![b"abc".to_vec()]);
    }

    #[test]
    fn recv_with_map_publish_with_result_step_does_not_publish_on_error() {
        /// Transmit lane that only records a message when the fill closure
        /// succeeds; a fill error is surfaced as `Encode`.
        #[derive(Default)]
        struct TxWithResult {
            out: Vec<Vec<u8>>,
        }

        impl LaneTxWithResult for TxWithResult {
            fn publish_with_result<F, E>(
                &mut self,
                f: F,
            ) -> core::result::Result<(), crate::lane::LaneTxWithError<E>>
            where
                F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
            {
                let mut slot = [0u8; 16];
                match f(&mut slot) {
                    Ok(n) => {
                        self.out.push(slot[..n.min(slot.len())].to_vec());
                        Ok(())
                    }
                    Err(e) => Err(crate::lane::LaneTxWithError::Encode(e)),
                }
            }
        }

        let mut rx = Rx {
            msgs: [b"abc".to_vec()].into(),
        };
        let mut tx = TxWithResult::default();
        let mut wait = SpinWait::default();

        let err = recv_with_map_publish_with_result_step(
            &mut rx,
            &mut tx,
            OnFullV1::Block,
            &mut wait,
            |_input, _slot| Err("map failed"),
        )
        .unwrap_err();

        assert!(tx.out.is_empty());
        assert_eq!(err, crate::lane::LaneTxWithError::Encode("map failed"));
    }

    #[test]
    fn recv_slot_transform_forward_publish_slot_forwards_slot_on_forward_input() {
        let mut rx = SlotRx {
            slots: [TestSlot(b"abc".to_vec())].into(),
        };
        let mut tx = SlotTx::default();
        let mut out_buf = [0u8; 16];
        let mut wait = SpinWait::default();

        let did = recv_slot_transform_forward_publish_slot_step(
            &mut rx,
            &mut tx,
            &mut out_buf,
            OnFullV1::Block,
            &mut wait,
            |_input, _output| TransformOutcome::ForwardInput,
        )
        .unwrap();
        assert!(did);
        assert_eq!(tx.forwarded, vec![b"abc".to_vec()]);
        assert!(tx.published.is_empty());
    }

    #[test]
    fn recv_slot_transform_forward_publish_slot_publishes_output_on_forward_output() {
        let mut rx = SlotRx {
            slots: [TestSlot(b"abc".to_vec())].into(),
        };
        let mut tx = SlotTx::default();
        let mut out_buf = [0u8; 16];
        let mut wait = SpinWait::default();

        let did = recv_slot_transform_forward_publish_slot_step(
            &mut rx,
            &mut tx,
            &mut out_buf,
            OnFullV1::Block,
            &mut wait,
            |_input, output| {
                output[0] = b'x';
                TransformOutcome::ForwardOutput(1)
            },
        )
        .unwrap();
        assert!(did);
        assert!(tx.forwarded.is_empty());
        assert_eq!(tx.published, vec![b"x".to_vec()]);
    }

    #[test]
    fn publish_with_policy_drop_does_not_wait_on_full() {
        let mut tx = FullOnceTx {
            calls: 0,
            published: Vec::new(),
        };
        let mut wait = CountingWait::default();

        publish_with_policy(&mut tx, b"abc", OnFullV1::Drop, &mut wait).unwrap();

        assert!(tx.published.is_empty());
        assert_eq!(wait.waits, 0);
        assert_eq!(wait.resets, 0);
    }

    #[test]
    fn publish_with_policy_block_waits_and_retries_on_full() {
        let mut tx = FullOnceTx {
            calls: 0,
            published: Vec::new(),
        };
        let mut wait = CountingWait::default();

        publish_with_policy(&mut tx, b"abc", OnFullV1::Block, &mut wait).unwrap();

        assert_eq!(tx.published, vec![b"abc".to_vec()]);
        assert_eq!(wait.waits, 1);
        assert_eq!(wait.resets, 1);
    }
}