relm4/component/async/
stream.rs

1use std::fmt::Debug;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use flume::r#async::RecvStream;
6use futures::{Stream, StreamExt, pin_mut};
7
8use crate::component::AsyncComponent;
9use crate::runtime_util::ShutdownOnDrop;
10
11/// Yields [`AsyncComponent::Output`] values as a stream and contains the
12/// input sender and the root widget.
13///
14/// Use this as alternative to [`AsyncController`](crate::component::AsyncController) when
15/// you prefer a stream of futures or want to unlock the potential of
16/// [`StreamExt`](futures::StreamExt).
17/// Also, this type implements [`Send`] so using it in commands is
18/// possible.
19pub struct AsyncComponentStream<C: AsyncComponent> {
20    /// The outputs being received by the component.
21    pub(super) stream: RecvStream<'static, C::Output>,
22    pub(super) shutdown_on_drop: ShutdownOnDrop,
23}
24
25impl<C: AsyncComponent> Stream for AsyncComponentStream<C> {
26    type Item = C::Output;
27
28    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29        let stream = &mut self.stream;
30        pin_mut!(stream);
31        stream.poll_next(cx)
32    }
33}
34
35impl<C: AsyncComponent> AsyncComponentStream<C> {
36    /// Receive one message and drop the component afterwards.
37    /// This can be used for dialogs.
38    pub async fn recv_one(mut self) -> Option<C::Output> {
39        self.stream.next().await
40    }
41}
42
43impl<C: AsyncComponent> AsyncComponentStream<C> {
44    /// Dropping this type will usually stop the runtime of the worker.
45    /// With this method you can give the runtime a static lifetime.
46    /// In other words, dropping the stream will not stop
47    /// the runtime anymore, instead it will run until the app is closed.
48    pub fn detach_runtime(&mut self) {
49        self.shutdown_on_drop.deactivate();
50    }
51}
52
53impl<C: AsyncComponent> Debug for AsyncComponentStream<C> {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("ComponentStream")
56            .field("stream", &"<RecvStream>")
57            .finish()
58    }
59}
60
#[cfg(test)]
mod test {
    use std::rc::Rc;

    use crate::component::{AsyncComponent, AsyncComponentParts};

    /// Compiles only when `T` is `Send`; the argument itself is never used.
    fn assert_send<T>(_stream: T)
    where
        T: Send,
    {
    }

    /// Minimal component whose model holds an `Rc` and is therefore not `Send`.
    // The field is never read; it exists only to make the model `!Send`.
    #[allow(dead_code)]
    struct Test(Rc<()>);

    // The `crate::AsyncComponentSender` path below is spelled out in full.
    #[allow(unused_qualifications)]
    impl AsyncComponent for Test {
        type Input = ();
        type Output = ();
        type CommandOutput = ();
        type Init = ();
        // `Rc` is used for root and widgets as well, so they are `!Send` too.
        type Root = Rc<()>;
        type Widgets = Rc<()>;

        fn init_root() -> Self::Root {
            Rc::new(())
        }

        async fn init(
            _init: Self::Init,
            _root: Self::Root,
            _sender: crate::AsyncComponentSender<Self>,
        ) -> AsyncComponentParts<Self> {
            let model = Test(Rc::new(()));
            let widgets = Rc::new(());
            AsyncComponentParts { model, widgets }
        }
    }

    /// The stream must be `Send` even though the component's model, root and
    /// widgets are not.
    #[gtk::test]
    fn stream_is_send() {
        assert_send(Test::builder().launch(()).into_stream());
    }
}