Skip to main content

renderbox_sdk/graph/
stream.rs

1use std::marker::PhantomData;
2use std::rc::Rc;
3
4use renderbox_dsl::NodeId;
5
6use crate::sorts::Sort;
7
8use super::builder::SharedGraph;
9
10pub struct Stream<S: Sort> {
11    pub(crate) node_id: NodeId,
12    pub(crate) graph: SharedGraph,
13    _sort: PhantomData<S>,
14}
15
16impl<S: Sort> Clone for Stream<S> {
17    fn clone(&self) -> Self {
18        Self {
19            node_id: self.node_id,
20            graph: Rc::clone(&self.graph),
21            _sort: PhantomData,
22        }
23    }
24}
25
26impl<S: Sort> Stream<S> {
27    pub(crate) fn new(node_id: NodeId, graph: SharedGraph) -> Self {
28        Self {
29            node_id,
30            graph,
31            _sort: PhantomData,
32        }
33    }
34
35    pub fn pipe<T: Sort>(&self, f: impl FnOnce(Stream<S>) -> Stream<T>) -> Stream<T> {
36        f(self.clone())
37    }
38}
39
40#[cfg(test)]
41mod tests {
42    use super::*;
43    use std::collections::BTreeMap;
44    use renderbox_dsl::OpNode;
45    use crate::graph::input::input;
46    use crate::sorts::{Detection, Video};
47
48    #[test]
49    fn pipe_adds_node_to_graph() {
50        let (v, _a) = input("test.mp4");
51        let v2: Stream<Video> = v.pipe(|s| {
52            let node_id = s.graph.borrow_mut().add_node(OpNode::Filter {
53                name: "scale".into(),
54                params: BTreeMap::new(),
55                input: s.node_id,
56            });
57            Stream::new(node_id, s.graph)
58        });
59        assert_eq!(v2.graph.borrow().arena.len(), 2);
60    }
61
62    #[test]
63    fn pipe_preserves_graph_sharing() {
64        let (v, a) = input("test.mp4");
65        let v2: Stream<Video> = v.pipe(|s| {
66            let node_id = s.graph.borrow_mut().add_node(OpNode::Filter {
67                name: "scale".into(),
68                params: BTreeMap::new(),
69                input: s.node_id,
70            });
71            Stream::new(node_id, s.graph)
72        });
73        assert!(Rc::ptr_eq(&v2.graph, &a.graph));
74    }
75
76    #[test]
77    fn dag_sharing_same_stream_used_twice() {
78        let (v, _a) = input("test.mp4");
79        let v1: Stream<Video> = v.pipe(|s| {
80            let node_id = s.graph.borrow_mut().add_node(OpNode::Filter {
81                name: "scale".into(),
82                params: BTreeMap::new(),
83                input: s.node_id,
84            });
85            Stream::new(node_id, s.graph)
86        });
87        let v2: Stream<Video> = v.pipe(|s| {
88            let node_id = s.graph.borrow_mut().add_node(OpNode::Filter {
89                name: "crop".into(),
90                params: BTreeMap::new(),
91                input: s.node_id,
92            });
93            Stream::new(node_id, s.graph)
94        });
95        assert_eq!(v1.graph.borrow().arena.len(), 3);
96        assert!(Rc::ptr_eq(&v1.graph, &v2.graph));
97    }
98
99    #[test]
100    fn cross_sort_pipe() {
101        let (v, _a) = input("test.mp4");
102        let _d: Stream<Detection> = v.pipe(|s: Stream<Video>| {
103            let node_id = s.graph.borrow_mut().add_node(OpNode::Fanout {
104                op: "detect".into(),
105                params: BTreeMap::new(),
106                input: s.node_id,
107            });
108            Stream::new(node_id, s.graph)
109        });
110    }
111}