byteor_pipeline_spec/
kv.rs

1//! Stable line-oriented `kv` encoding/decoding (v1).
2
3#[cfg(feature = "alloc")]
4use alloc::collections::BTreeMap;
5
6#[cfg(feature = "alloc")]
7use alloc::format;
8
9#[cfg(feature = "alloc")]
10use alloc::string::{String, ToString};
11
12#[cfg(feature = "alloc")]
13use alloc::vec::Vec;
14
15#[cfg(feature = "alloc")]
16use crate::{
17    canonicalize_v1, validate_v1, BridgeRoleCfgV1, EndpointCfgV1, EndpointKindV1, LaneCfgV1,
18    LaneGraphV1, LaneKindV1, MergePolicyV1, MergeRoleCfgV1, OnFullV1, OrderingV1, PipelineSpecV1,
19    RoleCfgV1, RouterRoleCfgV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
20    SingleRingV1, SpecError, StageOpV1, StageRoleCfgV1, StageV1,
21};
22
23/// Encode a v1 spec into a stable line-oriented `kv` format.
24///
25/// Format rules (v1):
26/// - `key=value` per line
27/// - keys and values are ASCII; whitespace around `=` is not allowed
28/// - unknown keys are ignored by the decoder (forward-compat)
29#[cfg(feature = "alloc")]
30pub fn encode_kv_v1(spec: &PipelineSpecV1) -> String {
31    let spec = canonicalize_v1(spec);
32    let mut out = String::new();
33    out.push_str("v=1\n");
34    match &spec {
35        PipelineSpecV1::LaneGraph(lg) => {
36            out.push_str("model=lane_graph\n");
37
38            out.push_str("lane_graph.on_full=");
39            match lg.on_full {
40                OnFullV1::Block => out.push_str("block"),
41                OnFullV1::Drop => out.push_str("drop"),
42            }
43            out.push('\n');
44
45            out.push_str("lane_graph.endpoints.len=");
46            out.push_str(&lg.endpoints.len().to_string());
47            out.push('\n');
48            for (i, ep) in lg.endpoints.iter().enumerate() {
49                let p = format!("lane_graph.endpoints.{i}.");
50                out.push_str(&p);
51                out.push_str("name=");
52                out.push_str(&ep.name);
53                out.push('\n');
54
55                out.push_str(&p);
56                out.push_str("kind=");
57                match ep.kind {
58                    EndpointKindV1::Ingress => out.push_str("ingress"),
59                    EndpointKindV1::Egress => out.push_str("egress"),
60                }
61                out.push('\n');
62
63                out.push_str(&p);
64                out.push_str("lane=");
65                out.push_str(&ep.lane);
66                out.push('\n');
67            }
68
69            out.push_str("lane_graph.lanes.len=");
70            out.push_str(&lg.lanes.len().to_string());
71            out.push('\n');
72            for (i, lane) in lg.lanes.iter().enumerate() {
73                let p = format!("lane_graph.lanes.{i}.");
74                out.push_str(&p);
75                out.push_str("name=");
76                out.push_str(&lane.name);
77                out.push('\n');
78
79                out.push_str(&p);
80                out.push_str("kind=");
81                match lane.kind {
82                    LaneKindV1::Events => {
83                        out.push_str("events");
84                        out.push('\n');
85                    }
86                    LaneKindV1::Journal => {
87                        out.push_str("journal");
88                        out.push('\n');
89                    }
90                    LaneKindV1::SequencedSlots { capacity, gating } => {
91                        out.push_str("sequenced_slots");
92                        out.push('\n');
93                        out.push_str(&p);
94                        out.push_str("capacity=");
95                        out.push_str(&capacity.to_string());
96                        out.push('\n');
97                        out.push_str(&p);
98                        out.push_str("gating=");
99                        out.push_str(&gating.to_string());
100                        out.push('\n');
101                    }
102                    LaneKindV1::FanoutBroadcast { consumers } => {
103                        out.push_str("fanout_broadcast");
104                        out.push('\n');
105                        out.push_str(&p);
106                        out.push_str("consumers=");
107                        out.push_str(&consumers.to_string());
108                        out.push('\n');
109                    }
110                }
111            }
112
113            out.push_str("lane_graph.roles.len=");
114            out.push_str(&lg.roles.len().to_string());
115            out.push('\n');
116            for (i, role) in lg.roles.iter().enumerate() {
117                let p = format!("lane_graph.roles.{i}.");
118                match role {
119                    RoleCfgV1::Stage(cfg) => {
120                        out.push_str(&p);
121                        out.push_str("kind=stage\n");
122                        out.push_str(&p);
123                        out.push_str("name=");
124                        out.push_str(&cfg.name);
125                        out.push('\n');
126                        out.push_str(&p);
127                        out.push_str("stage=");
128                        out.push_str(&cfg.stage);
129                        out.push('\n');
130                        out.push_str(&p);
131                        out.push_str("rx=");
132                        out.push_str(&cfg.rx);
133                        out.push('\n');
134                        out.push_str(&p);
135                        out.push_str("tx=");
136                        out.push_str(&cfg.tx);
137                        out.push('\n');
138                    }
139                    RoleCfgV1::Bridge(cfg) => {
140                        out.push_str(&p);
141                        out.push_str("kind=bridge\n");
142                        out.push_str(&p);
143                        out.push_str("name=");
144                        out.push_str(&cfg.name);
145                        out.push('\n');
146                        out.push_str(&p);
147                        out.push_str("rx=");
148                        out.push_str(&cfg.rx);
149                        out.push('\n');
150                        out.push_str(&p);
151                        out.push_str("tx=");
152                        out.push_str(&cfg.tx);
153                        out.push('\n');
154                    }
155                    RoleCfgV1::Router(cfg) => {
156                        out.push_str(&p);
157                        out.push_str("kind=router\n");
158                        out.push_str(&p);
159                        out.push_str("name=");
160                        out.push_str(&cfg.name);
161                        out.push('\n');
162                        out.push_str(&p);
163                        out.push_str("rx=");
164                        out.push_str(&cfg.rx);
165                        out.push('\n');
166                        out.push_str(&p);
167                        out.push_str("tx.len=");
168                        out.push_str(&cfg.tx.len().to_string());
169                        out.push('\n');
170                        for (j, lane_name) in cfg.tx.iter().enumerate() {
171                            out.push_str(&p);
172                            out.push_str("tx.");
173                            out.push_str(&j.to_string());
174                            out.push('=');
175                            out.push_str(lane_name);
176                            out.push('\n');
177                        }
178                    }
179                    RoleCfgV1::Merge(cfg) => {
180                        out.push_str(&p);
181                        out.push_str("kind=merge\n");
182                        out.push_str(&p);
183                        out.push_str("name=");
184                        out.push_str(&cfg.name);
185                        out.push('\n');
186                        out.push_str(&p);
187                        out.push_str("policy=");
188                        match cfg.policy {
189                            MergePolicyV1::RoundRobin => out.push_str("round_robin"),
190                        }
191                        out.push('\n');
192                        out.push_str(&p);
193                        out.push_str("tx=");
194                        out.push_str(&cfg.tx);
195                        out.push('\n');
196                        out.push_str(&p);
197                        out.push_str("rx.len=");
198                        out.push_str(&cfg.rx.len().to_string());
199                        out.push('\n');
200                        for (j, lane_name) in cfg.rx.iter().enumerate() {
201                            out.push_str(&p);
202                            out.push_str("rx.");
203                            out.push_str(&j.to_string());
204                            out.push('=');
205                            out.push_str(lane_name);
206                            out.push('\n');
207                        }
208                    }
209                }
210            }
211        }
212        PipelineSpecV1::SingleRing(ring) => {
213            out.push_str("model=single_ring\n");
214
215            out.push_str("single_ring.shards=");
216            out.push_str(&ring.shards.to_string());
217            out.push('\n');
218
219            out.push_str("single_ring.ordering=");
220            match ring.ordering {
221                OrderingV1::Strict => out.push_str("strict"),
222                OrderingV1::PerKey => out.push_str("per_key"),
223                OrderingV1::Unordered => out.push_str("unordered"),
224            }
225            out.push('\n');
226
227            out.push_str("single_ring.producer=");
228            match ring.producer {
229                SingleRingProducerV1::Single => out.push_str("single"),
230                SingleRingProducerV1::Mpmc => out.push_str("mpmc"),
231            }
232            out.push('\n');
233
234            out.push_str("single_ring.scheduling=");
235            match ring.scheduling {
236                SingleRingSchedulingV1::Dedicated => out.push_str("dedicated"),
237                SingleRingSchedulingV1::WorkQueue => out.push_str("work_queue"),
238            }
239            out.push('\n');
240
241            out.push_str("single_ring.shard_key=");
242            match ring.shard_key {
243                ShardKeyV1::FirstByte => out.push_str("first_byte"),
244            }
245            out.push('\n');
246
247            out.push_str("single_ring.stages=");
248            out.push_str(&ring.stages.len().to_string());
249            out.push('\n');
250
251            for (i, st) in ring.stages.iter().enumerate() {
252                out.push_str("stage.");
253                out.push_str(&i.to_string());
254                out.push_str(".op=");
255                match &st.op {
256                    StageOpV1::Identity => out.push_str("identity\n"),
257                    StageOpV1::AddU8 { delta } => {
258                        out.push_str("add_u8\n");
259                        out.push_str("stage.");
260                        out.push_str(&i.to_string());
261                        out.push_str(".delta=");
262                        out.push_str(&delta.to_string());
263                        out.push('\n');
264                    }
265                    #[cfg(feature = "alloc")]
266                    StageOpV1::ResolverKey { stage } => {
267                        out.push_str("resolver_key\n");
268                        out.push_str("stage.");
269                        out.push_str(&i.to_string());
270                        out.push_str(".stage=");
271                        out.push_str(stage);
272                        out.push('\n');
273                    }
274                }
275
276                out.push_str("stage.");
277                out.push_str(&i.to_string());
278                out.push_str(".depends_on=");
279                for (j, d) in st.depends_on.iter().enumerate() {
280                    if j != 0 {
281                        out.push(',');
282                    }
283                    out.push_str(&d.to_string());
284                }
285                out.push('\n');
286            }
287        }
288    }
289    out
290}
291
292/// Decode a v1 spec from the stable line-oriented `kv` format.
293#[cfg(feature = "alloc")]
294pub fn decode_kv_v1(input: &str) -> Result<PipelineSpecV1, SpecError> {
295    let mut map: BTreeMap<String, String> = BTreeMap::new();
296
297    for raw_line in input.lines() {
298        let line = raw_line.trim();
299        if line.is_empty() || line.starts_with('#') {
300            continue;
301        }
302        let Some((k, v)) = line.split_once('=') else {
303            return Err(SpecError::new("invalid kv line"));
304        };
305        if k.is_empty() {
306            return Err(SpecError::new("invalid kv key"));
307        }
308        map.insert(k.to_string(), v.to_string());
309    }
310
311    if let Some(v) = map.get("v") {
312        if v != "1" {
313            return Err(SpecError::new("unsupported spec version"));
314        }
315    }
316
317    let model = match map.get("model").map(|s| s.as_str()) {
318        Some("lane_graph") | None => "lane_graph",
319        Some("single_ring") => "single_ring",
320        _ => return Err(SpecError::new("unknown execution model")),
321    };
322
323    let spec = if model == "lane_graph" {
324        let lg = decode_lane_graph_v1(&map)?;
325        PipelineSpecV1::LaneGraph(lg)
326    } else {
327        let shards: u32 = map
328            .get("single_ring.shards")
329            .map(|s| s.as_str())
330            .unwrap_or("1")
331            .parse()
332            .map_err(|_| SpecError::new("invalid single_ring.shards"))?;
333
334        // Backwards-compatible defaults:
335        // - unsharded: strict ordering
336        // - sharded: per_key ordering
337        let ordering = match map.get("single_ring.ordering").map(|s| s.as_str()) {
338            None => {
339                if shards > 1 {
340                    OrderingV1::PerKey
341                } else {
342                    OrderingV1::Strict
343                }
344            }
345            Some("strict") => OrderingV1::Strict,
346            Some("per_key") => OrderingV1::PerKey,
347            Some("unordered") => OrderingV1::Unordered,
348            Some(_) => return Err(SpecError::new("unknown single_ring.ordering")),
349        };
350
351        let producer = match map.get("single_ring.producer").map(|s| s.as_str()) {
352            None => SingleRingProducerV1::Single,
353            Some("single") => SingleRingProducerV1::Single,
354            Some("mpmc") => SingleRingProducerV1::Mpmc,
355            Some(_) => return Err(SpecError::new("unknown single_ring.producer")),
356        };
357
358        let scheduling = match map.get("single_ring.scheduling").map(|s| s.as_str()) {
359            None => SingleRingSchedulingV1::Dedicated,
360            Some("dedicated") => SingleRingSchedulingV1::Dedicated,
361            Some("work_queue") => SingleRingSchedulingV1::WorkQueue,
362            Some(_) => return Err(SpecError::new("unknown single_ring.scheduling")),
363        };
364
365        let shard_key = match map.get("single_ring.shard_key").map(|s| s.as_str()) {
366            None => ShardKeyV1::FirstByte,
367            Some("first_byte") => ShardKeyV1::FirstByte,
368            Some(_) => return Err(SpecError::new("unknown single_ring.shard_key")),
369        };
370
371        let n: usize = map
372            .get("single_ring.stages")
373            .ok_or_else(|| SpecError::new("missing single_ring.stages"))?
374            .parse()
375            .map_err(|_| SpecError::new("invalid stage count"))?;
376
377        let mut stages = Vec::with_capacity(n);
378        for i in 0..n {
379            let op_key = format!("stage.{i}.op");
380            let op_val = map
381                .get(&op_key)
382                .ok_or_else(|| SpecError::new("missing stage op"))?;
383            let op = match op_val.as_str() {
384                "identity" => StageOpV1::Identity,
385                "add_u8" => {
386                    let delta_key = format!("stage.{i}.delta");
387                    let delta: u8 = map
388                        .get(&delta_key)
389                        .ok_or_else(|| SpecError::new("missing stage delta"))?
390                        .parse()
391                        .map_err(|_| SpecError::new("invalid stage delta"))?;
392                    StageOpV1::AddU8 { delta }
393                }
394                #[cfg(feature = "alloc")]
395                "resolver" | "resolver_key" => {
396                    let stage_key = format!("stage.{i}.stage");
397                    let stage = map
398                        .get(&stage_key)
399                        .ok_or_else(|| SpecError::new("missing stage identity"))?
400                        .to_string();
401                    StageOpV1::ResolverKey { stage }
402                }
403                _ => return Err(SpecError::new("unknown stage op")),
404            };
405
406            let dep_key = format!("stage.{i}.depends_on");
407            let dep_val = map.get(&dep_key).map(|s| s.as_str()).unwrap_or("");
408            let mut depends_on = Vec::new();
409            let dep_val = dep_val.trim();
410            if !dep_val.is_empty() {
411                for part in dep_val.split(',') {
412                    let d: u32 = part
413                        .parse()
414                        .map_err(|_| SpecError::new("invalid stage dependency"))?;
415                    depends_on.push(d);
416                }
417            }
418
419            stages.push(StageV1 { op, depends_on });
420        }
421
422        PipelineSpecV1::SingleRing(SingleRingV1 {
423            shards,
424            ordering,
425            producer,
426            scheduling,
427            shard_key,
428            stages,
429        })
430    };
431
432    validate_v1(&spec)?;
433    Ok(spec)
434}
435
436#[cfg(feature = "alloc")]
437fn decode_lane_graph_v1(map: &BTreeMap<String, String>) -> Result<LaneGraphV1, SpecError> {
438    fn take_u32(map: &BTreeMap<String, String>, k: &str) -> Result<u32, SpecError> {
439        map.get(k)
440            .ok_or_else(|| SpecError::new("missing lane_graph key"))?
441            .parse()
442            .map_err(|_| SpecError::new("invalid lane_graph integer"))
443    }
444    fn take_usize(map: &BTreeMap<String, String>, k: &str) -> Result<usize, SpecError> {
445        map.get(k)
446            .ok_or_else(|| SpecError::new("missing lane_graph key"))?
447            .parse()
448            .map_err(|_| SpecError::new("invalid lane_graph integer"))
449    }
450    fn take_str<'a>(map: &'a BTreeMap<String, String>, k: &str) -> Result<&'a str, SpecError> {
451        map.get(k)
452            .map(|s| s.as_str())
453            .ok_or_else(|| SpecError::new("missing lane_graph key"))
454    }
455
456    let on_full = match map.get("lane_graph.on_full").map(|s| s.as_str()) {
457        Some("block") => OnFullV1::Block,
458        Some("drop") => OnFullV1::Drop,
459        Some(_) => return Err(SpecError::new("unknown lane_graph.on_full")),
460        None => return Err(SpecError::new("missing lane_graph config")),
461    };
462
463    let endpoints_len = take_usize(map, "lane_graph.endpoints.len")?;
464    let mut endpoints = Vec::with_capacity(endpoints_len);
465    for i in 0..endpoints_len {
466        let p = format!("lane_graph.endpoints.{i}.");
467        let name = take_str(map, &format!("{p}name"))?.to_string();
468        let kind = match take_str(map, &format!("{p}kind"))? {
469            "ingress" => EndpointKindV1::Ingress,
470            "egress" => EndpointKindV1::Egress,
471            _ => return Err(SpecError::new("unknown endpoint kind")),
472        };
473        let lane = take_str(map, &format!("{p}lane"))?.to_string();
474        endpoints.push(EndpointCfgV1 { name, kind, lane });
475    }
476
477    let lanes_len = take_usize(map, "lane_graph.lanes.len")?;
478    let mut lanes = Vec::with_capacity(lanes_len);
479    for i in 0..lanes_len {
480        let p = format!("lane_graph.lanes.{i}.");
481        let name = take_str(map, &format!("{p}name"))?.to_string();
482        let kind_s = take_str(map, &format!("{p}kind"))?;
483        let kind = match kind_s {
484            "events" => LaneKindV1::Events,
485            "journal" => LaneKindV1::Journal,
486            "sequenced_slots" => {
487                let capacity = take_u32(map, &format!("{p}capacity"))?;
488                let gating = take_u32(map, &format!("{p}gating"))?;
489                LaneKindV1::SequencedSlots { capacity, gating }
490            }
491            "fanout_broadcast" => {
492                let consumers = take_u32(map, &format!("{p}consumers"))?;
493                LaneKindV1::FanoutBroadcast { consumers }
494            }
495            _ => return Err(SpecError::new("unknown lane kind")),
496        };
497        lanes.push(LaneCfgV1 { name, kind });
498    }
499
500    let roles_len = take_usize(map, "lane_graph.roles.len")?;
501    let mut roles = Vec::with_capacity(roles_len);
502    for i in 0..roles_len {
503        let p = format!("lane_graph.roles.{i}.");
504        let kind_s = take_str(map, &format!("{p}kind"))?;
505        let name = take_str(map, &format!("{p}name"))?.to_string();
506
507        let role = match kind_s {
508            "stage" => {
509                let stage = take_str(map, &format!("{p}stage"))?.to_string();
510                let rx = take_str(map, &format!("{p}rx"))?.to_string();
511                let tx = take_str(map, &format!("{p}tx"))?.to_string();
512                RoleCfgV1::Stage(StageRoleCfgV1 {
513                    name,
514                    stage,
515                    rx,
516                    tx,
517                })
518            }
519            "bridge" => {
520                let rx = take_str(map, &format!("{p}rx"))?.to_string();
521                let tx = take_str(map, &format!("{p}tx"))?.to_string();
522                RoleCfgV1::Bridge(BridgeRoleCfgV1 { name, rx, tx })
523            }
524            "router" => {
525                let rx = take_str(map, &format!("{p}rx"))?.to_string();
526                let tx_len = take_usize(map, &format!("{p}tx.len"))?;
527                let mut tx = Vec::with_capacity(tx_len);
528                for j in 0..tx_len {
529                    tx.push(take_str(map, &format!("{p}tx.{j}"))?.to_string());
530                }
531                RoleCfgV1::Router(RouterRoleCfgV1 { name, rx, tx })
532            }
533            "merge" => {
534                let policy = match take_str(map, &format!("{p}policy"))? {
535                    "round_robin" => MergePolicyV1::RoundRobin,
536                    _ => return Err(SpecError::new("unknown merge policy")),
537                };
538                let tx = take_str(map, &format!("{p}tx"))?.to_string();
539                let rx_len = take_usize(map, &format!("{p}rx.len"))?;
540                let mut rx = Vec::with_capacity(rx_len);
541                for j in 0..rx_len {
542                    rx.push(take_str(map, &format!("{p}rx.{j}"))?.to_string());
543                }
544                RoleCfgV1::Merge(MergeRoleCfgV1 {
545                    name,
546                    rx,
547                    tx,
548                    policy,
549                })
550            }
551            _ => return Err(SpecError::new("unknown role kind")),
552        };
553        roles.push(role);
554    }
555
556    Ok(LaneGraphV1 {
557        endpoints,
558        lanes,
559        roles,
560        on_full,
561    })
562}
563