// renderbox_sdk/graph/stream.rs
use std::marker::PhantomData;
2use std::rc::Rc;
3
4use renderbox_dsl::NodeId;
5
6use crate::sorts::Sort;
7
8use super::builder::SharedGraph;
9
10pub struct Stream<S: Sort> {
11 pub(crate) node_id: NodeId,
12 pub(crate) graph: SharedGraph,
13 _sort: PhantomData<S>,
14}
15
16impl<S: Sort> Clone for Stream<S> {
17 fn clone(&self) -> Self {
18 Self {
19 node_id: self.node_id,
20 graph: Rc::clone(&self.graph),
21 _sort: PhantomData,
22 }
23 }
24}
25
26impl<S: Sort> Stream<S> {
27 pub(crate) fn new(node_id: NodeId, graph: SharedGraph) -> Self {
28 Self {
29 node_id,
30 graph,
31 _sort: PhantomData,
32 }
33 }
34
35 pub fn pipe<T: Sort>(&self, f: impl FnOnce(Stream<S>) -> Stream<T>) -> Stream<T> {
36 f(self.clone())
37 }
38}
39
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::input::input;
    use crate::sorts::{Detection, Video};
    use renderbox_dsl::OpNode;
    use std::collections::BTreeMap;

    /// Adds a parameterless `OpNode::Filter` named `name`, fed by `src`, to
    /// `src`'s graph and returns a video stream pointing at the new node.
    fn append_filter(src: Stream<Video>, name: &'static str) -> Stream<Video> {
        let filter = OpNode::Filter {
            name: name.into(),
            params: BTreeMap::new(),
            input: src.node_id,
        };
        let node_id = src.graph.borrow_mut().add_node(filter);
        Stream::new(node_id, src.graph)
    }

    /// One piped filter on top of the input leaves two nodes in the arena.
    #[test]
    fn pipe_adds_node_to_graph() {
        let (video, _other) = input("test.mp4");
        let scaled: Stream<Video> = video.pipe(|s| append_filter(s, "scale"));
        let node_count = scaled.graph.borrow().arena.len();
        assert_eq!(node_count, 2);
    }

    /// The piped stream still points at the same graph as the other input stream.
    #[test]
    fn pipe_preserves_graph_sharing() {
        let (video, other) = input("test.mp4");
        let scaled: Stream<Video> = video.pipe(|s| append_filter(s, "scale"));
        assert!(Rc::ptr_eq(&scaled.graph, &other.graph));
    }

    /// Piping the same stream twice adds two nodes to one shared graph.
    #[test]
    fn dag_sharing_same_stream_used_twice() {
        let (video, _other) = input("test.mp4");
        let scaled: Stream<Video> = video.pipe(|s| append_filter(s, "scale"));
        let cropped: Stream<Video> = video.pipe(|s| append_filter(s, "crop"));
        let node_count = scaled.graph.borrow().arena.len();
        assert_eq!(node_count, 3);
        assert!(Rc::ptr_eq(&scaled.graph, &cropped.graph));
    }

    /// `pipe` may change the sort: a video stream goes in, a detection stream comes out.
    #[test]
    fn cross_sort_pipe() {
        let (video, _other) = input("test.mp4");
        let _detections: Stream<Detection> = video.pipe(|src: Stream<Video>| {
            let fanout = OpNode::Fanout {
                op: "detect".into(),
                params: BTreeMap::new(),
                input: src.node_id,
            };
            let node_id = src.graph.borrow_mut().add_node(fanout);
            Stream::new(node_id, src.graph)
        });
    }
}