// relm4/component/async/stream.rs
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};

use flume::r#async::RecvStream;
use futures::{Stream, StreamExt, pin_mut};

use crate::component::AsyncComponent;
use crate::runtime_util::ShutdownOnDrop;
10
11pub struct AsyncComponentStream<C: AsyncComponent> {
20 pub(super) stream: RecvStream<'static, C::Output>,
22 pub(super) shutdown_on_drop: ShutdownOnDrop,
23}
24
25impl<C: AsyncComponent> Stream for AsyncComponentStream<C> {
26 type Item = C::Output;
27
28 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29 let stream = &mut self.stream;
30 pin_mut!(stream);
31 stream.poll_next(cx)
32 }
33}
34
35impl<C: AsyncComponent> AsyncComponentStream<C> {
36 pub async fn recv_one(mut self) -> Option<C::Output> {
39 self.stream.next().await
40 }
41}
42
43impl<C: AsyncComponent> AsyncComponentStream<C> {
44 pub fn detach_runtime(&mut self) {
49 self.shutdown_on_drop.deactivate();
50 }
51}
52
53impl<C: AsyncComponent> Debug for AsyncComponentStream<C> {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("ComponentStream")
56 .field("stream", &"<RecvStream>")
57 .finish()
58 }
59}
60
#[cfg(test)]
mod test {
    use std::rc::Rc;

    use crate::component::{AsyncComponent, AsyncComponentParts};

    /// Compile-time check: this call only type-checks when `T` is `Send`.
    /// The argument itself is never used.
    fn assert_send<T>(_value: T)
    where
        T: Send,
    {
    }

    /// Minimal component whose model, root and widgets are all `Rc`-based,
    /// i.e. none of them is `Send`.
    // The wrapped `Rc` is never read; it only makes the model non-`Send`.
    #[allow(dead_code)]
    struct Test(Rc<()>);

    // NOTE(review): presumably needed because of the fully qualified
    // `crate::AsyncComponentSender` path in `init` - confirm.
    #[allow(unused_qualifications)]
    impl AsyncComponent for Test {
        type Init = ();
        type Input = ();
        type Output = ();
        type CommandOutput = ();
        type Root = Rc<()>;
        type Widgets = Rc<()>;

        fn init_root() -> Self::Root {
            Rc::new(())
        }

        async fn init(
            _init: Self::Init,
            _root: Self::Root,
            _sender: crate::AsyncComponentSender<Self>,
        ) -> AsyncComponentParts<Self> {
            let model = Test(Rc::new(()));
            let widgets = Rc::new(());
            AsyncComponentParts { model, widgets }
        }
    }

    /// The output stream must be `Send` even though the component itself is
    /// built entirely from non-`Send` parts.
    #[gtk::test]
    fn stream_is_send() {
        assert_send(Test::builder().launch(()).into_stream());
    }
}