byteor_pipeline_spec/
builder.rs1#[cfg(feature = "alloc")]
4use alloc::vec::Vec;
5
6use crate::{
7 OrderingV1, PipelineSpecV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
8 SingleRingV1, SpecError, StageOpV1, StageV1, SEQUENCED_SLOTS_V1_GATING,
9};
10
11#[cfg(feature = "alloc")]
15pub struct SingleRingBuilder {
16 shards: u32,
17 ordering: OrderingV1,
18 producer: SingleRingProducerV1,
19 scheduling: SingleRingSchedulingV1,
20 shard_key: ShardKeyV1,
21 stages: Vec<StageV1>,
22}
23
24#[cfg(feature = "alloc")]
25impl Default for SingleRingBuilder {
26 fn default() -> Self {
27 Self {
28 shards: 1,
29 ordering: OrderingV1::Strict,
30 producer: SingleRingProducerV1::Single,
31 scheduling: SingleRingSchedulingV1::Dedicated,
32 shard_key: ShardKeyV1::FirstByte,
33 stages: Vec::new(),
34 }
35 }
36}
37
38#[cfg(feature = "alloc")]
39impl SingleRingBuilder {
40 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn shards(&mut self, shards: u32) -> &mut Self {
47 self.shards = shards;
48 if self.shards > 1 && self.ordering == OrderingV1::Strict {
50 self.ordering = OrderingV1::PerKey;
51 }
52 self
53 }
54
55 pub fn ordering(&mut self, ordering: OrderingV1) -> &mut Self {
57 self.ordering = ordering;
58 self
59 }
60
61 pub fn producer(&mut self, producer: SingleRingProducerV1) -> &mut Self {
63 self.producer = producer;
64 self
65 }
66
67 pub fn scheduling(&mut self, scheduling: SingleRingSchedulingV1) -> &mut Self {
69 self.scheduling = scheduling;
70 self
71 }
72
73 pub fn shard_key(&mut self, shard_key: ShardKeyV1) -> &mut Self {
75 self.shard_key = shard_key;
76 self
77 }
78
79 pub fn push_source_stage(&mut self, op: StageOpV1) -> Result<u32, SpecError> {
81 self.push_stage(op, &[])
82 }
83
84 pub fn push_stage(&mut self, op: StageOpV1, depends_on: &[u32]) -> Result<u32, SpecError> {
89 if self.shards == 0 {
90 return Err(SpecError::new("single_ring.shards must be >= 1"));
91 }
92
93 if self.stages.len() >= SEQUENCED_SLOTS_V1_GATING as usize {
96 return Err(SpecError::new("single_ring stage count exceeds v1 limit"));
97 }
98
99 let next_idx = self.stages.len() as u32;
100
101 for &d in depends_on {
103 if d >= next_idx {
104 return Err(SpecError::new(
105 "single_ring dependencies must reference earlier stages",
106 ));
107 }
108 }
109
110 let mut deps = Vec::from(depends_on);
112 deps.sort_unstable();
113 deps.dedup();
114
115 self.stages.push(StageV1 {
116 op,
117 depends_on: deps,
118 });
119 Ok(next_idx)
120 }
121
122 pub fn build(self) -> Result<PipelineSpecV1, SpecError> {
124 if self.stages.is_empty() {
125 return Err(SpecError::new("single_ring requires at least one stage"));
126 }
127
128 Ok(PipelineSpecV1::SingleRing(SingleRingV1 {
129 shards: self.shards,
130 ordering: self.ordering,
131 producer: self.producer,
132 scheduling: self.scheduling,
133 shard_key: self.shard_key,
134 stages: self.stages,
135 }))
136 }
137}
138
139#[cfg(feature = "alloc")]
141pub fn single_ring_chain(ops: &[StageOpV1]) -> Result<PipelineSpecV1, SpecError> {
142 let mut b = SingleRingBuilder::new();
143 let mut prev: Option<u32> = None;
144
145 for op in ops.iter().cloned() {
146 let idx = match prev {
147 None => b.push_source_stage(op)?,
148 Some(p) => b.push_stage(op, &[p])?,
149 };
150 prev = Some(idx);
151 }
152
153 b.build()
154}