iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
15/// A subscription event.
16#[derive(Debug, Clone, PartialEq)]
17pub enum Event {
18    /// A user interacted with a user interface in a window.
19    Interaction {
20        /// The window holding the interface of the interaction.
21        window: window::Id,
22        /// The [`Event`] describing the interaction.
23        ///
24        /// [`Event`]: event::Event
25        event: event::Event,
26
27        /// The [`event::Status`] of the interaction.
28        status: event::Status,
29    },
30
31    /// The system theme has changed.
32    SystemThemeChanged(theme::Mode),
33}
34
35/// A stream of runtime events.
36///
37/// It is the input of a [`Subscription`].
38pub type EventStream = BoxStream<Event>;
39
40/// The hasher used for identifying subscriptions.
41pub type Hasher = rustc_hash::FxHasher;
42
43/// A request to listen to external events.
44///
45/// Besides performing async actions on demand with `Task`, most
46/// applications also need to listen to external events passively.
47///
48/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
49/// and it will generate events as long as the user keeps requesting it.
50///
51/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
52/// connection, keyboard presses, mouse events, time ticks, etc.
53///
54/// # The Lifetime of a [`Subscription`]
55/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
56/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
57/// in the `subscription` function of an `application` or a `daemon`.
58///
59/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
60/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
61/// [`Stream`] and executing it in an async runtime.
62///
63/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
64/// to build a certain [`Stream`] together with some way to _identify_ it.
65///
66/// Identification is important because when a specific [`Subscription`] stops being returned to the
67/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
68/// [`Subscription`] to keep track of it.
69///
70/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
71/// and whenever necessary.
72///
73/// ```
74/// # mod iced {
75/// #     pub mod time {
76/// #         pub use iced_futures::backend::default::time::every;
77/// #         pub use std::time::{Duration, Instant};
78/// #     }
79/// #
80/// #     pub use iced_futures::Subscription;
81/// # }
82/// use iced::time::{self, Duration, Instant};
83/// use iced::Subscription;
84///
85/// struct State {
86///     timer_enabled: bool,
87/// }
88///
89/// fn subscription(state: &State) -> Subscription<Instant> {
90///     if state.timer_enabled {
91///         time::every(Duration::from_secs(1))
92///     } else {
93///         Subscription::none()
94///     }
95/// }
96/// ```
97///
98/// [`Future`]: std::future::Future
99#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
100pub struct Subscription<T> {
101    recipes: Vec<Box<dyn Recipe<Output = T>>>,
102}
103
104impl<T> Subscription<T> {
105    /// Returns an empty [`Subscription`] that will not produce any output.
106    pub fn none() -> Self {
107        Self {
108            recipes: Vec::new(),
109        }
110    }
111
112    /// Returns a [`Subscription`] that will call the given function to create and
113    /// asynchronously run the given [`Stream`].
114    ///
115    /// # Creating an asynchronous worker with bidirectional communication
116    /// You can leverage this helper to create a [`Subscription`] that spawns
117    /// an asynchronous worker in the background and establish a channel of
118    /// communication with an `iced` application.
119    ///
120    /// You can achieve this by creating an `mpsc` channel inside the closure
121    /// and returning the `Sender` as a `Message` for the `Application`:
122    ///
123    /// ```
124    /// # mod iced {
125    /// #     pub use iced_futures::Subscription;   
126    /// #     pub use iced_futures::futures;
127    /// #     pub use iced_futures::stream;
128    /// # }
129    /// use iced::futures::channel::mpsc;
130    /// use iced::futures::sink::SinkExt;
131    /// use iced::futures::Stream;
132    /// use iced::stream;
133    /// use iced::Subscription;
134    ///
135    /// pub enum Event {
136    ///     Ready(mpsc::Sender<Input>),
137    ///     WorkFinished,
138    ///     // ...
139    /// }
140    ///
141    /// enum Input {
142    ///     DoSomeWork,
143    ///     // ...
144    /// }
145    ///
146    /// fn some_worker() -> impl Stream<Item = Event> {
147    ///     stream::channel(100, async |mut output| {
148    ///         // Create channel
149    ///         let (sender, mut receiver) = mpsc::channel(100);
150    ///
151    ///         // Send the sender back to the application
152    ///         output.send(Event::Ready(sender)).await;
153    ///
154    ///         loop {
155    ///             use iced_futures::futures::StreamExt;
156    ///
157    ///             // Read next input sent from `Application`
158    ///             let input = receiver.select_next_some().await;
159    ///
160    ///             match input {
161    ///                 Input::DoSomeWork => {
162    ///                     // Do some async work...
163    ///
164    ///                     // Finally, we can optionally produce a message to tell the
165    ///                     // `Application` the work is done
166    ///                     output.send(Event::WorkFinished).await;
167    ///                 }
168    ///             }
169    ///         }
170    ///     })
171    /// }
172    ///
173    /// fn subscription() -> Subscription<Event> {
174    ///     Subscription::run(some_worker)
175    /// }
176    /// ```
177    ///
178    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
179    /// connection open.
180    ///
181    /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.14/examples/websocket
182    pub fn run<S>(builder: fn() -> S) -> Self
183    where
184        S: Stream<Item = T> + MaybeSend + 'static,
185        T: 'static,
186    {
187        from_recipe(Runner {
188            data: builder,
189            spawn: |builder, _| builder(),
190        })
191    }
192
193    /// Returns a [`Subscription`] that will create and asynchronously run the
194    /// given [`Stream`].
195    ///
196    /// Both the `data` and the function pointer will be used to uniquely identify
197    /// the [`Subscription`].
198    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
199    where
200        D: Hash + 'static,
201        S: Stream<Item = T> + MaybeSend + 'static,
202        T: 'static,
203    {
204        from_recipe(Runner {
205            data: (data, builder),
206            spawn: |(data, builder), _| builder(data),
207        })
208    }
209
210    /// Batches all the provided subscriptions and returns the resulting
211    /// [`Subscription`].
212    pub fn batch(
213        subscriptions: impl IntoIterator<Item = Subscription<T>>,
214    ) -> Self {
215        Self {
216            recipes: subscriptions
217                .into_iter()
218                .flat_map(|subscription| subscription.recipes)
219                .collect(),
220        }
221    }
222
223    /// Adds a value to the [`Subscription`] context.
224    ///
225    /// The value will be part of the identity of a [`Subscription`].
226    pub fn with<A>(self, value: A) -> Subscription<(A, T)>
227    where
228        T: 'static,
229        A: std::hash::Hash + Clone + Send + Sync + 'static,
230    {
231        struct With<A, B> {
232            recipe: Box<dyn Recipe<Output = A>>,
233            value: B,
234        }
235
236        impl<A, B> Recipe for With<A, B>
237        where
238            A: 'static,
239            B: 'static + std::hash::Hash + Clone + Send + Sync,
240        {
241            type Output = (B, A);
242
243            fn hash(&self, state: &mut Hasher) {
244                std::any::TypeId::of::<B>().hash(state);
245                self.value.hash(state);
246                self.recipe.hash(state);
247            }
248
249            fn stream(
250                self: Box<Self>,
251                input: EventStream,
252            ) -> BoxStream<Self::Output> {
253                use futures::StreamExt;
254
255                let value = self.value;
256
257                Box::pin(
258                    self.recipe
259                        .stream(input)
260                        .map(move |element| (value.clone(), element)),
261                )
262            }
263        }
264
265        Subscription {
266            recipes: self
267                .recipes
268                .into_iter()
269                .map(|recipe| {
270                    Box::new(With {
271                        recipe,
272                        value: value.clone(),
273                    }) as Box<dyn Recipe<Output = (A, T)>>
274                })
275                .collect(),
276        }
277    }
278
279    /// Transforms the [`Subscription`] output with the given function.
280    ///
281    /// The closure provided must be a non-capturing closure.
282    pub fn map<F, A>(self, f: F) -> Subscription<A>
283    where
284        T: 'static,
285        F: Fn(T) -> A + MaybeSend + Clone + 'static,
286        A: 'static,
287    {
288        const {
289            check_zero_sized::<F>();
290        }
291
292        struct Map<A, B, F>
293        where
294            F: Fn(A) -> B + 'static,
295        {
296            recipe: Box<dyn Recipe<Output = A>>,
297            mapper: F,
298        }
299
300        impl<A, B, F> Recipe for Map<A, B, F>
301        where
302            A: 'static,
303            B: 'static,
304            F: Fn(A) -> B + 'static + MaybeSend,
305        {
306            type Output = B;
307
308            fn hash(&self, state: &mut Hasher) {
309                TypeId::of::<F>().hash(state);
310                self.recipe.hash(state);
311            }
312
313            fn stream(
314                self: Box<Self>,
315                input: EventStream,
316            ) -> BoxStream<Self::Output> {
317                use futures::StreamExt;
318
319                Box::pin(self.recipe.stream(input).map(self.mapper))
320            }
321        }
322
323        Subscription {
324            recipes: self
325                .recipes
326                .into_iter()
327                .map(|recipe| {
328                    Box::new(Map {
329                        recipe,
330                        mapper: f.clone(),
331                    }) as Box<dyn Recipe<Output = A>>
332                })
333                .collect(),
334        }
335    }
336
337    /// Transforms the [`Subscription`] output with the given function, yielding only
338    /// values only when the function returns `Some(A)`.
339    ///
340    /// The closure provided must be a non-capturing closure.
341    pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
342    where
343        T: MaybeSend + 'static,
344        F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
345        A: MaybeSend + 'static,
346    {
347        const {
348            check_zero_sized::<F>();
349        }
350
351        struct FilterMap<A, B, F>
352        where
353            F: Fn(A) -> Option<B> + 'static,
354        {
355            recipe: Box<dyn Recipe<Output = A>>,
356            mapper: F,
357        }
358
359        impl<A, B, F> Recipe for FilterMap<A, B, F>
360        where
361            A: 'static,
362            B: 'static + MaybeSend,
363            F: Fn(A) -> Option<B> + MaybeSend,
364        {
365            type Output = B;
366
367            fn hash(&self, state: &mut Hasher) {
368                TypeId::of::<F>().hash(state);
369                self.recipe.hash(state);
370            }
371
372            fn stream(
373                self: Box<Self>,
374                input: EventStream,
375            ) -> BoxStream<Self::Output> {
376                use futures::StreamExt;
377                use futures::future;
378
379                let mapper = self.mapper;
380
381                Box::pin(
382                    self.recipe
383                        .stream(input)
384                        .filter_map(move |a| future::ready(mapper(a))),
385                )
386            }
387        }
388
389        Subscription {
390            recipes: self
391                .recipes
392                .drain(..)
393                .map(|recipe| {
394                    Box::new(FilterMap {
395                        recipe,
396                        mapper: f.clone(),
397                    }) as Box<dyn Recipe<Output = A>>
398                })
399                .collect(),
400        }
401    }
402
403    /// Returns the amount of recipe units in this [`Subscription`].
404    pub fn units(&self) -> usize {
405        self.recipes.len()
406    }
407}
408
409/// Creates a [`Subscription`] from a [`Recipe`] describing it.
410pub fn from_recipe<T>(
411    recipe: impl Recipe<Output = T> + 'static,
412) -> Subscription<T> {
413    Subscription {
414        recipes: vec![Box::new(recipe)],
415    }
416}
417
418/// Returns the different recipes of the [`Subscription`].
419pub fn into_recipes<T>(
420    subscription: Subscription<T>,
421) -> Vec<Box<dyn Recipe<Output = T>>> {
422    subscription.recipes
423}
424
425impl<T> std::fmt::Debug for Subscription<T> {
426    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427        f.debug_struct("Subscription").finish()
428    }
429}
430
431/// The description of a [`Subscription`].
432///
433/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
434/// by runtimes to run and identify subscriptions. You can use it to create your
435/// own!
436pub trait Recipe {
437    /// The events that will be produced by a [`Subscription`] with this
438    /// [`Recipe`].
439    type Output;
440
441    /// Hashes the [`Recipe`].
442    ///
443    /// This is used by runtimes to uniquely identify a [`Subscription`].
444    fn hash(&self, state: &mut Hasher);
445
446    /// Executes the [`Recipe`] and produces the stream of events of its
447    /// [`Subscription`].
448    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
449}
450
451/// Creates a [`Subscription`] from a hashable id and a filter function.
452pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
453where
454    I: Hash + 'static,
455    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
456    T: 'static + MaybeSend,
457{
458    from_recipe(Runner {
459        data: id,
460        spawn: |_, events| {
461            use futures::future;
462            use futures::stream::StreamExt;
463
464            events.filter_map(move |event| future::ready(f(event)))
465        },
466    })
467}
468
469struct Runner<I, F, S, T>
470where
471    F: FnOnce(&I, EventStream) -> S,
472    S: Stream<Item = T>,
473{
474    data: I,
475    spawn: F,
476}
477
478impl<I, F, S, T> Recipe for Runner<I, F, S, T>
479where
480    I: Hash + 'static,
481    F: FnOnce(&I, EventStream) -> S,
482    S: Stream<Item = T> + MaybeSend + 'static,
483{
484    type Output = T;
485
486    fn hash(&self, state: &mut Hasher) {
487        std::any::TypeId::of::<I>().hash(state);
488        self.data.hash(state);
489    }
490
491    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
492        crate::boxed_stream((self.spawn)(&self.data, input))
493    }
494}
495
496const fn check_zero_sized<T>() {
497    if std::mem::size_of::<T>() != 0 {
498        panic!(
499            "The Subscription closure provided is not non-capturing. \
500            Closures given to Subscription::map or filter_map cannot \
501            capture external variables. If you need to capture state, \
502            consider using Subscription::with."
503        );
504    }
505}