byteor_pipeline_spec/
builder.rs

1//! Builder-first helpers for constructing v1 specs.
2
3#[cfg(feature = "alloc")]
4use alloc::vec::Vec;
5
6use crate::{
7    OrderingV1, PipelineSpecV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
8    SingleRingV1, SpecError, StageOpV1, StageV1, SEQUENCED_SLOTS_V1_GATING,
9};
10
11/// Builder for `ExecutionModelV1::SingleRing` specs.
12///
13/// This is intentionally small: it helps construct valid v1 specs without macros.
14#[cfg(feature = "alloc")]
15pub struct SingleRingBuilder {
16    shards: u32,
17    ordering: OrderingV1,
18    producer: SingleRingProducerV1,
19    scheduling: SingleRingSchedulingV1,
20    shard_key: ShardKeyV1,
21    stages: Vec<StageV1>,
22}
23
24#[cfg(feature = "alloc")]
25impl Default for SingleRingBuilder {
26    fn default() -> Self {
27        Self {
28            shards: 1,
29            ordering: OrderingV1::Strict,
30            producer: SingleRingProducerV1::Single,
31            scheduling: SingleRingSchedulingV1::Dedicated,
32            shard_key: ShardKeyV1::FirstByte,
33            stages: Vec::new(),
34        }
35    }
36}
37
38#[cfg(feature = "alloc")]
39impl SingleRingBuilder {
40    /// Create a new builder with v1 defaults.
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// Set the number of PerKey shards.
46    pub fn shards(&mut self, shards: u32) -> &mut Self {
47        self.shards = shards;
48        // Default ordering for PerKey sharding.
49        if self.shards > 1 && self.ordering == OrderingV1::Strict {
50            self.ordering = OrderingV1::PerKey;
51        }
52        self
53    }
54
55    /// Set requested ordering semantics.
56    pub fn ordering(&mut self, ordering: OrderingV1) -> &mut Self {
57        self.ordering = ordering;
58        self
59    }
60
61    /// Set requested producer model.
62    pub fn producer(&mut self, producer: SingleRingProducerV1) -> &mut Self {
63        self.producer = producer;
64        self
65    }
66
67    /// Set requested scheduling model.
68    pub fn scheduling(&mut self, scheduling: SingleRingSchedulingV1) -> &mut Self {
69        self.scheduling = scheduling;
70        self
71    }
72
73    /// Set the shard key extractor.
74    pub fn shard_key(&mut self, shard_key: ShardKeyV1) -> &mut Self {
75        self.shard_key = shard_key;
76        self
77    }
78
79    /// Add a stage that depends directly on the producer cursor.
80    pub fn push_source_stage(&mut self, op: StageOpV1) -> Result<u32, SpecError> {
81        self.push_stage(op, &[])
82    }
83
84    /// Add a stage with explicit dependencies.
85    ///
86    /// `depends_on` contains stage indices; each dependency must reference a stage that has already
87    /// been added.
88    pub fn push_stage(&mut self, op: StageOpV1, depends_on: &[u32]) -> Result<u32, SpecError> {
89        if self.shards == 0 {
90            return Err(SpecError::new("single_ring.shards must be >= 1"));
91        }
92
93        // Current OSS backing uses `SequencedSlotsLayout4` with 4 gating sequences.
94        // v1 contract: the final stage releases slots, so we can use all 4 gating cells.
95        if self.stages.len() >= SEQUENCED_SLOTS_V1_GATING as usize {
96            return Err(SpecError::new("single_ring stage count exceeds v1 limit"));
97        }
98
99        let next_idx = self.stages.len() as u32;
100
101        // Validate dependencies.
102        for &d in depends_on {
103            if d >= next_idx {
104                return Err(SpecError::new(
105                    "single_ring dependencies must reference earlier stages",
106                ));
107            }
108        }
109
110        // Normalize: sort + dedup dependencies to keep `kv` encoding deterministic.
111        let mut deps = Vec::from(depends_on);
112        deps.sort_unstable();
113        deps.dedup();
114
115        self.stages.push(StageV1 {
116            op,
117            depends_on: deps,
118        });
119        Ok(next_idx)
120    }
121
122    /// Build the full `PipelineSpecV1`.
123    pub fn build(self) -> Result<PipelineSpecV1, SpecError> {
124        if self.stages.is_empty() {
125            return Err(SpecError::new("single_ring requires at least one stage"));
126        }
127
128        Ok(PipelineSpecV1::SingleRing(SingleRingV1 {
129            shards: self.shards,
130            ordering: self.ordering,
131            producer: self.producer,
132            scheduling: self.scheduling,
133            shard_key: self.shard_key,
134            stages: self.stages,
135        }))
136    }
137}
138
139/// Convenience helper to build a SingleRing chain: stage `i` depends on stage `i-1`.
140#[cfg(feature = "alloc")]
141pub fn single_ring_chain(ops: &[StageOpV1]) -> Result<PipelineSpecV1, SpecError> {
142    let mut b = SingleRingBuilder::new();
143    let mut prev: Option<u32> = None;
144
145    for op in ops.iter().cloned() {
146        let idx = match prev {
147            None => b.push_source_stage(op)?,
148            Some(p) => b.push_stage(op, &[p])?,
149        };
150        prev = Some(idx);
151    }
152
153    b.build()
154}