1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
mod component;
pub mod shutdown;
pub use component::{AsyncComponentSender, AsyncFactorySender, ComponentSender, FactorySender};
use std::fmt;
use flume::r#async::RecvStream;
#[must_use]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = flume::unbounded();
(Sender(tx), Receiver(rx))
}
pub struct Sender<T>(pub(crate) flume::Sender<T>);
impl<T> From<flume::Sender<T>> for Sender<T> {
fn from(sender: flume::Sender<T>) -> Self {
Self(sender)
}
}
impl<T> Sender<T> {
pub fn emit(&self, message: T) {
if self.send(message).is_err() {
tracing::warn!("Receiver was dropped");
}
}
pub fn send(&self, message: T) -> Result<(), T> {
self.0.send(message).map_err(|e| e.into_inner())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Sender").field(&self.0).finish()
}
}
pub struct Receiver<T>(pub(crate) flume::Receiver<T>);
impl<T> Receiver<T> {
pub async fn recv(&self) -> Option<T> {
self.0.recv_async().await.ok()
}
#[must_use]
pub fn recv_sync(&self) -> Option<T> {
self.0.recv().ok()
}
#[must_use]
pub(crate) fn into_stream(self) -> RecvStream<'static, T> {
self.0.into_stream()
}
pub async fn forward<Transformer, Output>(
self,
sender: impl Into<Sender<Output>>,
transformer: Transformer,
) where
Transformer: (Fn(T) -> Output) + 'static,
Output: 'static,
{
let sender = sender.into();
while let Some(event) = self.recv().await {
if sender.send(transformer(event)).is_err() {
return;
}
}
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Receiver").field(&self.0).finish()
}
}