1mod component;
2pub mod shutdown;
4
5pub use component::{AsyncComponentSender, AsyncFactorySender, ComponentSender, FactorySender};
6
7use std::fmt;
11
12use flume::r#async::RecvStream;
13
14#[must_use]
17pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
18 let (tx, rx) = flume::unbounded();
19 (Sender(tx), Receiver(rx))
20}
21
22pub struct Sender<T>(pub(crate) flume::Sender<T>);
24
25impl<T> From<flume::Sender<T>> for Sender<T> {
26 fn from(sender: flume::Sender<T>) -> Self {
27 Self(sender)
28 }
29}
30
31impl<T> Sender<T> {
32 pub fn emit(&self, message: T) {
37 if self.send(message).is_err() {
38 tracing::warn!("Receiver was dropped");
39 }
40 }
41
42 pub fn send(&self, message: T) -> Result<(), T> {
47 self.0.send(message).map_err(|e| e.into_inner())
48 }
49}
50
51impl<T> Clone for Sender<T> {
52 fn clone(&self) -> Self {
53 Self(self.0.clone())
54 }
55}
56
57impl<T> fmt::Debug for Sender<T> {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_tuple("Sender").field(&self.0).finish()
60 }
61}
62
63pub struct Receiver<T>(pub(crate) flume::Receiver<T>);
65
66impl<T> Receiver<T> {
67 pub async fn recv(&self) -> Option<T> {
71 self.0.recv_async().await.ok()
72 }
73
74 #[must_use]
78 pub fn recv_sync(&self) -> Option<T> {
79 self.0.recv().ok()
80 }
81
82 #[must_use]
85 pub fn into_stream(self) -> RecvStream<'static, T> {
86 self.0.into_stream()
87 }
88
89 pub async fn forward<Transformer, Output>(
91 self,
92 sender: impl Into<Sender<Output>>,
93 transformer: Transformer,
94 ) where
95 Transformer: (Fn(T) -> Output) + 'static,
96 Output: 'static,
97 {
98 let sender = sender.into();
99 while let Some(event) = self.recv().await {
100 if sender.send(transformer(event)).is_err() {
101 return;
102 }
103 }
104 }
105}
106
107impl<T> fmt::Debug for Receiver<T> {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_tuple("Receiver").field(&self.0).finish()
110 }
111}