relm4/component/sync/
stream.rs1use std::fmt::Debug;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use flume::r#async::RecvStream;
6use futures::{Stream, StreamExt, pin_mut};
7
8use crate::{Component, ShutdownOnDrop};
9
10pub struct ComponentStream<C: Component> {
19 pub(super) stream: RecvStream<'static, C::Output>,
21 pub(super) shutdown_on_drop: ShutdownOnDrop,
22}
23
24impl<C: Component> Stream for ComponentStream<C> {
25 type Item = C::Output;
26
27 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28 let stream = &mut self.stream;
29 pin_mut!(stream);
30 stream.poll_next(cx)
31 }
32}
33
34impl<C: Component> ComponentStream<C> {
35 pub async fn recv_one(mut self) -> Option<C::Output> {
38 self.stream.next().await
39 }
40}
41
42impl<C: Component> ComponentStream<C> {
43 pub fn detach_runtime(&mut self) {
48 self.shutdown_on_drop.deactivate();
49 }
50}
51
52impl<C: Component> Debug for ComponentStream<C> {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("ComponentStream")
55 .field("stream", &"<RecvStream>")
56 .finish()
57 }
58}
59
60#[cfg(test)]
61mod test {
62 use std::rc::Rc;
63
64 use crate::{Component, ComponentParts, SimpleComponent};
65
66 fn assert_send<T: Send>(_stream: T) {}
67
68 #[allow(dead_code)]
69 struct Test(Rc<()>);
70
71 impl SimpleComponent for Test {
72 type Input = ();
73 type Output = ();
74 type Init = ();
75 type Root = Rc<()>;
76 type Widgets = Rc<()>;
77
78 fn init_root() -> Self::Root {
79 Rc::default()
80 }
81
82 fn init(
83 _init: Self::Init,
84 _root: Self::Root,
85 _sender: crate::ComponentSender<Self>,
86 ) -> ComponentParts<Self> {
87 ComponentParts {
88 model: Test(Rc::default()),
89 widgets: Rc::default(),
90 }
91 }
92 }
93
94 #[gtk::test]
95 fn stream_is_send() {
96 let stream = Test::builder().launch(()).into_stream();
97 assert_send(stream);
98 }
99}