iced_futures/subscription.rs
1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
15/// A subscription event.
16#[derive(Debug, Clone, PartialEq)]
17pub enum Event {
18 /// A user interacted with a user interface in a window.
19 Interaction {
20 /// The window holding the interface of the interaction.
21 window: window::Id,
22 /// The [`Event`] describing the interaction.
23 ///
24 /// [`Event`]: event::Event
25 event: event::Event,
26
27 /// The [`event::Status`] of the interaction.
28 status: event::Status,
29 },
30
31 /// The system theme has changed.
32 SystemThemeChanged(theme::Mode),
33}
34
35/// A stream of runtime events.
36///
37/// It is the input of a [`Subscription`].
38pub type EventStream = BoxStream<Event>;
39
40/// The hasher used for identifying subscriptions.
41pub type Hasher = rustc_hash::FxHasher;
42
43/// A request to listen to external events.
44///
45/// Besides performing async actions on demand with `Task`, most
46/// applications also need to listen to external events passively.
47///
48/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
49/// and it will generate events as long as the user keeps requesting it.
50///
51/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
52/// connection, keyboard presses, mouse events, time ticks, etc.
53///
54/// # The Lifetime of a [`Subscription`]
55/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
56/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
57/// in the `subscription` function of an `application` or a `daemon`.
58///
59/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
60/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
61/// [`Stream`] and executing it in an async runtime.
62///
63/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
64/// to build a certain [`Stream`] together with some way to _identify_ it.
65///
66/// Identification is important because when a specific [`Subscription`] stops being returned to the
67/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
68/// [`Subscription`] to keep track of it.
69///
70/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
71/// and whenever necessary.
72///
73/// ```
74/// # mod iced {
75/// # pub mod time {
76/// # pub use iced_futures::backend::default::time::every;
77/// # pub use std::time::{Duration, Instant};
78/// # }
79/// #
80/// # pub use iced_futures::Subscription;
81/// # }
82/// use iced::time::{self, Duration, Instant};
83/// use iced::Subscription;
84///
85/// struct State {
86/// timer_enabled: bool,
87/// }
88///
89/// fn subscription(state: &State) -> Subscription<Instant> {
90/// if state.timer_enabled {
91/// time::every(Duration::from_secs(1))
92/// } else {
93/// Subscription::none()
94/// }
95/// }
96/// ```
97///
98/// [`Future`]: std::future::Future
99#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
100pub struct Subscription<T> {
101 recipes: Vec<Box<dyn Recipe<Output = T>>>,
102}
103
104impl<T> Subscription<T> {
105 /// Returns an empty [`Subscription`] that will not produce any output.
106 pub fn none() -> Self {
107 Self {
108 recipes: Vec::new(),
109 }
110 }
111
112 /// Returns a [`Subscription`] that will call the given function to create and
113 /// asynchronously run the given [`Stream`].
114 ///
115 /// # Creating an asynchronous worker with bidirectional communication
116 /// You can leverage this helper to create a [`Subscription`] that spawns
117 /// an asynchronous worker in the background and establish a channel of
118 /// communication with an `iced` application.
119 ///
120 /// You can achieve this by creating an `mpsc` channel inside the closure
121 /// and returning the `Sender` as a `Message` for the `Application`:
122 ///
123 /// ```
124 /// # mod iced {
125 /// # pub use iced_futures::Subscription;
126 /// # pub use iced_futures::futures;
127 /// # pub use iced_futures::stream;
128 /// # }
129 /// use iced::futures::channel::mpsc;
130 /// use iced::futures::sink::SinkExt;
131 /// use iced::futures::Stream;
132 /// use iced::stream;
133 /// use iced::Subscription;
134 ///
135 /// pub enum Event {
136 /// Ready(mpsc::Sender<Input>),
137 /// WorkFinished,
138 /// // ...
139 /// }
140 ///
141 /// enum Input {
142 /// DoSomeWork,
143 /// // ...
144 /// }
145 ///
146 /// fn some_worker() -> impl Stream<Item = Event> {
147 /// stream::channel(100, async |mut output| {
148 /// // Create channel
149 /// let (sender, mut receiver) = mpsc::channel(100);
150 ///
151 /// // Send the sender back to the application
152 /// output.send(Event::Ready(sender)).await;
153 ///
154 /// loop {
155 /// use iced_futures::futures::StreamExt;
156 ///
157 /// // Read next input sent from `Application`
158 /// let input = receiver.select_next_some().await;
159 ///
160 /// match input {
161 /// Input::DoSomeWork => {
162 /// // Do some async work...
163 ///
164 /// // Finally, we can optionally produce a message to tell the
165 /// // `Application` the work is done
166 /// output.send(Event::WorkFinished).await;
167 /// }
168 /// }
169 /// }
170 /// })
171 /// }
172 ///
173 /// fn subscription() -> Subscription<Event> {
174 /// Subscription::run(some_worker)
175 /// }
176 /// ```
177 ///
178 /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
179 /// connection open.
180 ///
181 /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.14/examples/websocket
182 pub fn run<S>(builder: fn() -> S) -> Self
183 where
184 S: Stream<Item = T> + MaybeSend + 'static,
185 T: 'static,
186 {
187 from_recipe(Runner {
188 data: builder,
189 spawn: |builder, _| builder(),
190 })
191 }
192
193 /// Returns a [`Subscription`] that will create and asynchronously run the
194 /// given [`Stream`].
195 ///
196 /// Both the `data` and the function pointer will be used to uniquely identify
197 /// the [`Subscription`].
198 pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
199 where
200 D: Hash + 'static,
201 S: Stream<Item = T> + MaybeSend + 'static,
202 T: 'static,
203 {
204 from_recipe(Runner {
205 data: (data, builder),
206 spawn: |(data, builder), _| builder(data),
207 })
208 }
209
210 /// Batches all the provided subscriptions and returns the resulting
211 /// [`Subscription`].
212 pub fn batch(
213 subscriptions: impl IntoIterator<Item = Subscription<T>>,
214 ) -> Self {
215 Self {
216 recipes: subscriptions
217 .into_iter()
218 .flat_map(|subscription| subscription.recipes)
219 .collect(),
220 }
221 }
222
223 /// Adds a value to the [`Subscription`] context.
224 ///
225 /// The value will be part of the identity of a [`Subscription`].
226 pub fn with<A>(self, value: A) -> Subscription<(A, T)>
227 where
228 T: 'static,
229 A: std::hash::Hash + Clone + Send + Sync + 'static,
230 {
231 struct With<A, B> {
232 recipe: Box<dyn Recipe<Output = A>>,
233 value: B,
234 }
235
236 impl<A, B> Recipe for With<A, B>
237 where
238 A: 'static,
239 B: 'static + std::hash::Hash + Clone + Send + Sync,
240 {
241 type Output = (B, A);
242
243 fn hash(&self, state: &mut Hasher) {
244 std::any::TypeId::of::<B>().hash(state);
245 self.value.hash(state);
246 self.recipe.hash(state);
247 }
248
249 fn stream(
250 self: Box<Self>,
251 input: EventStream,
252 ) -> BoxStream<Self::Output> {
253 use futures::StreamExt;
254
255 let value = self.value;
256
257 Box::pin(
258 self.recipe
259 .stream(input)
260 .map(move |element| (value.clone(), element)),
261 )
262 }
263 }
264
265 Subscription {
266 recipes: self
267 .recipes
268 .into_iter()
269 .map(|recipe| {
270 Box::new(With {
271 recipe,
272 value: value.clone(),
273 }) as Box<dyn Recipe<Output = (A, T)>>
274 })
275 .collect(),
276 }
277 }
278
279 /// Transforms the [`Subscription`] output with the given function.
280 ///
281 /// The closure provided must be a non-capturing closure.
282 pub fn map<F, A>(self, f: F) -> Subscription<A>
283 where
284 T: 'static,
285 F: Fn(T) -> A + MaybeSend + Clone + 'static,
286 A: 'static,
287 {
288 const {
289 check_zero_sized::<F>();
290 }
291
292 struct Map<A, B, F>
293 where
294 F: Fn(A) -> B + 'static,
295 {
296 recipe: Box<dyn Recipe<Output = A>>,
297 mapper: F,
298 }
299
300 impl<A, B, F> Recipe for Map<A, B, F>
301 where
302 A: 'static,
303 B: 'static,
304 F: Fn(A) -> B + 'static + MaybeSend,
305 {
306 type Output = B;
307
308 fn hash(&self, state: &mut Hasher) {
309 TypeId::of::<F>().hash(state);
310 self.recipe.hash(state);
311 }
312
313 fn stream(
314 self: Box<Self>,
315 input: EventStream,
316 ) -> BoxStream<Self::Output> {
317 use futures::StreamExt;
318
319 Box::pin(self.recipe.stream(input).map(self.mapper))
320 }
321 }
322
323 Subscription {
324 recipes: self
325 .recipes
326 .into_iter()
327 .map(|recipe| {
328 Box::new(Map {
329 recipe,
330 mapper: f.clone(),
331 }) as Box<dyn Recipe<Output = A>>
332 })
333 .collect(),
334 }
335 }
336
337 /// Transforms the [`Subscription`] output with the given function, yielding only
338 /// values only when the function returns `Some(A)`.
339 ///
340 /// The closure provided must be a non-capturing closure.
341 pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
342 where
343 T: MaybeSend + 'static,
344 F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
345 A: MaybeSend + 'static,
346 {
347 const {
348 check_zero_sized::<F>();
349 }
350
351 struct FilterMap<A, B, F>
352 where
353 F: Fn(A) -> Option<B> + 'static,
354 {
355 recipe: Box<dyn Recipe<Output = A>>,
356 mapper: F,
357 }
358
359 impl<A, B, F> Recipe for FilterMap<A, B, F>
360 where
361 A: 'static,
362 B: 'static + MaybeSend,
363 F: Fn(A) -> Option<B> + MaybeSend,
364 {
365 type Output = B;
366
367 fn hash(&self, state: &mut Hasher) {
368 TypeId::of::<F>().hash(state);
369 self.recipe.hash(state);
370 }
371
372 fn stream(
373 self: Box<Self>,
374 input: EventStream,
375 ) -> BoxStream<Self::Output> {
376 use futures::StreamExt;
377 use futures::future;
378
379 let mapper = self.mapper;
380
381 Box::pin(
382 self.recipe
383 .stream(input)
384 .filter_map(move |a| future::ready(mapper(a))),
385 )
386 }
387 }
388
389 Subscription {
390 recipes: self
391 .recipes
392 .drain(..)
393 .map(|recipe| {
394 Box::new(FilterMap {
395 recipe,
396 mapper: f.clone(),
397 }) as Box<dyn Recipe<Output = A>>
398 })
399 .collect(),
400 }
401 }
402
403 /// Returns the amount of recipe units in this [`Subscription`].
404 pub fn units(&self) -> usize {
405 self.recipes.len()
406 }
407}
408
409/// Creates a [`Subscription`] from a [`Recipe`] describing it.
410pub fn from_recipe<T>(
411 recipe: impl Recipe<Output = T> + 'static,
412) -> Subscription<T> {
413 Subscription {
414 recipes: vec![Box::new(recipe)],
415 }
416}
417
418/// Returns the different recipes of the [`Subscription`].
419pub fn into_recipes<T>(
420 subscription: Subscription<T>,
421) -> Vec<Box<dyn Recipe<Output = T>>> {
422 subscription.recipes
423}
424
425impl<T> std::fmt::Debug for Subscription<T> {
426 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427 f.debug_struct("Subscription").finish()
428 }
429}
430
431/// The description of a [`Subscription`].
432///
433/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
434/// by runtimes to run and identify subscriptions. You can use it to create your
435/// own!
436pub trait Recipe {
437 /// The events that will be produced by a [`Subscription`] with this
438 /// [`Recipe`].
439 type Output;
440
441 /// Hashes the [`Recipe`].
442 ///
443 /// This is used by runtimes to uniquely identify a [`Subscription`].
444 fn hash(&self, state: &mut Hasher);
445
446 /// Executes the [`Recipe`] and produces the stream of events of its
447 /// [`Subscription`].
448 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
449}
450
451/// Creates a [`Subscription`] from a hashable id and a filter function.
452pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
453where
454 I: Hash + 'static,
455 F: Fn(Event) -> Option<T> + MaybeSend + 'static,
456 T: 'static + MaybeSend,
457{
458 from_recipe(Runner {
459 data: id,
460 spawn: |_, events| {
461 use futures::future;
462 use futures::stream::StreamExt;
463
464 events.filter_map(move |event| future::ready(f(event)))
465 },
466 })
467}
468
469struct Runner<I, F, S, T>
470where
471 F: FnOnce(&I, EventStream) -> S,
472 S: Stream<Item = T>,
473{
474 data: I,
475 spawn: F,
476}
477
478impl<I, F, S, T> Recipe for Runner<I, F, S, T>
479where
480 I: Hash + 'static,
481 F: FnOnce(&I, EventStream) -> S,
482 S: Stream<Item = T> + MaybeSend + 'static,
483{
484 type Output = T;
485
486 fn hash(&self, state: &mut Hasher) {
487 std::any::TypeId::of::<I>().hash(state);
488 self.data.hash(state);
489 }
490
491 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
492 crate::boxed_stream((self.spawn)(&self.data, input))
493 }
494}
495
496const fn check_zero_sized<T>() {
497 if std::mem::size_of::<T>() != 0 {
498 panic!(
499 "The Subscription closure provided is not non-capturing. \
500 Closures given to Subscription::map or filter_map cannot \
501 capture external variables. If you need to capture state, \
502 consider using Subscription::with."
503 );
504 }
505}