1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};
use flume::r#async::RecvStream;
use futures::{pin_mut, Stream, StreamExt};
use crate::{Component, ShutdownOnDrop};
/// A stream of [`Component::Output`] values.
///
/// The [`Stream`] implementation forwards to the wrapped flume receive
/// stream, so every item yielded is a `C::Output`.
pub struct ComponentStream<C: Component> {
/// Receiving side from which the `C::Output` values are read.
pub(super) stream: RecvStream<'static, C::Output>,
/// Handle that `detach_runtime` calls `deactivate()` on.
/// NOTE(review): presumably shuts the component's runtime down when this
/// struct is dropped — confirm against `ShutdownOnDrop`.
pub(super) shutdown_on_drop: ShutdownOnDrop,
}
impl<C: Component> Stream for ComponentStream<C> {
    /// Every item is one output value of the component type `C`.
    type Item = C::Output;

    /// Polls the wrapped receive stream and returns whatever it reports.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Pin::get_mut` needs `Self: Unpin`; it hands back a plain mutable
        // reference from which the inner stream field can be borrowed.
        let inner = &mut self.get_mut().stream;
        // Pin the borrow on the stack so it can be polled as a stream.
        pin_mut!(inner);
        inner.poll_next(cx)
    }
}
impl<C: Component> ComponentStream<C> {
pub async fn recv_one(mut self) -> Option<C::Output> {
self.stream.next().await
}
}
impl<C: Component> ComponentStream<C> {
    /// Deactivates the `shutdown_on_drop` handle held by this stream.
    ///
    /// NOTE(review): presumably this means dropping the stream afterwards
    /// no longer shuts down the component's runtime — confirm against
    /// `ShutdownOnDrop::deactivate`.
    pub fn detach_runtime(&mut self) {
        let guard = &mut self.shutdown_on_drop;
        guard.deactivate();
    }
}
impl<C: Component> Debug for ComponentStream<C> {
    /// Formats the stream as a struct named `ComponentStream` with a single
    /// `stream` field holding a fixed placeholder string.
    ///
    /// The `shutdown_on_drop` field is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ComponentStream");
        repr.field("stream", &"<RecvStream>");
        repr.finish()
    }
}
#[cfg(test)]
mod test {
    use std::rc::Rc;

    use crate::{Component, ComponentParts, SimpleComponent};

    /// Compiles only when `T` is `Send`; the argument itself is ignored.
    fn assert_send<T>(_value: T)
    where
        T: Send,
    {
    }

    /// Minimal component whose model, root and widgets are all `Rc<()>`,
    /// a type that is not `Send`.
    struct Test(Rc<()>);

    impl SimpleComponent for Test {
        type Input = ();
        type Output = ();
        type Init = ();
        type Root = Rc<()>;
        type Widgets = Rc<()>;

        fn init_root() -> Self::Root {
            Rc::new(())
        }

        fn init(
            _init: Self::Init,
            _root: &Self::Root,
            _sender: crate::ComponentSender<Self>,
        ) -> ComponentParts<Self> {
            let model = Test(Rc::new(()));
            let widgets = Rc::new(());
            ComponentParts { model, widgets }
        }
    }

    /// The stream obtained from a launched component must be `Send`, even
    /// though the component's own parts are built from `Rc`.
    #[gtk::test]
    fn stream_is_send() {
        let stream = Test::builder().launch(()).into_stream();
        assert_send(stream);
    }
}