relm4/component/
worker.rs

1// Copyright 2021-2022 Aaron Erhardt <aaron.erhardt@t-online.de>
2// Copyright 2022 System76 <info@system76.com>
3// SPDX-License-Identifier: MIT or Apache-2.0
4
5use gtk::glib;
6use tracing::info_span;
7
8use crate::{
9    Component, ComponentBuilder, ComponentParts, ComponentSender, GuardedReceiver, Receiver,
10    RuntimeSenders, Sender, ShutdownOnDrop, SimpleComponent,
11};
12use std::fmt::Debug;
13use std::{any, thread};
14
15/// Receives inputs and outputs in the background.
16///
17/// All types that implement [`Worker`] will also implement
18/// [`Component`] automatically.
19///
20/// If you need more flexibility when using workers, you can
21/// simply implement [`Component`] instead and set the [`Component::Widgets`]
22/// and [`Component::Root`] types both to `()`.
23/// This will still allow you to use all worker related methods because internally
24/// a worker is just seen as a [`Component`] without widgets.
25pub trait Worker: Sized + Send + 'static {
26    /// The initial parameters that will be used to build the worker state.
27    type Init: 'static + Send;
28    /// The type of inputs that this worker shall receive.
29    type Input: 'static + Send + Debug;
30    /// The type of outputs that this worker shall send.
31    type Output: 'static + Send + Debug;
32
33    /// Defines the initial state of the worker.
34    fn init(init: Self::Init, sender: ComponentSender<Self>) -> Self;
35
36    /// Defines how inputs will bep processed
37    fn update(&mut self, message: Self::Input, sender: ComponentSender<Self>);
38}
39
40impl<T> SimpleComponent for T
41where
42    T: Worker + 'static,
43{
44    type Root = ();
45    type Widgets = ();
46
47    type Init = <Self as Worker>::Init;
48    type Input = <Self as Worker>::Input;
49    type Output = <Self as Worker>::Output;
50
51    fn init_root() -> Self::Root {}
52
53    fn init(
54        init: Self::Init,
55        _root: Self::Root,
56        sender: ComponentSender<Self>,
57    ) -> ComponentParts<Self> {
58        let model = Self::init(init, sender);
59        ComponentParts { model, widgets: () }
60    }
61
62    fn update(&mut self, message: Self::Input, sender: ComponentSender<Self>) {
63        Self::update(self, message, sender);
64    }
65}
66
67impl<C> ComponentBuilder<C>
68where
69    C: Component<Root = (), Widgets = ()> + Send,
70    C::Input: Send,
71    C::Output: Send,
72    C::CommandOutput: Send,
73{
74    /// Starts a worker on a separate thread,
75    /// passing ownership to a future attached to a [gtk::glib::MainContext].
76    pub fn detach_worker(self, payload: C::Init) -> WorkerHandle<C> {
77        let Self { root, .. } = self;
78
79        // Used for all events to be processed by this component's internal service.
80        let (input_sender, input_receiver) = crate::channel::<C::Input>();
81
82        let RuntimeSenders {
83            output_sender,
84            output_receiver,
85            cmd_sender,
86            cmd_receiver,
87            shutdown_notifier,
88            shutdown_recipient,
89            shutdown_on_drop,
90            mut shutdown_event,
91        } = RuntimeSenders::<C::Output, C::CommandOutput>::new();
92
93        // Encapsulates the senders used by component methods.
94        let component_sender = ComponentSender::new(
95            input_sender.clone(),
96            output_sender.clone(),
97            cmd_sender,
98            shutdown_recipient,
99        );
100
101        let mut state = C::init(payload, root, component_sender.clone());
102
103        thread::spawn(move || {
104            let context = glib::MainContext::thread_default().unwrap_or_default();
105
106            // Spawns the component's service. It will receive both `Self::Input` and
107            // `Self::CommandOutput` messages. It will spawn commands as requested by
108            // updates, and send `Self::Output` messages externally.
109            context.block_on(async move {
110                let mut cmd = GuardedReceiver::new(cmd_receiver);
111                let mut input = GuardedReceiver::new(input_receiver);
112
113                loop {
114                    futures::select!(
115                        // Performs the model update, checking if the update requested a command.
116                        // Runs that command asynchronously in the background using tokio.
117                        message = input => {
118                            let ComponentParts {
119                                model,
120                                widgets,
121                            } = &mut state;
122
123                            let span = info_span!(
124                                "update_with_view",
125                                input=?message,
126                                component=any::type_name::<C>(),
127                                id=model.id(),
128                            );
129                            let _enter = span.enter();
130
131                            model.update_with_view(widgets, message, component_sender.clone(), &root);
132                        }
133
134                        // Handles responses from a command.
135                        message = cmd => {
136                            let ComponentParts {
137                                model,
138                                widgets,
139                            } = &mut state;
140
141                            let span = info_span!(
142                                "update_cmd_with_view",
143                                cmd_output=?message,
144                                component=any::type_name::<C>(),
145                                id=model.id(),
146                            );
147                            let _enter = span.enter();
148
149                            model.update_cmd_with_view(widgets, message, component_sender.clone(), &root);
150                        },
151
152                        // Triggered when the component is destroyed
153                        _ = shutdown_event => {
154                            let ComponentParts {
155                                model,
156                                widgets,
157                            } = &mut state;
158
159                            model.shutdown(widgets, output_sender);
160
161                            shutdown_notifier.shutdown();
162
163                            return;
164                        }
165                    );
166                }
167            });
168        });
169
170        // Give back a type for controlling the component service.
171        WorkerHandle {
172            sender: input_sender,
173            receiver: output_receiver,
174            shutdown_on_drop,
175        }
176    }
177}
178
179#[derive(Debug)]
180/// Handle to a worker task in the background
181pub struct WorkerHandle<W: Component> {
182    // Sends inputs to the worker.
183    sender: Sender<W::Input>,
184    // Where the worker will send its outputs to.
185    receiver: Receiver<W::Output>,
186    // Shutdown the worker when this is dropped
187    shutdown_on_drop: ShutdownOnDrop,
188}
189
190impl<W: Component> WorkerHandle<W>
191where
192    W::Input: 'static,
193    W::Output: 'static,
194{
195    /// Given a mutable closure, captures the receiver for handling.
196    pub fn connect_receiver<F: FnMut(&mut Sender<W::Input>, W::Output) + 'static>(
197        self,
198        mut func: F,
199    ) -> WorkerController<W> {
200        let Self {
201            sender,
202            receiver,
203            shutdown_on_drop,
204        } = self;
205
206        let mut sender_ = sender.clone();
207        crate::spawn_local(async move {
208            while let Some(event) = receiver.recv().await {
209                func(&mut sender_, event);
210            }
211        });
212
213        WorkerController {
214            sender,
215            shutdown_on_drop,
216        }
217    }
218
219    /// Forwards output events to the designated sender.
220    pub fn forward<X: 'static, F: (Fn(W::Output) -> X) + 'static>(
221        self,
222        sender: &Sender<X>,
223        transform: F,
224    ) -> WorkerController<W> {
225        let Self {
226            sender: own_sender,
227            receiver,
228            shutdown_on_drop,
229        } = self;
230
231        crate::spawn_local(receiver.forward(sender.clone(), transform));
232        WorkerController {
233            sender: own_sender,
234            shutdown_on_drop,
235        }
236    }
237
238    /// Ignore outputs from the component and finish the builder.
239    #[must_use]
240    pub fn detach(self) -> WorkerController<W> {
241        let Self {
242            sender,
243            shutdown_on_drop,
244            ..
245        } = self;
246
247        WorkerController {
248            sender,
249            shutdown_on_drop,
250        }
251    }
252}
253
254/// Sends inputs to a worker. On drop, shuts down the worker.
255#[derive(Debug)]
256pub struct WorkerController<W: Component> {
257    // Sends inputs to the worker.
258    sender: Sender<W::Input>,
259    // Shutdown the worker when this is dropped
260    shutdown_on_drop: ShutdownOnDrop,
261}
262
263impl<W: Component> WorkerController<W> {
264    /// Emits an input to the component.
265    pub fn emit(&self, event: W::Input) {
266        self.sender.send(event).unwrap();
267    }
268
269    /// Provides access to the component's sender.
270    #[must_use]
271    pub const fn sender(&self) -> &Sender<W::Input> {
272        &self.sender
273    }
274
275    /// Dropping this type will usually stop the runtime of the worker.
276    /// With this method you can give the runtime a static lifetime.
277    /// In other words, dropping the [`WorkerController`] will not stop
278    /// the runtime anymore, it will run until the app is closed.
279    pub fn detach_runtime(&mut self) {
280        self.shutdown_on_drop.deactivate();
281    }
282}