relm4/channel/
mod.rs

1mod component;
2/// Cancellation mechanism used by Relm4.
3pub mod shutdown;
4
5pub use component::{AsyncComponentSender, AsyncFactorySender, ComponentSender, FactorySender};
6
7// Copyright 2022 System76 <info@system76.com>
8// SPDX-License-Identifier: MIT or Apache-2.0
9
10use std::fmt;
11
12use flume::r#async::RecvStream;
13
14/// Create an unbounded channel to send messages
15/// between different parts of you application.
16#[must_use]
17pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
18    let (tx, rx) = flume::unbounded();
19    (Sender(tx), Receiver(rx))
20}
21
22/// A Relm4 sender sends messages to a component or worker.
23pub struct Sender<T>(pub(crate) flume::Sender<T>);
24
25impl<T> From<flume::Sender<T>> for Sender<T> {
26    fn from(sender: flume::Sender<T>) -> Self {
27        Self(sender)
28    }
29}
30
31impl<T> Sender<T> {
32    /// Sends a message through the channel.
33    ///
34    /// **This method ignores errors.**
35    /// Only a log message will appear when sending fails.
36    pub fn emit(&self, message: T) {
37        if self.send(message).is_err() {
38            tracing::warn!("Receiver was dropped");
39        }
40    }
41
42    /// Sends a message through the channel.
43    ///
44    /// If all receivers where dropped, [`Err`] is returned
45    /// with the content of the message.
46    pub fn send(&self, message: T) -> Result<(), T> {
47        self.0.send(message).map_err(|e| e.into_inner())
48    }
49}
50
51impl<T> Clone for Sender<T> {
52    fn clone(&self) -> Self {
53        Self(self.0.clone())
54    }
55}
56
57impl<T> fmt::Debug for Sender<T> {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_tuple("Sender").field(&self.0).finish()
60    }
61}
62
63/// A Relm4 receiver receives messages from a component or worker.
64pub struct Receiver<T>(pub(crate) flume::Receiver<T>);
65
66impl<T> Receiver<T> {
67    /// Receives a message from a component or worker.
68    ///
69    /// Returns [`None`] if all senders have been disconnected.
70    pub async fn recv(&self) -> Option<T> {
71        self.0.recv_async().await.ok()
72    }
73
74    /// Receives a message synchronously from a component or worker.
75    ///
76    /// Returns [`None`] if all senders have been disconnected.
77    #[must_use]
78    pub fn recv_sync(&self) -> Option<T> {
79        self.0.recv().ok()
80    }
81
82    /// Convert this receiver into a stream that asynchronously yields
83    /// messages from the channel.
84    #[must_use]
85    pub fn into_stream(self) -> RecvStream<'static, T> {
86        self.0.into_stream()
87    }
88
89    /// Forwards an event from one channel to another.
90    pub async fn forward<Transformer, Output>(
91        self,
92        sender: impl Into<Sender<Output>>,
93        transformer: Transformer,
94    ) where
95        Transformer: (Fn(T) -> Output) + 'static,
96        Output: 'static,
97    {
98        let sender = sender.into();
99        while let Some(event) = self.recv().await {
100            if sender.send(transformer(event)).is_err() {
101                return;
102            }
103        }
104    }
105}
106
107impl<T> fmt::Debug for Receiver<T> {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_tuple("Receiver").field(&self.0).finish()
110    }
111}