relm4/shared_state/
async_reducer.rs

1use std::future::Future;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5
6use crate::{RUNTIME, Sender};
7
8use super::SubscriberFn;
9
/// State that is created and updated by asynchronous functions.
///
/// An implementor supplies an async constructor ([`init`](Self::init)) and an
/// async update step ([`reduce`](Self::reduce)). See [`AsyncReducer`] for the
/// type that drives an implementation of this trait.
pub trait AsyncReducible: Send {
    /// The message type that [`reduce`](Self::reduce) consumes to change the data.
    type Input: Send;

    /// Asynchronously build the initial value of the data.
    fn init() -> impl Future<Output = Self> + Send;

    /// Asynchronously apply `input` to the data.
    ///
    /// The [`bool`] the future resolves to controls subscriber notification:
    ///
    /// - [`true`]: all subscribers are notified.
    /// - [`false`]: no subscriber is notified.
    ///
    /// Resolving to [`false`] is the right choice when the message had no
    /// (noteworthy) effect on the data, so there is nothing new for the
    /// subscribers to learn.
    fn reduce(&mut self, input: Self::Input) -> impl Future<Output = bool> + Send;
}
30
31struct AsyncReducerInner<Data: AsyncReducible> {
32    sender: Sender<Data::Input>,
33    subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>>,
34}
35
36impl<Data> Default for AsyncReducerInner<Data>
37where
38    Data: AsyncReducible + 'static,
39{
40    fn default() -> Self {
41        let (sender, receiver) = crate::channel();
42        let subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>> = Arc::default();
43
44        let rt_subscribers = subscribers.clone();
45        RUNTIME.spawn(async move {
46            let mut data = Data::init().await;
47            while let Some(input) = receiver.recv().await {
48                if data.reduce(input).await {
49                    rt_subscribers
50                        .write()
51                        .unwrap()
52                        .retain(|subscriber| subscriber(&data));
53                }
54            }
55        });
56
57        Self {
58            sender,
59            subscribers,
60        }
61    }
62}
63
64impl<Data> std::fmt::Debug for AsyncReducerInner<Data>
65where
66    Data: std::fmt::Debug + AsyncReducible,
67{
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("AsyncReducerInner")
70            .field("sender", &self.sender)
71            .field("subscribers", &self.subscribers.try_read().map(|s| s.len()))
72            .finish()
73    }
74}
75
76/// A type that allows you to share information across your
77/// application easily with async operations.
78///
79/// Async reducers receive messages, update their state asynchronously,
80/// and notify their subscribers.
81///
82/// Unlike [`SharedState`](super::SharedState), this type doesn't
83/// allow direct access to the internal data.
84/// Instead, it updates its state after receiving messages, similar to components.
85/// After the message is processed, all subscribers will be notified.
86///
87/// This is the async variant of [`Reducer`](super::Reducer), where the
88/// `reduce` method can perform asynchronous operations.
89///
90/// # Example
91///
92/// ```
93/// use relm4::{AsyncReducer, AsyncReducible};
94///
95/// struct CounterReducer(u8);
96///
97/// enum CounterInput {
98///     Increment,
99///     Decrement,
100/// }
101///
102/// impl AsyncReducible for CounterReducer {
103///     type Input = CounterInput;
104///
105///     async fn init() -> Self {
106///         Self(0)
107///     }
108///
109///     async fn reduce(&mut self, input: Self::Input) -> bool {
110///         match input {
111///             CounterInput::Increment => {
112///                 self.0 += 1;
113///             }
114///             CounterInput::Decrement =>  {
115///                 self.0 -= 1;
116///             }
117///         }
118///         true
119///     }
120/// }
121///
122/// // Create the reducer.
123/// static REDUCER: AsyncReducer<CounterReducer> = AsyncReducer::new();
124///
125/// // Update the reducer.
126/// REDUCER.emit(CounterInput::Increment);
127/// # use std::time::Duration;
128/// # std::thread::sleep(Duration::from_millis(10));
129///
130/// // Create a channel and subscribe to changes.
131/// let (sender, receiver) = relm4::channel();
132/// REDUCER.subscribe(&sender, |data| data.0);
133///
134/// // Count up to 2.
135/// REDUCER.emit(CounterInput::Increment);
136/// assert_eq!(receiver.recv_sync().unwrap(), 2);
137/// ```
138#[derive(Debug)]
139pub struct AsyncReducer<Data: AsyncReducible> {
140    inner: Lazy<AsyncReducerInner<Data>>,
141}
142
143impl<Data> Default for AsyncReducer<Data>
144where
145    Data: AsyncReducible + 'static,
146{
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl<Data> AsyncReducer<Data>
153where
154    Data: AsyncReducible + 'static,
155{
156    /// Create a new [`AsyncReducer`] variable.
157    ///
158    /// The data will be initialized lazily on the first access.
159    #[must_use]
160    pub const fn new() -> Self {
161        Self {
162            inner: Lazy::new(AsyncReducerInner::default),
163        }
164    }
165
166    /// Subscribe to an [`AsyncReducer`].
167    /// Any subscriber will be notified with a message every time
168    /// you modify the reducer (by calling [`Self::emit()`]).
169    pub fn subscribe<Msg, F>(&self, sender: &Sender<Msg>, f: F)
170    where
171        F: Fn(&Data) -> Msg + 'static + Send + Sync,
172        Msg: Send + 'static,
173    {
174        let sender = sender.clone();
175        self.inner
176            .subscribers
177            .write()
178            .unwrap()
179            .push(Box::new(move |data: &Data| {
180                let msg = f(data);
181                sender.send(msg).is_ok()
182            }));
183    }
184
185    /// An alternative version of [`subscribe()`](Self::subscribe()) that only send a message if
186    /// the closure returns [`Some`].
187    pub fn subscribe_optional<Msg, F>(&self, sender: &Sender<Msg>, f: F)
188    where
189        F: Fn(&Data) -> Option<Msg> + 'static + Send + Sync,
190        Msg: Send + 'static,
191    {
192        let sender = sender.clone();
193        self.inner
194            .subscribers
195            .write()
196            .unwrap()
197            .push(Box::new(move |data: &Data| {
198                if let Some(msg) = f(data) {
199                    sender.send(msg).is_ok()
200                } else {
201                    true
202                }
203            }));
204    }
205
206    /// Sends a message to the reducer to update its state asynchronously.
207    ///
208    /// If the [`AsyncReducible::reduce()`] method returns [`true`],
209    /// all subscribers will be notified.
210    pub fn emit(&self, input: Data::Input) {
211        assert!(
212            self.inner.sender.send(input).is_ok(),
213            "AsyncReducer runtime was dropped. Maybe a subscriber or the update function panicked?"
214        );
215    }
216}
217
#[cfg(test)]
mod test {
    use super::{AsyncReducer, AsyncReducible};

    /// Minimal reducer state: a counter held in a `u8`.
    struct CounterReducer(u8);

    /// Messages understood by [`CounterReducer`].
    enum CounterInput {
        Increment,
        Decrement,
    }

    impl AsyncReducible for CounterReducer {
        type Input = CounterInput;

        async fn init() -> Self {
            Self(0)
        }

        async fn reduce(&mut self, input: Self::Input) -> bool {
            match input {
                CounterInput::Increment => {
                    self.0 += 1;
                }
                CounterInput::Decrement => {
                    self.0 -= 1;
                }
            }
            // Every message changes the counter, so always notify.
            true
        }
    }

    static REDUCER: AsyncReducer<CounterReducer> = AsyncReducer::new();

    /// A subscriber added after earlier messages were processed must see the
    /// accumulated state on the next notification.
    #[test]
    fn shared_state() {
        // A probe subscriber replaces the former fixed-length sleep: once it
        // has received one notification per emitted message, all three
        // increments are known to have been applied, however slowly the
        // background task was scheduled.
        let (probe_sender, probe_receiver) = crate::channel();
        REDUCER.subscribe(&probe_sender, |data| data.0);

        REDUCER.emit(CounterInput::Increment);
        REDUCER.emit(CounterInput::Increment);
        REDUCER.emit(CounterInput::Increment);
        for expected in 1..=3 {
            assert_eq!(probe_receiver.recv_sync().unwrap(), expected);
        }
        // The probe is no longer needed. Presumably its sends fail from now
        // on and the reducer removes it at the next notification.
        drop(probe_receiver);

        let (sender, receiver) = crate::channel();

        REDUCER.subscribe(&sender, |data| data.0);

        REDUCER.emit(CounterInput::Increment);
        assert_eq!(receiver.recv_sync().unwrap(), 4);

        REDUCER.emit(CounterInput::Decrement);

        assert_eq!(receiver.recv_sync().unwrap(), 3);
    }
}