relm4/component/sync/
stream.rs

1use std::fmt::Debug;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use flume::r#async::RecvStream;
6use futures::{Stream, StreamExt, pin_mut};
7
8use crate::{Component, ShutdownOnDrop};
9
10/// Yields [`Component::Output`] values as a stream and contains the
11/// input sender and the root widget.
12///
13/// Use this as alternative to [`Controller`](crate::Controller) when
14/// you prefer a stream of futures or want to unlock the potential of
15/// [`StreamExt`](futures::StreamExt).
16/// Also, this type implements [`Send`] so using it in commands is
17/// possible.
18pub struct ComponentStream<C: Component> {
19    /// The outputs being received by the component.
20    pub(super) stream: RecvStream<'static, C::Output>,
21    pub(super) shutdown_on_drop: ShutdownOnDrop,
22}
23
24impl<C: Component> Stream for ComponentStream<C> {
25    type Item = C::Output;
26
27    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28        let stream = &mut self.stream;
29        pin_mut!(stream);
30        stream.poll_next(cx)
31    }
32}
33
34impl<C: Component> ComponentStream<C> {
35    /// Receive one message and drop the component afterwards.
36    /// This can be used for dialogs.
37    pub async fn recv_one(mut self) -> Option<C::Output> {
38        self.stream.next().await
39    }
40}
41
42impl<C: Component> ComponentStream<C> {
43    /// Dropping this type will usually stop the runtime of the component.
44    /// With this method you can give the runtime a static lifetime.
45    /// In other words, dropping the stream will not stop
46    /// the runtime anymore, instead it will run until the app is closed.
47    pub fn detach_runtime(&mut self) {
48        self.shutdown_on_drop.deactivate();
49    }
50}
51
52impl<C: Component> Debug for ComponentStream<C> {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("ComponentStream")
55            .field("stream", &"<RecvStream>")
56            .finish()
57    }
58}
59
60#[cfg(test)]
61mod test {
62    use std::rc::Rc;
63
64    use crate::{Component, ComponentParts, SimpleComponent};
65
66    fn assert_send<T: Send>(_stream: T) {}
67
68    #[allow(dead_code)]
69    struct Test(Rc<()>);
70
71    impl SimpleComponent for Test {
72        type Input = ();
73        type Output = ();
74        type Init = ();
75        type Root = Rc<()>;
76        type Widgets = Rc<()>;
77
78        fn init_root() -> Self::Root {
79            Rc::default()
80        }
81
82        fn init(
83            _init: Self::Init,
84            _root: Self::Root,
85            _sender: crate::ComponentSender<Self>,
86        ) -> ComponentParts<Self> {
87            ComponentParts {
88                model: Test(Rc::default()),
89                widgets: Rc::default(),
90            }
91        }
92    }
93
94    #[gtk::test]
95    fn stream_is_send() {
96        let stream = Test::builder().launch(()).into_stream();
97        assert_send(stream);
98    }
99}