indexbus_core/
fanout.rs

1use core::sync::atomic::Ordering;
2
3use core::cell::UnsafeCell;
4
5use indexbus_abi::layouts::SharedFanoutLayout;
6use indexbus_abi::{caps, flags, LayoutHeader};
7
8use crate::internal;
9use crate::{Error, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE};
10
/// `SharedFanoutLayout<N>` wrapper that explicitly opts into interior mutability.
///
/// The v1 fanout layout is designed for concurrent mutation through atomics and raw pointers.
/// Rust requires such mutation through a shared reference to be mediated by [`UnsafeCell`].
///
/// This type is `repr(transparent)` over the ABI layout, so it can be used over the same mapped
/// bytes as `SharedFanoutLayout<N>`.
///
/// Obtain a view with [`SharedFanoutLayoutCell::from_mut`] (safe, from an exclusive borrow) or
/// [`SharedFanoutLayoutCell::from_ptr`] (unsafe, from a raw pointer), then pass it to
/// [`fanout_handles`] or [`fanout_mpsc_producer`] to create handles.
#[repr(transparent)]
pub struct SharedFanoutLayoutCell<const N: usize>(UnsafeCell<SharedFanoutLayout<N>>);
20
21impl<const N: usize> SharedFanoutLayoutCell<N> {
22    /// Borrow a `SharedFanoutLayoutCell` view over a uniquely-mutable `SharedFanoutLayout<N>`.
23    #[inline]
24    pub fn from_mut(shared: &mut SharedFanoutLayout<N>) -> &SharedFanoutLayoutCell<N> {
25        unsafe { &*(shared as *mut SharedFanoutLayout<N> as *const SharedFanoutLayoutCell<N>) }
26    }
27
28    /// Borrow a `SharedFanoutLayoutCell` view over a raw pointer.
29    ///
30    /// # Safety
31    /// `ptr` must be valid for reads/writes for the lifetime `'a`.
32    #[inline]
33    pub unsafe fn from_ptr<'a>(ptr: *mut SharedFanoutLayout<N>) -> &'a SharedFanoutLayoutCell<N> {
34        unsafe { &*(ptr as *const SharedFanoutLayoutCell<N>) }
35    }
36
37    #[inline]
38    fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
39        self.0.get()
40    }
41}
42
/// SPSC producer handle for a fanout events region.
///
/// Publishes into the producer→router queue.
///
/// Semantics (v1):
/// - at-most-once enqueue into the producer queue
/// - bounded capacity; `Error::Full` when saturated
///
/// The handle is `Send` but not `Sync`: it must be used by a single thread at a time.
pub struct FanoutProducer<const N: usize> {
    /// Raw pointer to the shared layout; the caller must keep the mapping alive while the
    /// handle is in use.
    shared: *mut SharedFanoutLayout<N>,
}
53
/// Multi-producer handle for publishing into a fanout layout.
///
/// Publishes into the layout's MPSC producer->router queue.
///
/// The handle is both `Send` and `Sync`, so it may be shared across threads.
pub struct FanoutMpscProducer<const N: usize> {
    /// Raw pointer to the shared layout; the caller must keep the mapping alive while the
    /// handle is in use.
    shared: *mut SharedFanoutLayout<N>,
}
60
/// Fanout router handle.
///
/// This is a single-step primitive; long-running routing loops live in `indexbus-route`.
///
/// The handle is `Send` but not `Sync`: it must be used by a single thread at a time.
pub struct FanoutRouter<const N: usize> {
    /// Raw pointer to the shared layout; the caller must keep the mapping alive while the
    /// handle is in use.
    shared: *mut SharedFanoutLayout<N>,
}
67
/// Consumer handle for a fanout events region.
///
/// Consumers only receive messages after they have been routed into their consumer queue.
/// Without a running router, consumers will not observe new messages.
///
/// The handle is `Send` but not `Sync`: it must be used by a single thread at a time.
pub struct FanoutConsumer<const N: usize> {
    /// Raw pointer to the shared layout; the caller must keep the mapping alive while the
    /// handle is in use.
    shared: *mut SharedFanoutLayout<N>,
    /// Index of the consumer queue this handle reads from. It is not range-checked at
    /// construction; the receive methods return `Ok(None)` when it is `>= N`.
    consumer: usize,
}
76
// Thread-safety notes:
// - Handles are thin wrappers around a raw pointer to a shared-memory mapping. The caller must
//   ensure the mapped memory outlives all handles.
// - Raw pointers are neither `Send` nor `Sync` by default, so every marker below is an explicit
//   opt-in.
// - `FanoutProducer` must be used by a single thread at a time: it is `Send` but not `Sync`.
// - `FanoutMpscProducer` is safe to share across threads: it is both `Send` and `Sync`.
// - `FanoutRouter` and `FanoutConsumer` must be used by a single thread at a time: they are
//   `Send` but not `Sync`.
unsafe impl<const N: usize> Send for FanoutProducer<N> {}
unsafe impl<const N: usize> Send for FanoutMpscProducer<N> {}
unsafe impl<const N: usize> Sync for FanoutMpscProducer<N> {}
unsafe impl<const N: usize> Send for FanoutRouter<N> {}
unsafe impl<const N: usize> Send for FanoutConsumer<N> {}
88
/// Selects which producer queue the router drains from.
///
/// A fanout layout carries two producer→router queues; each routing step reads from exactly
/// the one named here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouterSource {
    /// Route from the single-producer producer queue (the one [`FanoutProducer`] publishes to).
    Spsc,
    /// Route from the multi-producer producer queue (the one [`FanoutMpscProducer`] publishes
    /// to).
    Mpsc,
}
97
/// Routing policy for a single message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouterMode {
    /// Route each message to all consumer queues.
    Broadcast,
    /// Route each message to at most one consumer queue.
    WorkQueue,
}
106
/// Result of a single `FanoutRouter` routing step.
///
/// Returned by [`FanoutRouter::route_once_with_stats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RouteOnceResult {
    /// Whether a message was dequeued from the selected source queue.
    pub routed: bool,
    /// Number of consumer queues the message was enqueued into.
    pub delivered: u32,
    /// Number of enqueue failures (typically due to full consumer queues).
    ///
    /// Note: In `RouterMode::Broadcast`, a single message can be dropped for some consumers but
    /// still delivered to others.
    pub dropped: u32,
}
120
/// Detailed result of a single credit-aware routing step.
///
/// This is used by credit-based routing to attribute drops to "no-credit" vs
/// "queue-full" and (best-effort) to particular consumers.
///
/// Returned by [`FanoutRouter::route_once_with_credit_masks`].
///
/// Notes:
/// - Masks are meaningful only for `N <= 64`. For larger `N`, masks may be returned as `0`.
/// - This API is best-effort and is intended for edge-layer policy engines (`indexbus-route`).
/// - NOTE(review): presumably bit `i` of each mask corresponds to consumer index `i`; confirm
///   against the router implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RouteOnceCreditResult {
    /// Whether a message was dequeued from the selected source queue.
    pub routed: bool,

    /// Number of consumer queues the message was enqueued into.
    pub delivered: u32,

    /// Number of consumer enqueue failures due to full destination queues.
    pub dropped_full: u32,

    /// Number of drops attributable to credit exhaustion / ineligible consumers.
    pub dropped_no_credit: u32,

    /// Consumers that successfully received the message.
    pub delivered_mask: u64,

    /// Consumers that were eligible but had full queues.
    pub dropped_full_mask: u64,

    /// Consumers that were ineligible due to no-credit.
    pub dropped_no_credit_mask: u64,
}
152
153/// Initialize a shared fanout events layout in-place.
154///
155/// Safe because `&mut` guarantees exclusive access.
156///
157/// `layout_bytes` should reflect the total mapped byte length of the region.
158/// If `capabilities` includes `INDEXBUS_CAP_SUPPORTS_BLOCKING`, the mapping is expected to
159/// include an appended wake section at the next 64B boundary (see `validate_fanout_layout`).
160pub fn init_shared_fanout_layout_with_layout_bytes<const N: usize>(
161    shared: &mut SharedFanoutLayout<N>,
162    capabilities: u32,
163    layout_bytes: u32,
164) {
165    let initialized = unsafe { internal::atomics::atomic_u32(&shared.initialized) };
166    initialized.store(1, Ordering::Release);
167
168    shared.header = LayoutHeader::new_v1_with_layout_bytes(
169        capabilities,
170        flags::INDEXBUS_REGION_KIND_FANOUT,
171        layout_bytes,
172    );
173
174    unsafe {
175        internal::events::pool_init_once(core::ptr::addr_of_mut!(shared.slot_pool));
176        internal::events::index_queue_init(core::ptr::addr_of_mut!(shared.producer_queue));
177        internal::events::mpsc_queue_init(core::ptr::addr_of_mut!(shared.producer_queue_mpsc));
178        internal::atomics::atomic_u32(&shared.router_rr).store(0, Ordering::Release);
179        for q in &mut shared.consumer_queues {
180            internal::events::index_queue_init(q as *mut _);
181        }
182    }
183
184    initialized.store(2, Ordering::Release);
185}
186
187/// Initialize a shared fanout events layout in-place with default v1 capabilities.
188///
189/// This is a convenience wrapper around `init_shared_fanout_layout_with_layout_bytes` using:
190/// - `caps::INDEXBUS_CAP_SUPPORTS_EVENTS | caps::INDEXBUS_CAP_SUPPORTS_FANOUT`
191/// - `size_of::<SharedFanoutLayout<N>>()` as `layout_bytes`
192pub fn init_shared_fanout_layout<const N: usize>(shared: &mut SharedFanoutLayout<N>) {
193    init_shared_fanout_layout_with_layout_bytes::<N>(
194        shared,
195        caps::INDEXBUS_CAP_SUPPORTS_EVENTS | caps::INDEXBUS_CAP_SUPPORTS_FANOUT,
196        core::mem::size_of::<SharedFanoutLayout<N>>() as u32,
197    )
198}
199
200/// Create handles over a fanout layout.
201///
202/// Returns `Err` if the region is not initialized or is not v1-compatible.
203///
204/// The returned handles borrow the region by raw pointer; the caller must keep the underlying
205/// `SharedFanoutLayout<N>` alive (and mapped) for as long as the handles are used.
206pub fn fanout_handles<const N: usize>(
207    shared: &SharedFanoutLayoutCell<N>,
208    consumer: usize,
209) -> Result<(FanoutProducer<N>, FanoutRouter<N>, FanoutConsumer<N>), Error> {
210    let ptr = shared.as_ptr();
211    internal::validate::validate_fanout_layout::<N>(unsafe { &*ptr })?;
212    Ok((
213        FanoutProducer { shared: ptr },
214        FanoutRouter { shared: ptr },
215        FanoutConsumer {
216            shared: ptr,
217            consumer,
218        },
219    ))
220}
221
222/// Create an MPSC producer handle over a fanout layout.
223///
224/// Returns `Err` if the region is not initialized or is not v1-compatible.
225///
226/// The returned handle borrows the region by raw pointer; the caller must keep the underlying
227/// `SharedFanoutLayout<N>` alive (and mapped) for as long as the handle is used.
228pub fn fanout_mpsc_producer<const N: usize>(
229    shared: &SharedFanoutLayoutCell<N>,
230) -> Result<FanoutMpscProducer<N>, Error> {
231    let ptr = shared.as_ptr();
232    internal::validate::validate_fanout_layout::<N>(unsafe { &*ptr })?;
233    Ok(FanoutMpscProducer { shared: ptr })
234}
235
236impl<const N: usize> FanoutProducer<N> {
237    #[inline]
238    /// Publish a byte payload into the producer→router queue.
239    ///
240    /// Returns:
241    /// - `Ok(())` if the message was committed
242    /// - `Err(Error::TooLarge)` if `data.len() > INDEXBUS_SLOT_DATA_SIZE`
243    /// - `Err(Error::Full)` if bounded capacity is exhausted
244    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
245        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
246            return Err(Error::TooLarge {
247                max: INDEXBUS_SLOT_DATA_SIZE,
248                len: data.len(),
249            });
250        }
251
252        let slot = unsafe { internal::fanout::pool_alloc(self.shared) }.ok_or(Error::Full)?;
253        unsafe {
254            internal::fanout::slot_write(self.shared, slot, data);
255            internal::fanout::producer_queue_push_spsc(self.shared, slot).map_err(|_| {
256                internal::fanout::pool_free(self.shared, slot);
257                Error::Full
258            })?;
259        }
260        Ok(())
261    }
262
263    /// Publish by writing directly into the slot buffer.
264    ///
265    /// The closure must return the number of bytes written. That length becomes the published
266    /// message length.
267    ///
268    /// # Errors
269    ///
270    /// - [`PublishWithError::Encode`]: the closure returned an error; no message is published.
271    /// - [`PublishWithError::Core`]: capacity/layout failures (e.g. full), or `len > INDEXBUS_SLOT_DATA_SIZE`.
272    #[inline]
273    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
274    where
275        F: FnOnce(&mut [u8]) -> Result<usize, E>,
276    {
277        let slot = unsafe { internal::fanout::pool_alloc(self.shared) }
278            .ok_or(PublishWithError::Core(Error::Full))?;
279
280        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
281        let s = unsafe { core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
282
283        // Match internal::fanout::slot_write behavior: clear refcnt on publish.
284        unsafe { internal::atomics::atomic_u32(&(*s).refcnt) }.store(0, Ordering::Relaxed);
285
286        let data_ptr = unsafe { core::ptr::addr_of_mut!((*s).data) as *mut u8 };
287        let buf = unsafe { core::slice::from_raw_parts_mut(data_ptr, INDEXBUS_SLOT_DATA_SIZE) };
288
289        let len = match f(buf) {
290            Ok(n) => n,
291            Err(e) => {
292                unsafe { internal::fanout::pool_free(self.shared, slot) };
293                return Err(PublishWithError::Encode(e));
294            }
295        };
296
297        if len > INDEXBUS_SLOT_DATA_SIZE {
298            unsafe { internal::fanout::pool_free(self.shared, slot) };
299            return Err(PublishWithError::Core(Error::TooLarge {
300                max: INDEXBUS_SLOT_DATA_SIZE,
301                len,
302            }));
303        }
304
305        unsafe { core::ptr::addr_of_mut!((*s).len).write(len as u32) };
306        unsafe {
307            internal::fanout::producer_queue_push_spsc(self.shared, slot).map_err(|_| {
308                internal::fanout::pool_free(self.shared, slot);
309                PublishWithError::Core(Error::Full)
310            })?;
311        }
312        Ok(())
313    }
314}
315
316impl<const N: usize> FanoutMpscProducer<N> {
317    #[inline]
318    /// Publish a byte payload into the producer→router MPSC queue.
319    ///
320    /// Returns:
321    /// - `Ok(())` if the message was committed
322    /// - `Err(Error::TooLarge)` if `data.len() > INDEXBUS_SLOT_DATA_SIZE`
323    /// - `Err(Error::Full)` if bounded capacity is exhausted
324    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
325        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
326            return Err(Error::TooLarge {
327                max: INDEXBUS_SLOT_DATA_SIZE,
328                len: data.len(),
329            });
330        }
331
332        let slot = unsafe { internal::fanout::pool_alloc(self.shared) }.ok_or(Error::Full)?;
333        unsafe {
334            internal::fanout::slot_write(self.shared, slot, data);
335            internal::fanout::producer_queue_push_mpsc(self.shared, slot).map_err(|_| {
336                internal::fanout::pool_free(self.shared, slot);
337                Error::Full
338            })?;
339        }
340        Ok(())
341    }
342
343    /// Publish by writing directly into the slot buffer.
344    ///
345    /// # Errors
346    ///
347    /// - [`PublishWithError::Encode`]: the closure returned an error; no message is published.
348    /// - [`PublishWithError::Core`]: capacity/layout failures (e.g. full), or `len > INDEXBUS_SLOT_DATA_SIZE`.
349    #[inline]
350    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
351    where
352        F: FnOnce(&mut [u8]) -> Result<usize, E>,
353    {
354        let slot = unsafe { internal::fanout::pool_alloc(self.shared) }
355            .ok_or(PublishWithError::Core(Error::Full))?;
356
357        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
358        let s = unsafe { core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
359
360        // Match internal::fanout::slot_write behavior: clear refcnt on publish.
361        unsafe { internal::atomics::atomic_u32(&(*s).refcnt) }.store(0, Ordering::Relaxed);
362
363        let data_ptr = unsafe { core::ptr::addr_of_mut!((*s).data) as *mut u8 };
364        let buf = unsafe { core::slice::from_raw_parts_mut(data_ptr, INDEXBUS_SLOT_DATA_SIZE) };
365
366        let len = match f(buf) {
367            Ok(n) => n,
368            Err(e) => {
369                unsafe { internal::fanout::pool_free(self.shared, slot) };
370                return Err(PublishWithError::Encode(e));
371            }
372        };
373
374        if len > INDEXBUS_SLOT_DATA_SIZE {
375            unsafe { internal::fanout::pool_free(self.shared, slot) };
376            return Err(PublishWithError::Core(Error::TooLarge {
377                max: INDEXBUS_SLOT_DATA_SIZE,
378                len,
379            }));
380        }
381
382        unsafe { core::ptr::addr_of_mut!((*s).len).write(len as u32) };
383        unsafe {
384            internal::fanout::producer_queue_push_mpsc(self.shared, slot).map_err(|_| {
385                internal::fanout::pool_free(self.shared, slot);
386                PublishWithError::Core(Error::Full)
387            })?;
388        }
389        Ok(())
390    }
391}
392
393impl<const N: usize> FanoutRouter<N> {
394    /// Route a single event.
395    ///
396    /// Returns `true` if an event was consumed from the source queue.
397    ///
398    /// Ordering for a given consumer matches the router's routing order.
399    #[inline]
400    pub fn route_once_with(&self, source: RouterSource, mode: RouterMode) -> bool {
401        unsafe { internal::fanout::route_once_with(self.shared, source, mode) }
402    }
403
404    #[inline]
405    /// Route a single event and return delivery statistics.
406    pub fn route_once_with_stats(&self, source: RouterSource, mode: RouterMode) -> RouteOnceResult {
407        unsafe { internal::fanout::route_once_with_stats(self.shared, source, mode) }
408    }
409
410    /// Credit-aware routing step.
411    ///
412    /// `eligible_mask` marks which consumers are eligible for delivery.
413    /// `no_credit_mask` marks consumers that are explicitly ineligible due to exhausted credit.
414    /// `full_mask` marks consumers whose destination queues are full.
415    ///
416    /// The router will attempt to deliver only to `eligible_mask` consumers.
417    #[inline]
418    pub fn route_once_with_credit_masks(
419        &self,
420        source: RouterSource,
421        mode: RouterMode,
422        eligible_mask: u64,
423        no_credit_mask: u64,
424        full_mask: u64,
425    ) -> RouteOnceCreditResult {
426        unsafe {
427            internal::fanout::route_once_with_credit_masks(
428                self.shared,
429                source,
430                mode,
431                eligible_mask,
432                no_credit_mask,
433                full_mask,
434            )
435        }
436    }
437
438    #[inline]
439    /// Return the underlying shared-memory pointer.
440    ///
441    /// This is intended for low-level integrations; callers must uphold the mapping lifetime.
442    pub fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
443        self.shared
444    }
445}
446
447impl<const N: usize> FanoutConsumer<N> {
448    #[inline]
449    /// Attempt to receive the next message into `out`.
450    ///
451    /// Returns:
452    /// - `Ok(Some(n))` when a message of length `n` was copied into `out`
453    /// - `Ok(None)` when empty (or if `consumer` is out of range)
454    /// - `Err(Error::BufferTooSmall)` when `out` cannot hold the message
455    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
456        if self.consumer >= N {
457            return Ok(None);
458        }
459        unsafe { internal::fanout::consumer_try_recv_into(self.shared, self.consumer, out) }
460    }
461
462    /// Receive by borrowing the slot bytes for the duration of `f`.
463    ///
464    /// The borrowed `&[u8]` is only valid for the duration of the callback.
465    ///
466    /// If the callback returns `Err`, the message is still considered consumed; it will not be
467    /// re-delivered.
468    #[inline]
469    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
470    where
471        F: FnOnce(&[u8]) -> Result<R, E>,
472    {
473        if self.consumer >= N {
474            return Ok(None);
475        }
476
477        unsafe { internal::fanout::consumer_try_recv_with(self.shared, self.consumer, f) }
478    }
479}
480
#[cfg(test)]
mod tests {
    extern crate alloc;

    use super::*;
    use alloc::boxed::Box;
    use indexbus_abi::layouts::SharedFanoutLayout;

    /// Heap-allocate a zeroed layout and run the default v1 initialization on it.
    fn make_layout<const N: usize>() -> Box<SharedFanoutLayout<N>> {
        // NOTE(review): assumes an all-zero bit pattern is a valid `SharedFanoutLayout<N>` —
        // confirm against the ABI crate.
        let mut shared: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
        init_shared_fanout_layout::<N>(&mut shared);
        shared
    }

    /// Receive the next message from `consumer` and check its payload right away.
    ///
    /// Each call uses its own buffer, so the assertion always inspects the bytes this consumer
    /// received rather than whatever a later receive left behind in a shared buffer.
    fn assert_next_payload<const N: usize>(consumer: &FanoutConsumer<N>, expected: &[u8]) {
        let mut out = [0u8; 256];
        let len = consumer
            .try_recv_into(&mut out)
            .expect("receive must not fail")
            .expect("a routed message must be available");
        assert_eq!(&out[..len], expected);
    }

    #[test]
    fn mpsc_source_broadcast_delivers_to_all_consumers() {
        let mut shared = make_layout::<4>();
        let cell = SharedFanoutLayoutCell::from_mut(&mut shared);

        let (p1, router, cons0) = fanout_handles::<4>(cell, 0).unwrap();
        let (_p, _r, cons1) = fanout_handles::<4>(cell, 1).unwrap();
        let (_p, _r, cons2) = fanout_handles::<4>(cell, 2).unwrap();
        let (_p, _r, cons3) = fanout_handles::<4>(cell, 3).unwrap();
        let consumers = [&cons0, &cons1, &cons2, &cons3];

        let mp1 = fanout_mpsc_producer::<4>(cell).unwrap();
        let mp2 = fanout_mpsc_producer::<4>(cell).unwrap();

        // Ensure both queues can be routed independently.
        p1.publish(b"s").unwrap();
        assert!(router.route_once_with(RouterSource::Spsc, RouterMode::Broadcast));
        for cons in consumers {
            assert_next_payload(cons, b"s");
        }

        mp1.publish(b"a").unwrap();
        let r1 = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::Broadcast);
        assert!(r1.routed);
        assert_eq!(r1.delivered, 4);

        mp2.publish(b"b").unwrap();
        let r2 = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::Broadcast);
        assert!(r2.routed);
        assert_eq!(r2.delivered, 4);

        // Every consumer sees "a" first; each payload is verified immediately after its own
        // receive instead of after all four receives have overwritten one shared buffer.
        for cons in consumers {
            assert_next_payload(cons, b"a");
        }

        // ...and then "b", in the order the router delivered them.
        for cons in consumers {
            assert_next_payload(cons, b"b");
        }
    }

    #[test]
    fn mpsc_source_workqueue_delivers_to_exactly_one_consumer() {
        let mut shared = make_layout::<4>();
        let cell = SharedFanoutLayoutCell::from_mut(&mut shared);

        let (_p, router, cons0) = fanout_handles::<4>(cell, 0).unwrap();
        let (_p, _r, cons1) = fanout_handles::<4>(cell, 1).unwrap();
        let (_p, _r, cons2) = fanout_handles::<4>(cell, 2).unwrap();
        let (_p, _r, cons3) = fanout_handles::<4>(cell, 3).unwrap();

        let mp = fanout_mpsc_producer::<4>(cell).unwrap();
        mp.publish(b"x").unwrap();

        let r = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::WorkQueue);
        assert!(r.routed);
        assert_eq!(r.delivered, 1);

        // Exactly one of the four consumer queues must hold the message.
        let mut out = [0u8; 256];
        let mut got = 0;
        for cons in [&cons0, &cons1, &cons2, &cons3] {
            if let Some(n) = cons.try_recv_into(&mut out).unwrap() {
                assert_eq!(&out[..n], b"x");
                got += 1;
            }
        }
        assert_eq!(got, 1);
    }
}