relm4/shared_state/async_reducer.rs
1use std::future::Future;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5
6use crate::{RUNTIME, Sender};
7
8use super::SubscriberFn;
9
10/// A trait that implements an async reducer function.
11///
12/// For more information, see [`AsyncReducer`].
13pub trait AsyncReducible: Send {
14 /// The input message type used to modify the data.
15 type Input: Send;
16
17 /// Initialize the data.
18 fn init() -> impl Future<Output = Self> + Send;
19
20 /// Process the input message and update the state asynchronously.
21 ///
22 /// Return [`true`] to notify all subscribers.
23 /// Return [`false`] to ignore all subscribers.
24 ///
25 /// For example, it makes sense to return [`false`] to indicate
26 /// that the message had no (noteworthy) effect on the data and
27 /// the subscribers don't need to be notified.
28 fn reduce(&mut self, input: Self::Input) -> impl Future<Output = bool> + Send;
29}
30
31struct AsyncReducerInner<Data: AsyncReducible> {
32 sender: Sender<Data::Input>,
33 subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>>,
34}
35
36impl<Data> Default for AsyncReducerInner<Data>
37where
38 Data: AsyncReducible + 'static,
39{
40 fn default() -> Self {
41 let (sender, receiver) = crate::channel();
42 let subscribers: Arc<RwLock<Vec<SubscriberFn<Data>>>> = Arc::default();
43
44 let rt_subscribers = subscribers.clone();
45 RUNTIME.spawn(async move {
46 let mut data = Data::init().await;
47 while let Some(input) = receiver.recv().await {
48 if data.reduce(input).await {
49 rt_subscribers
50 .write()
51 .unwrap()
52 .retain(|subscriber| subscriber(&data));
53 }
54 }
55 });
56
57 Self {
58 sender,
59 subscribers,
60 }
61 }
62}
63
64impl<Data> std::fmt::Debug for AsyncReducerInner<Data>
65where
66 Data: std::fmt::Debug + AsyncReducible,
67{
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("AsyncReducerInner")
70 .field("sender", &self.sender)
71 .field("subscribers", &self.subscribers.try_read().map(|s| s.len()))
72 .finish()
73 }
74}
75
76/// A type that allows you to share information across your
77/// application easily with async operations.
78///
79/// Async reducers receive messages, update their state asynchronously,
80/// and notify their subscribers.
81///
82/// Unlike [`SharedState`](super::SharedState), this type doesn't
83/// allow direct access to the internal data.
84/// Instead, it updates its state after receiving messages, similar to components.
85/// After the message is processed, all subscribers will be notified.
86///
87/// This is the async variant of [`Reducer`](super::Reducer), where the
88/// `reduce` method can perform asynchronous operations.
89///
90/// # Example
91///
92/// ```
93/// use relm4::{AsyncReducer, AsyncReducible};
94///
95/// struct CounterReducer(u8);
96///
97/// enum CounterInput {
98/// Increment,
99/// Decrement,
100/// }
101///
102/// impl AsyncReducible for CounterReducer {
103/// type Input = CounterInput;
104///
105/// async fn init() -> Self {
106/// Self(0)
107/// }
108///
109/// async fn reduce(&mut self, input: Self::Input) -> bool {
110/// match input {
111/// CounterInput::Increment => {
112/// self.0 += 1;
113/// }
114/// CounterInput::Decrement => {
115/// self.0 -= 1;
116/// }
117/// }
118/// true
119/// }
120/// }
121///
122/// // Create the reducer.
123/// static REDUCER: AsyncReducer<CounterReducer> = AsyncReducer::new();
124///
125/// // Update the reducer.
126/// REDUCER.emit(CounterInput::Increment);
127/// # use std::time::Duration;
128/// # std::thread::sleep(Duration::from_millis(10));
129///
130/// // Create a channel and subscribe to changes.
131/// let (sender, receiver) = relm4::channel();
132/// REDUCER.subscribe(&sender, |data| data.0);
133///
134/// // Count up to 2.
135/// REDUCER.emit(CounterInput::Increment);
136/// assert_eq!(receiver.recv_sync().unwrap(), 2);
137/// ```
138#[derive(Debug)]
139pub struct AsyncReducer<Data: AsyncReducible> {
140 inner: Lazy<AsyncReducerInner<Data>>,
141}
142
143impl<Data> Default for AsyncReducer<Data>
144where
145 Data: AsyncReducible + 'static,
146{
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl<Data> AsyncReducer<Data>
153where
154 Data: AsyncReducible + 'static,
155{
156 /// Create a new [`AsyncReducer`] variable.
157 ///
158 /// The data will be initialized lazily on the first access.
159 #[must_use]
160 pub const fn new() -> Self {
161 Self {
162 inner: Lazy::new(AsyncReducerInner::default),
163 }
164 }
165
166 /// Subscribe to an [`AsyncReducer`].
167 /// Any subscriber will be notified with a message every time
168 /// you modify the reducer (by calling [`Self::emit()`]).
169 pub fn subscribe<Msg, F>(&self, sender: &Sender<Msg>, f: F)
170 where
171 F: Fn(&Data) -> Msg + 'static + Send + Sync,
172 Msg: Send + 'static,
173 {
174 let sender = sender.clone();
175 self.inner
176 .subscribers
177 .write()
178 .unwrap()
179 .push(Box::new(move |data: &Data| {
180 let msg = f(data);
181 sender.send(msg).is_ok()
182 }));
183 }
184
185 /// An alternative version of [`subscribe()`](Self::subscribe()) that only send a message if
186 /// the closure returns [`Some`].
187 pub fn subscribe_optional<Msg, F>(&self, sender: &Sender<Msg>, f: F)
188 where
189 F: Fn(&Data) -> Option<Msg> + 'static + Send + Sync,
190 Msg: Send + 'static,
191 {
192 let sender = sender.clone();
193 self.inner
194 .subscribers
195 .write()
196 .unwrap()
197 .push(Box::new(move |data: &Data| {
198 if let Some(msg) = f(data) {
199 sender.send(msg).is_ok()
200 } else {
201 true
202 }
203 }));
204 }
205
206 /// Sends a message to the reducer to update its state asynchronously.
207 ///
208 /// If the [`AsyncReducible::reduce()`] method returns [`true`],
209 /// all subscribers will be notified.
210 pub fn emit(&self, input: Data::Input) {
211 assert!(
212 self.inner.sender.send(input).is_ok(),
213 "AsyncReducer runtime was dropped. Maybe a subscriber or the update function panicked?"
214 );
215 }
216}
217
218#[cfg(test)]
219mod test {
220 use std::time::Duration;
221
222 use super::{AsyncReducer, AsyncReducible};
223
224 struct CounterReducer(u8);
225
226 enum CounterInput {
227 Increment,
228 Decrement,
229 }
230
231 impl AsyncReducible for CounterReducer {
232 type Input = CounterInput;
233
234 async fn init() -> Self {
235 Self(0)
236 }
237
238 async fn reduce(&mut self, input: Self::Input) -> bool {
239 match input {
240 CounterInput::Increment => {
241 self.0 += 1;
242 }
243 CounterInput::Decrement => {
244 self.0 -= 1;
245 }
246 }
247 true
248 }
249 }
250
251 static REDUCER: AsyncReducer<CounterReducer> = AsyncReducer::new();
252
253 #[test]
254 fn shared_state() {
255 REDUCER.emit(CounterInput::Increment);
256 REDUCER.emit(CounterInput::Increment);
257 REDUCER.emit(CounterInput::Increment);
258 std::thread::sleep(Duration::from_millis(10));
259
260 let (sender, receiver) = crate::channel();
261
262 REDUCER.subscribe(&sender, |data| data.0);
263
264 REDUCER.emit(CounterInput::Increment);
265 assert_eq!(receiver.recv_sync().unwrap(), 4);
266
267 REDUCER.emit(CounterInput::Decrement);
268
269 assert_eq!(receiver.recv_sync().unwrap(), 3);
270 }
271}