1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
mod component;
/// Cancellation mechanism used by Relm4.
pub mod shutdown;

pub use component::{AsyncComponentSender, AsyncFactorySender, ComponentSender, FactorySender};

// Copyright 2022 System76 <info@system76.com>
// SPDX-License-Identifier: MIT or Apache-2.0

use std::fmt;

use flume::r#async::RecvStream;

/// Creates an unbounded channel for sending messages
/// between different parts of your application.
///
/// Returns the sending half and the receiving half as a pair.
#[must_use]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (flume_tx, flume_rx) = flume::unbounded::<T>();
    let sender = Sender(flume_tx);
    let receiver = Receiver(flume_rx);
    (sender, receiver)
}

/// A Relm4 sender sends messages to a component or worker.
///
/// This is a newtype around a `flume` sender; the wrapped sender
/// is only accessible from within this crate.
pub struct Sender<T>(pub(crate) flume::Sender<T>);

impl<T> From<flume::Sender<T>> for Sender<T> {
    fn from(sender: flume::Sender<T>) -> Self {
        Self(sender)
    }
}

impl<T> Sender<T> {
    /// Sends a message through the channel, discarding any failure.
    ///
    /// **This method ignores errors.**
    /// If the message cannot be sent, it is dropped and
    /// only a warning is logged.
    pub fn emit(&self, message: T) {
        let delivered = self.send(message).is_ok();
        if !delivered {
            tracing::warn!("Receiver was dropped");
        }
    }

    /// Sends a message through the channel.
    ///
    /// If all receivers were dropped, [`Err`] is returned
    /// with the content of the message, so the caller gets
    /// the unsent value back.
    pub fn send(&self, message: T) -> Result<(), T> {
        match self.0.send(message) {
            Ok(()) => Ok(()),
            // Unwrap the channel error to hand the original message back.
            Err(rejected) => Err(rejected.into_inner()),
        }
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> fmt::Debug for Sender<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Sender").field(&self.0).finish()
    }
}

/// A Relm4 receiver receives messages from a component or worker.
///
/// This is a newtype around a `flume` receiver; the wrapped receiver
/// is only accessible from within this crate.
pub struct Receiver<T>(pub(crate) flume::Receiver<T>);

impl<T> Receiver<T> {
    /// Receives a message from a component or worker.
    ///
    /// Returns [`None`] if all senders have been disconnected.
    pub async fn recv(&self) -> Option<T> {
        self.0.recv_async().await.ok()
    }

    /// Receives a message synchronously from a component or worker.
    ///
    /// Returns [`None`] if all senders have been disconnected.
    #[must_use]
    pub fn recv_sync(&self) -> Option<T> {
        self.0.recv().ok()
    }

    #[must_use]
    pub(crate) fn into_stream(self) -> RecvStream<'static, T> {
        self.0.into_stream()
    }

    /// Forwards an event from one channel to another.
    pub async fn forward<Transformer, Output>(
        self,
        sender: impl Into<Sender<Output>>,
        transformer: Transformer,
    ) where
        Transformer: (Fn(T) -> Output) + 'static,
        Output: 'static,
    {
        let sender = sender.into();
        while let Some(event) = self.recv().await {
            if sender.send(transformer(event)).is_err() {
                return;
            }
        }
    }
}

impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Receiver").field(&self.0).finish()
    }
}