tokio/sync/
oneshot.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() {
26//! let (tx, rx) = oneshot::channel();
27//!
28//! tokio::spawn(async move {
29//!     if let Err(_) = tx.send(3) {
30//!         println!("the receiver dropped");
31//!     }
32//! });
33//!
34//! match rx.await {
35//!     Ok(v) => println!("got = {:?}", v),
36//!     Err(_) => println!("the sender dropped"),
37//! }
38//! # }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! # #[tokio::main(flavor = "current_thread")]
48//! # async fn main() {
49//! let (tx, rx) = oneshot::channel::<u32>();
50//!
51//! tokio::spawn(async move {
52//!     drop(tx);
53//! });
54//!
55//! match rx.await {
56//!     Ok(_) => panic!("This doesn't happen"),
57//!     Err(_) => println!("the sender dropped"),
58//! }
59//! # }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! # #[tokio::main(flavor = "current_thread")]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! # async fn main() {
73//! let (send, mut recv) = oneshot::channel();
74//! let mut interval = interval(Duration::from_millis(100));
75//!
76//! # let handle =
77//! tokio::spawn(async move {
78//!     sleep(Duration::from_secs(1)).await;
79//!     send.send("shut down").unwrap();
80//! });
81//!
82//! loop {
83//!     tokio::select! {
84//!         _ = interval.tick() => println!("Another 100ms"),
85//!         msg = &mut recv => {
86//!             println!("Got message: {}", msg.unwrap());
87//!             break;
88//!         }
89//!     }
90//! }
91//! # handle.await.unwrap();
92//! # }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//!     sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//!     fn drop(&mut self) {
106//!         if let Some(sender) = self.sender.take() {
107//!             // Using `let _ =` to ignore send errors.
108//!             let _ = sender.send("I got dropped!");
109//!         }
110//!     }
111//! }
112//!
113//! # #[tokio::main(flavor = "current_thread")]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! # async fn main() {
117//! let (send, recv) = oneshot::channel();
118//!
119//! let send_on_drop = SendOnDrop { sender: Some(send) };
120//! drop(send_on_drop);
121//!
122//! assert_eq!(recv.await, Ok("I got dropped!"));
123//! # }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// # #[tokio::main(flavor = "current_thread")]
151/// # async fn main() {
152/// let (tx, rx) = oneshot::channel();
153///
154/// tokio::spawn(async move {
155///     if let Err(_) = tx.send(3) {
156///         println!("the receiver dropped");
157///     }
158/// });
159///
160/// match rx.await {
161///     Ok(v) => println!("got = {:?}", v),
162///     Err(_) => println!("the sender dropped"),
163/// }
164/// # }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// # #[tokio::main(flavor = "current_thread")]
174/// # async fn main() {
175/// let (tx, rx) = oneshot::channel::<u32>();
176///
177/// tokio::spawn(async move {
178///     drop(tx);
179/// });
180///
181/// match rx.await {
182///     Ok(_) => panic!("This doesn't happen"),
183///     Err(_) => println!("the sender dropped"),
184/// }
185/// # }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195///     sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198///     fn drop(&mut self) {
199///         if let Some(sender) = self.sender.take() {
200///             // Using `let _ =` to ignore send errors.
201///             let _ = sender.send("I got dropped!");
202///         }
203///     }
204/// }
205///
206/// # #[tokio::main(flavor = "current_thread")]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// # async fn main() {
210/// let (send, recv) = oneshot::channel();
211///
212/// let send_on_drop = SendOnDrop { sender: Some(send) };
213/// drop(send_on_drop);
214///
215/// assert_eq!(recv.await, Ok("I got dropped!"));
216/// # }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
#[derive(Debug)]
pub struct Sender<T> {
    // Shared channel state. `send` takes this out (leaving `None`), and the
    // `Drop` impl only completes the channel while it is still `Some`.
    inner: Option<Arc<Inner<T>>>,
    // Clone of the resource span created in `channel`; used to emit
    // `runtime::resource::state_update` events for this channel.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}
227
228/// Receives a value from the associated [`Sender`].
229///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Cancellation safety
247///
248/// The `Receiver` is cancel safe. If it is used as the event in a
249/// [`tokio::select!`](crate::select) statement and some other branch
250/// completes first, it is guaranteed that no message was received on this
251/// channel.
252///
253/// # Examples
254///
255/// ```
256/// use tokio::sync::oneshot;
257///
258/// # #[tokio::main(flavor = "current_thread")]
259/// # async fn main() {
260/// let (tx, rx) = oneshot::channel();
261///
262/// tokio::spawn(async move {
263///     if let Err(_) = tx.send(3) {
264///         println!("the receiver dropped");
265///     }
266/// });
267///
268/// match rx.await {
269///     Ok(v) => println!("got = {:?}", v),
270///     Err(_) => println!("the sender dropped"),
271/// }
272/// # }
273/// ```
274///
275/// If the sender is dropped without sending, the receiver will fail with
276/// [`error::RecvError`]:
277///
278/// ```
279/// use tokio::sync::oneshot;
280///
281/// # #[tokio::main(flavor = "current_thread")]
282/// # async fn main() {
283/// let (tx, rx) = oneshot::channel::<u32>();
284///
285/// tokio::spawn(async move {
286///     drop(tx);
287/// });
288///
289/// match rx.await {
290///     Ok(_) => panic!("This doesn't happen"),
291///     Err(_) => println!("the sender dropped"),
292/// }
293/// # }
294/// ```
295///
296/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
297/// channel.
298///
299/// ```
300/// use tokio::sync::oneshot;
301/// use tokio::time::{interval, sleep, Duration};
302///
303/// # #[tokio::main(flavor = "current_thread")]
304/// # async fn _doc() {}
305/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
306/// # async fn main() {
307/// let (send, mut recv) = oneshot::channel();
308/// let mut interval = interval(Duration::from_millis(100));
309///
310/// # let handle =
311/// tokio::spawn(async move {
312///     sleep(Duration::from_secs(1)).await;
313///     send.send("shut down").unwrap();
314/// });
315///
316/// loop {
317///     tokio::select! {
318///         _ = interval.tick() => println!("Another 100ms"),
319///         msg = &mut recv => {
320///             println!("Got message: {}", msg.unwrap());
321///             break;
322///         }
323///     }
324/// }
325/// # handle.await.unwrap();
326/// # }
327/// ```
#[derive(Debug)]
pub struct Receiver<T> {
    // Shared channel state. `is_terminated` reports `true` exactly when this
    // is `None`, and `close` is a no-op in that case.
    inner: Option<Arc<Inner<T>>>,
    // Resource span created in `channel`; the `Sender` holds a clone of it.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
    // `runtime.resource.async_op` span created in `channel` with
    // `source = "Receiver::await"`.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_span: tracing::Span,
    // `runtime.resource.async_op.poll` span, created inside `async_op_span`.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_poll_span: tracing::Span,
}
338
pub mod error {
    //! Error types produced by the `oneshot` channel.

    use std::fmt;

    // ===== RecvError =====

    /// Error yielded by the `Future` implementation of `Receiver`.
    ///
    /// The receiver produces this error when the sender was dropped before
    /// sending a value.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub struct RecvError(pub(super) ());

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            fmt.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    // ===== TryRecvError =====

    /// Error yielded by the `try_recv` function on `Receiver`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum TryRecvError {
        /// The send half of the channel has not sent a value yet.
        Empty,

        /// The send half of the channel was dropped without sending a value.
        Closed,
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Pick the message first so there is a single write call.
            let message = match self {
                Self::Empty => "channel empty",
                Self::Closed => "channel closed",
            };
            fmt.write_str(message)
        }
    }

    impl std::error::Error for TryRecvError {}
}
383
384use self::error::*;
385
/// State shared by the two halves of a channel.
///
/// `channel` allocates one `Inner` behind an `Arc` and gives a reference to
/// both the `Sender` and the `Receiver`.
struct Inner<T> {
    /// Manages the state of the inner cell.
    ///
    /// Initialized from `State::new()` and read back through `State::load`.
    state: AtomicUsize,

    /// The value. This is set by `Sender` and read by `Receiver`. The state of
    /// the cell is tracked by `state`.
    ///
    /// Starts out as `None`; `Sender::send` stores `Some(value)` before it
    /// calls `complete()`.
    value: UnsafeCell<Option<T>>,

    /// The task to notify when the receiver drops without consuming the value.
    ///
    /// ## Safety
    ///
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    tx_task: Task,

    /// The task to notify when the value is sent.
    ///
    /// ## Safety
    ///
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    rx_task: Task,
}
410
411struct Task(UnsafeCell<MaybeUninit<Waker>>);
412
413impl Task {
414    /// # Safety
415    ///
416    /// The caller must do the necessary synchronization to ensure that
417    /// the [`Self::0`] contains the valid [`Waker`] during the call.
418    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
419        unsafe { self.with_task(|w| w.will_wake(cx.waker())) }
420    }
421
422    /// # Safety
423    ///
424    /// The caller must do the necessary synchronization to ensure that
425    /// the [`Self::0`] contains the valid [`Waker`] during the call.
426    unsafe fn with_task<F, R>(&self, f: F) -> R
427    where
428        F: FnOnce(&Waker) -> R,
429    {
430        self.0.with(|ptr| {
431            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432            f(unsafe { &*waker })
433        })
434    }
435
436    /// # Safety
437    ///
438    /// The caller must do the necessary synchronization to ensure that
439    /// the [`Self::0`] contains the valid [`Waker`] during the call.
440    unsafe fn drop_task(&self) {
441        self.0.with_mut(|ptr| {
442            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
443            unsafe {
444                ptr.drop_in_place();
445            }
446        });
447    }
448
449    /// # Safety
450    ///
451    /// The caller must do the necessary synchronization to ensure that
452    /// the [`Self::0`] contains the valid [`Waker`] during the call.
453    unsafe fn set_task(&self, cx: &mut Context<'_>) {
454        self.0.with_mut(|ptr| {
455            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
456            unsafe {
457                ptr.write(cx.waker().clone());
458            }
459        });
460    }
461}
462
/// Copyable snapshot of the channel's state word, i.e. the `usize` kept in
/// `Inner::state`.
///
/// NOTE(review): the individual bits (`VALUE_SENT`, `TX_TASK_SET`,
/// `RX_TASK_SET`, closed) and the accessor methods are defined outside this
/// view.
#[derive(Clone, Copy)]
struct State(usize);
465
466/// Creates a new one-shot channel for sending single values across asynchronous
467/// tasks.
468///
469/// The function returns separate "send" and "receive" handles. The `Sender`
470/// handle is used by the producer to send the value. The `Receiver` handle is
471/// used by the consumer to receive the value.
472///
473/// Each handle can be used on separate tasks.
474///
475/// # Examples
476///
477/// ```
478/// use tokio::sync::oneshot;
479///
480/// # #[tokio::main(flavor = "current_thread")]
481/// # async fn main() {
482/// let (tx, rx) = oneshot::channel();
483///
484/// tokio::spawn(async move {
485///     if let Err(_) = tx.send(3) {
486///         println!("the receiver dropped");
487///     }
488/// });
489///
490/// match rx.await {
491///     Ok(v) => println!("got = {:?}", v),
492///     Err(_) => println!("the sender dropped"),
493/// }
494/// # }
495/// ```
496#[track_caller]
497pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
498    #[cfg(all(tokio_unstable, feature = "tracing"))]
499    let resource_span = {
500        let location = std::panic::Location::caller();
501
502        let resource_span = tracing::trace_span!(
503            parent: None,
504            "runtime.resource",
505            concrete_type = "Sender|Receiver",
506            kind = "Sync",
507            loc.file = location.file(),
508            loc.line = location.line(),
509            loc.col = location.column(),
510        );
511
512        resource_span.in_scope(|| {
513            tracing::trace!(
514            target: "runtime::resource::state_update",
515            tx_dropped = false,
516            tx_dropped.op = "override",
517            )
518        });
519
520        resource_span.in_scope(|| {
521            tracing::trace!(
522            target: "runtime::resource::state_update",
523            rx_dropped = false,
524            rx_dropped.op = "override",
525            )
526        });
527
528        resource_span.in_scope(|| {
529            tracing::trace!(
530            target: "runtime::resource::state_update",
531            value_sent = false,
532            value_sent.op = "override",
533            )
534        });
535
536        resource_span.in_scope(|| {
537            tracing::trace!(
538            target: "runtime::resource::state_update",
539            value_received = false,
540            value_received.op = "override",
541            )
542        });
543
544        resource_span
545    };
546
547    let inner = Arc::new(Inner {
548        state: AtomicUsize::new(State::new().as_usize()),
549        value: UnsafeCell::new(None),
550        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
551        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
552    });
553
554    let tx = Sender {
555        inner: Some(inner.clone()),
556        #[cfg(all(tokio_unstable, feature = "tracing"))]
557        resource_span: resource_span.clone(),
558    };
559
560    #[cfg(all(tokio_unstable, feature = "tracing"))]
561    let async_op_span = resource_span
562        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
563
564    #[cfg(all(tokio_unstable, feature = "tracing"))]
565    let async_op_poll_span =
566        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
567
568    let rx = Receiver {
569        inner: Some(inner),
570        #[cfg(all(tokio_unstable, feature = "tracing"))]
571        resource_span,
572        #[cfg(all(tokio_unstable, feature = "tracing"))]
573        async_op_span,
574        #[cfg(all(tokio_unstable, feature = "tracing"))]
575        async_op_poll_span,
576    };
577
578    (tx, rx)
579}
580
581impl<T> Sender<T> {
582    /// Attempts to send a value on this channel, returning it back if it could
583    /// not be sent.
584    ///
585    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
    /// channel. It is not marked async because sending a message to a `oneshot`
    /// channel never requires any form of waiting. Because of this, the `send`
588    /// method can be used in both synchronous and asynchronous code without
589    /// problems.
590    ///
591    /// A successful send occurs when it is determined that the other end of the
592    /// channel has not hung up already. An unsuccessful send would be one where
593    /// the corresponding receiver has already been deallocated. Note that a
594    /// return value of `Err` means that the data will never be received, but
595    /// a return value of `Ok` does *not* mean that the data will be received.
596    /// It is possible for the corresponding receiver to hang up immediately
597    /// after this function returns `Ok`.
598    ///
599    /// # Examples
600    ///
601    /// Send a value to another task
602    ///
603    /// ```
604    /// use tokio::sync::oneshot;
605    ///
606    /// # #[tokio::main(flavor = "current_thread")]
607    /// # async fn main() {
608    /// let (tx, rx) = oneshot::channel();
609    ///
610    /// tokio::spawn(async move {
611    ///     if let Err(_) = tx.send(3) {
612    ///         println!("the receiver dropped");
613    ///     }
614    /// });
615    ///
616    /// match rx.await {
617    ///     Ok(v) => println!("got = {:?}", v),
618    ///     Err(_) => println!("the sender dropped"),
619    /// }
620    /// # }
621    /// ```
622    pub fn send(mut self, t: T) -> Result<(), T> {
623        let inner = self.inner.take().unwrap();
624
625        inner.value.with_mut(|ptr| unsafe {
626            // SAFETY: The receiver will not access the `UnsafeCell` unless the
627            // channel has been marked as "complete" (the `VALUE_SENT` state bit
628            // is set).
629            // That bit is only set by the sender later on in this method, and
630            // calling this method consumes `self`. Therefore, if it was possible to
631            // call this method, we know that the `VALUE_SENT` bit is unset, and
632            // the receiver is not currently accessing the `UnsafeCell`.
633            *ptr = Some(t);
634        });
635
636        if !inner.complete() {
637            unsafe {
638                // SAFETY: The receiver will not access the `UnsafeCell` unless
639                // the channel has been marked as "complete". Calling
640                // `complete()` will return true if this bit is set, and false
641                // if it is not set. Thus, if `complete()` returned false, it is
642                // safe for us to access the value, because we know that the
643                // receiver will not.
644                return Err(inner.consume_value().unwrap());
645            }
646        }
647
648        #[cfg(all(tokio_unstable, feature = "tracing"))]
649        self.resource_span.in_scope(|| {
650            tracing::trace!(
651            target: "runtime::resource::state_update",
652            value_sent = true,
653            value_sent.op = "override",
654            )
655        });
656
657        Ok(())
658    }
659
660    /// Waits for the associated [`Receiver`] handle to close.
661    ///
    /// A [`Receiver`] is closed either by calling [`close`] explicitly or when
    /// the [`Receiver`] value is dropped.
664    ///
665    /// This function is useful when paired with `select!` to abort a
666    /// computation when the receiver is no longer interested in the result.
667    ///
668    /// # Return
669    ///
670    /// Returns a `Future` which must be awaited on.
671    ///
672    /// [`Receiver`]: Receiver
673    /// [`close`]: Receiver::close
674    ///
675    /// # Examples
676    ///
677    /// Basic usage
678    ///
679    /// ```
680    /// use tokio::sync::oneshot;
681    ///
682    /// # #[tokio::main(flavor = "current_thread")]
683    /// # async fn main() {
684    /// let (mut tx, rx) = oneshot::channel::<()>();
685    ///
686    /// tokio::spawn(async move {
687    ///     drop(rx);
688    /// });
689    ///
690    /// tx.closed().await;
691    /// println!("the receiver dropped");
692    /// # }
693    /// ```
694    ///
695    /// Paired with select
696    ///
697    /// ```
698    /// use tokio::sync::oneshot;
699    /// use tokio::time::{self, Duration};
700    ///
701    /// async fn compute() -> String {
702    ///     // Complex computation returning a `String`
703    /// # "hello".to_string()
704    /// }
705    ///
706    /// # #[tokio::main(flavor = "current_thread")]
707    /// # async fn main() {
708    /// let (mut tx, rx) = oneshot::channel();
709    ///
710    /// tokio::spawn(async move {
711    ///     tokio::select! {
712    ///         _ = tx.closed() => {
713    ///             // The receiver dropped, no need to do any further work
714    ///         }
715    ///         value = compute() => {
716    ///             // The send can fail if the channel was closed at the exact same
717    ///             // time as when compute() finished, so just ignore the failure.
718    ///             let _ = tx.send(value);
719    ///         }
720    ///     }
721    /// });
722    ///
723    /// // Wait for up to 10 seconds
724    /// let _ = time::timeout(Duration::from_secs(10), rx).await;
725    /// # }
726    /// ```
727    pub async fn closed(&mut self) {
728        use std::future::poll_fn;
729
730        #[cfg(all(tokio_unstable, feature = "tracing"))]
731        let resource_span = self.resource_span.clone();
732        #[cfg(all(tokio_unstable, feature = "tracing"))]
733        let closed = trace::async_op(
734            || poll_fn(|cx| self.poll_closed(cx)),
735            resource_span,
736            "Sender::closed",
737            "poll_closed",
738            false,
739        );
740        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
741        let closed = poll_fn(|cx| self.poll_closed(cx));
742
743        closed.await;
744    }
745
    /// Returns `true` if the associated [`Receiver`] handle has been closed.
    ///
    /// A [`Receiver`] is closed either by calling [`close`] explicitly or when
    /// the [`Receiver`] value is dropped.
750    ///
751    /// If `true` is returned, a call to `send` will always result in an error.
752    ///
753    /// [`Receiver`]: Receiver
754    /// [`close`]: Receiver::close
755    ///
756    /// # Examples
757    ///
758    /// ```
759    /// use tokio::sync::oneshot;
760    ///
761    /// # #[tokio::main(flavor = "current_thread")]
762    /// # async fn main() {
763    /// let (tx, rx) = oneshot::channel();
764    ///
765    /// assert!(!tx.is_closed());
766    ///
767    /// drop(rx);
768    ///
769    /// assert!(tx.is_closed());
770    /// assert!(tx.send("never received").is_err());
771    /// # }
772    /// ```
773    pub fn is_closed(&self) -> bool {
774        let inner = self.inner.as_ref().unwrap();
775
776        let state = State::load(&inner.state, Acquire);
777        state.is_closed()
778    }
779
780    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
781    /// `Waker` in the provided `Context` to receive a notification when the channel is
782    /// closed.
783    ///
784    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
785    /// [`Receiver`] value is dropped.
786    ///
787    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
788    /// to the most recent call will be scheduled to receive a wakeup.
789    ///
790    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
791    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
792    ///
793    /// # Return value
794    ///
795    /// This function returns:
796    ///
797    ///  * `Poll::Pending` if the channel is still open.
798    ///  * `Poll::Ready(())` if the channel is closed.
799    ///
800    /// # Examples
801    ///
802    /// ```
803    /// use tokio::sync::oneshot;
804    ///
805    /// use std::future::poll_fn;
806    ///
807    /// # #[tokio::main(flavor = "current_thread")]
808    /// # async fn main() {
809    /// let (mut tx, mut rx) = oneshot::channel::<()>();
810    ///
811    /// tokio::spawn(async move {
812    ///     rx.close();
813    /// });
814    ///
815    /// poll_fn(|cx| tx.poll_closed(cx)).await;
816    ///
817    /// println!("the receiver dropped");
818    /// # }
819    /// ```
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // Returns `Pending` from this function if the trace-leaf hook is not
        // ready.
        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget
        let coop = ready!(crate::task::coop::poll_proceed(cx));

        // In the code visible here, `inner` is only taken by `send`, which
        // consumes the `Sender`, so it is expected to be present.
        let inner = self.inner.as_ref().unwrap();

        let mut state = State::load(&inner.state, Acquire);

        // Fast path: already closed, so no waker needs to be registered.
        if state.is_closed() {
            coop.made_progress();
            return Ready(());
        }

        if state.is_tx_task_set() {
            // SAFETY: `TX_TASK_SET` was observed set, and per the `tx_task`
            // field contract on `Inner` that means the slot is initialized.
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };

            // The stored waker belongs to a different task; replace it.
            if !will_notify {
                // Clear the flag before touching the slot.
                // NOTE(review): `unset_tx_task` presumably returns the state
                // with the bit cleared — the registration step below relies
                // on `is_tx_task_set()` being false afterwards. Confirm.
                state = State::unset_tx_task(&inner.state);

                if state.is_closed() {
                    // Set the flag again so that the waker is released in drop
                    State::set_tx_task(&inner.state);
                    coop.made_progress();
                    return Ready(());
                } else {
                    // SAFETY: the slot was initialized (the flag was set when
                    // we entered this branch) and the channel is not closed.
                    // NOTE(review): assumes the receiver side only reads
                    // `tx_task` when closing — confirm against `Inner::close`.
                    unsafe { inner.tx_task.drop_task() };
                }
            }
        }

        if !state.is_tx_task_set() {
            // Attempt to set the task
            // SAFETY: the flag is clear at this point, so the slot holds no
            // live waker (it was either never set or dropped just above).
            unsafe {
                inner.tx_task.set_task(cx);
            }

            // Update the state
            state = State::set_tx_task(&inner.state);

            // The channel may have been closed while the waker was being
            // registered; report readiness now instead of waiting for a wake.
            if state.is_closed() {
                coop.made_progress();
                return Ready(());
            }
        }

        Pending
    }
869}
870
871impl<T> Drop for Sender<T> {
872    fn drop(&mut self) {
873        if let Some(inner) = self.inner.as_ref() {
874            inner.complete();
875            #[cfg(all(tokio_unstable, feature = "tracing"))]
876            self.resource_span.in_scope(|| {
877                tracing::trace!(
878                target: "runtime::resource::state_update",
879                tx_dropped = true,
880                tx_dropped.op = "override",
881                )
882            });
883        }
884    }
885}
886
887impl<T> Receiver<T> {
888    /// Prevents the associated [`Sender`] handle from sending a value.
889    ///
890    /// Any `send` operation which happens after calling `close` is guaranteed
891    /// to fail. After calling `close`, [`try_recv`] should be called to
892    /// receive a value if one was sent **before** the call to `close`
893    /// completed.
894    ///
895    /// This function is useful to perform a graceful shutdown and ensure that a
896    /// value will not be sent into the channel and never received.
897    ///
    /// `close` is a no-op if a message has already been received or the
    /// channel is already closed.
900    ///
901    /// [`Sender`]: Sender
902    /// [`try_recv`]: Receiver::try_recv
903    ///
904    /// # Examples
905    ///
906    /// Prevent a value from being sent
907    ///
908    /// ```
909    /// use tokio::sync::oneshot;
910    /// use tokio::sync::oneshot::error::TryRecvError;
911    ///
912    /// # #[tokio::main(flavor = "current_thread")]
913    /// # async fn main() {
914    /// let (tx, mut rx) = oneshot::channel();
915    ///
916    /// assert!(!tx.is_closed());
917    ///
918    /// rx.close();
919    ///
920    /// assert!(tx.is_closed());
921    /// assert!(tx.send("never received").is_err());
922    ///
923    /// match rx.try_recv() {
924    ///     Err(TryRecvError::Closed) => {}
925    ///     _ => unreachable!(),
926    /// }
927    /// # }
928    /// ```
929    ///
930    /// Receive a value sent **before** calling `close`
931    ///
932    /// ```
933    /// use tokio::sync::oneshot;
934    ///
935    /// # #[tokio::main(flavor = "current_thread")]
936    /// # async fn main() {
937    /// let (tx, mut rx) = oneshot::channel();
938    ///
939    /// assert!(tx.send("will receive").is_ok());
940    ///
941    /// rx.close();
942    ///
943    /// let msg = rx.try_recv().unwrap();
944    /// assert_eq!(msg, "will receive");
945    /// # }
946    /// ```
947    pub fn close(&mut self) {
948        if let Some(inner) = self.inner.as_ref() {
949            inner.close();
950            #[cfg(all(tokio_unstable, feature = "tracing"))]
951            self.resource_span.in_scope(|| {
952                tracing::trace!(
953                target: "runtime::resource::state_update",
954                rx_dropped = true,
955                rx_dropped.op = "override",
956                )
957            });
958        }
959    }
960
961    /// Checks if this receiver is terminated.
962    ///
963    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
964    /// If so, this receiver should no longer be polled.
965    ///
966    /// # Examples
967    ///
968    /// Sending a value and polling it.
969    ///
970    /// ```
971    /// use tokio::sync::oneshot;
972    ///
973    /// use std::task::Poll;
974    ///
975    /// # #[tokio::main(flavor = "current_thread")]
976    /// # async fn main() {
977    /// let (tx, mut rx) = oneshot::channel();
978    ///
979    /// // A receiver is not terminated when it is initialized.
980    /// assert!(!rx.is_terminated());
981    ///
    /// // A receiver is not terminated when it is polled and is still pending.
983    /// let poll = futures::poll!(&mut rx);
984    /// assert_eq!(poll, Poll::Pending);
985    /// assert!(!rx.is_terminated());
986    ///
987    /// // A receiver is not terminated if a value has been sent, but not yet read.
988    /// tx.send(0).unwrap();
989    /// assert!(!rx.is_terminated());
990    ///
991    /// // A receiver *is* terminated after it has been polled and yielded a value.
992    /// assert_eq!((&mut rx).await, Ok(0));
993    /// assert!(rx.is_terminated());
994    /// # }
995    /// ```
996    ///
997    /// Dropping the sender.
998    ///
999    /// ```
1000    /// use tokio::sync::oneshot;
1001    ///
1002    /// # #[tokio::main(flavor = "current_thread")]
1003    /// # async fn main() {
1004    /// let (tx, mut rx) = oneshot::channel::<()>();
1005    ///
1006    /// // A receiver is not immediately terminated when the sender is dropped.
1007    /// drop(tx);
1008    /// assert!(!rx.is_terminated());
1009    ///
1010    /// // A receiver *is* terminated after it has been polled and yielded an error.
1011    /// let _ = (&mut rx).await.unwrap_err();
1012    /// assert!(rx.is_terminated());
1013    /// # }
1014    /// ```
    pub fn is_terminated(&self) -> bool {
        // `inner` is reset to `None` by `poll` and `try_recv` once they have
        // produced a final result, so its absence is what marks termination.
        self.inner.is_none()
    }
1018
1019    /// Checks if a channel is empty.
1020    ///
1021    /// This method returns `true` if the channel has no messages.
1022    ///
1023    /// It is not necessarily safe to poll an empty receiver, which may have
1024    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
1025    /// to check whether or not a receiver can be safely polled, instead.
1026    ///
1027    /// # Examples
1028    ///
1029    /// Sending a value.
1030    ///
1031    /// ```
1032    /// use tokio::sync::oneshot;
1033    ///
1034    /// # #[tokio::main(flavor = "current_thread")]
1035    /// # async fn main() {
1036    /// let (tx, mut rx) = oneshot::channel();
1037    /// assert!(rx.is_empty());
1038    ///
1039    /// tx.send(0).unwrap();
1040    /// assert!(!rx.is_empty());
1041    ///
1042    /// let _ = (&mut rx).await;
1043    /// assert!(rx.is_empty());
1044    /// # }
1045    /// ```
1046    ///
1047    /// Dropping the sender.
1048    ///
1049    /// ```
1050    /// use tokio::sync::oneshot;
1051    ///
1052    /// # #[tokio::main(flavor = "current_thread")]
1053    /// # async fn main() {
1054    /// let (tx, mut rx) = oneshot::channel::<()>();
1055    ///
1056    /// // A channel is empty if the sender is dropped.
1057    /// drop(tx);
1058    /// assert!(rx.is_empty());
1059    ///
1060    /// // A closed channel still yields an error, however.
1061    /// (&mut rx).await.expect_err("should yield an error");
1062    /// assert!(rx.is_empty());
1063    /// # }
1064    /// ```
1065    ///
1066    /// Terminated channels are empty.
1067    ///
1068    /// ```should_panic,ignore-wasm
1069    /// use tokio::sync::oneshot;
1070    ///
1071    /// #[tokio::main]
1072    /// async fn main() {
1073    ///     let (tx, mut rx) = oneshot::channel();
1074    ///     tx.send(0).unwrap();
1075    ///     let _ = (&mut rx).await;
1076    ///
1077    ///     // NB: an empty channel is not necessarily safe to poll!
1078    ///     assert!(rx.is_empty());
1079    ///     let _ = (&mut rx).await;
1080    /// }
1081    /// ```
1082    pub fn is_empty(&self) -> bool {
1083        let Some(inner) = self.inner.as_ref() else {
1084            // The channel has already terminated.
1085            return true;
1086        };
1087
1088        let state = State::load(&inner.state, Acquire);
1089        if state.is_complete() {
1090            // SAFETY: If `state.is_complete()` returns true, then the
1091            // `VALUE_SENT` bit has been set and the sender side of the
1092            // channel will no longer attempt to access the inner
1093            // `UnsafeCell`. Therefore, it is now safe for us to access the
1094            // cell.
1095            //
1096            // The channel is empty if it does not have a value.
1097            unsafe { !inner.has_value() }
1098        } else {
1099            // The receiver closed the channel or no value has been sent yet.
1100            true
1101        }
1102    }
1103
1104    /// Attempts to receive a value.
1105    ///
1106    /// If a pending value exists in the channel, it is returned. If no value
1107    /// has been sent, the current task **will not** be registered for
1108    /// future notification.
1109    ///
1110    /// This function is useful to call from outside the context of an
1111    /// asynchronous task.
1112    ///
1113    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1114    /// spuriously. Any send or close event that happens before this call to
1115    /// `try_recv` will be correctly returned to the caller.
1116    ///
1117    /// # Return
1118    ///
1119    /// - `Ok(T)` if a value is pending in the channel.
1120    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1121    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1122    ///   a value, or if the message has already been received.
1123    ///
1124    /// # Examples
1125    ///
1126    /// `try_recv` before a value is sent, then after.
1127    ///
1128    /// ```
1129    /// use tokio::sync::oneshot;
1130    /// use tokio::sync::oneshot::error::TryRecvError;
1131    ///
1132    /// # #[tokio::main(flavor = "current_thread")]
1133    /// # async fn main() {
1134    /// let (tx, mut rx) = oneshot::channel();
1135    ///
1136    /// match rx.try_recv() {
1137    ///     // The channel is currently empty
1138    ///     Err(TryRecvError::Empty) => {}
1139    ///     _ => unreachable!(),
1140    /// }
1141    ///
1142    /// // Send a value
1143    /// tx.send("hello").unwrap();
1144    ///
1145    /// match rx.try_recv() {
1146    ///      Ok(value) => assert_eq!(value, "hello"),
1147    ///      _ => unreachable!(),
1148    /// }
1149    /// # }
1150    /// ```
1151    ///
1152    /// `try_recv` when the sender dropped before sending a value
1153    ///
1154    /// ```
1155    /// use tokio::sync::oneshot;
1156    /// use tokio::sync::oneshot::error::TryRecvError;
1157    ///
1158    /// # #[tokio::main(flavor = "current_thread")]
1159    /// # async fn main() {
1160    /// let (tx, mut rx) = oneshot::channel::<()>();
1161    ///
1162    /// drop(tx);
1163    ///
1164    /// match rx.try_recv() {
1165    ///     // The channel will never receive a value.
1166    ///     Err(TryRecvError::Closed) => {}
1167    ///     _ => unreachable!(),
1168    /// }
1169    /// # }
1170    /// ```
1171    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1172        let result = if let Some(inner) = self.inner.as_ref() {
1173            let state = State::load(&inner.state, Acquire);
1174
1175            if state.is_complete() {
1176                // SAFETY: If `state.is_complete()` returns true, then the
1177                // `VALUE_SENT` bit has been set and the sender side of the
1178                // channel will no longer attempt to access the inner
1179                // `UnsafeCell`. Therefore, it is now safe for us to access the
1180                // cell.
1181                match unsafe { inner.consume_value() } {
1182                    Some(value) => {
1183                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1184                        self.resource_span.in_scope(|| {
1185                            tracing::trace!(
1186                            target: "runtime::resource::state_update",
1187                            value_received = true,
1188                            value_received.op = "override",
1189                            )
1190                        });
1191                        Ok(value)
1192                    }
1193                    None => Err(TryRecvError::Closed),
1194                }
1195            } else if state.is_closed() {
1196                Err(TryRecvError::Closed)
1197            } else {
1198                // Not ready, this does not clear `inner`
1199                return Err(TryRecvError::Empty);
1200            }
1201        } else {
1202            Err(TryRecvError::Closed)
1203        };
1204
1205        self.inner = None;
1206        result
1207    }
1208
1209    /// Blocking receive to call outside of asynchronous contexts.
1210    ///
1211    /// # Panics
1212    ///
1213    /// This function panics if called within an asynchronous execution
1214    /// context.
1215    ///
1216    /// # Examples
1217    ///
1218    /// ```
1219    /// # #[cfg(not(target_family = "wasm"))]
1220    /// # {
1221    /// use std::thread;
1222    /// use tokio::sync::oneshot;
1223    ///
1224    /// #[tokio::main]
1225    /// async fn main() {
1226    ///     let (tx, rx) = oneshot::channel::<u8>();
1227    ///
1228    ///     let sync_code = thread::spawn(move || {
1229    ///         assert_eq!(Ok(10), rx.blocking_recv());
1230    ///     });
1231    ///
1232    ///     let _ = tx.send(10);
1233    ///     sync_code.join().unwrap();
1234    /// }
1235    /// # }
1236    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(self) -> Result<T, RecvError> {
        // The receiver is itself a future (see its `Future` impl), so it is
        // handed by value to the crate's `block_on` helper.
        crate::future::block_on(self)
    }
1243}
1244
1245impl<T> Drop for Receiver<T> {
1246    fn drop(&mut self) {
1247        if let Some(inner) = self.inner.as_ref() {
1248            let state = inner.close();
1249
1250            if state.is_complete() {
1251                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1252                // so only the receiver can access the value.
1253                drop(unsafe { inner.consume_value() });
1254            }
1255
1256            #[cfg(all(tokio_unstable, feature = "tracing"))]
1257            self.resource_span.in_scope(|| {
1258                tracing::trace!(
1259                target: "runtime::resource::state_update",
1260                rx_dropped = true,
1261                rx_dropped.op = "override",
1262                )
1263            });
1264        }
1265    }
1266}
1267
1268impl<T> Future for Receiver<T> {
1269    type Output = Result<T, RecvError>;
1270
1271    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1272        // If `inner` is `None`, then `poll()` has already completed.
1273        #[cfg(all(tokio_unstable, feature = "tracing"))]
1274        let _res_span = self.resource_span.clone().entered();
1275        #[cfg(all(tokio_unstable, feature = "tracing"))]
1276        let _ao_span = self.async_op_span.clone().entered();
1277        #[cfg(all(tokio_unstable, feature = "tracing"))]
1278        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1279
1280        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1281            #[cfg(all(tokio_unstable, feature = "tracing"))]
1282            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1283
1284            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1285            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1286
1287            res
1288        } else {
1289            panic!("called after complete");
1290        };
1291
1292        self.inner = None;
1293        Ready(ret)
1294    }
1295}
1296
impl<T> Inner<T> {
    /// Marks the channel as holding a value and wakes the receiver if it has
    /// registered a waker.
    ///
    /// Returns `false` if the channel had already been closed, in which case
    /// `State::set_complete` leaves the `VALUE_SENT` bit unset. Returns `true`
    /// otherwise.
    fn complete(&self) -> bool {
        let prev = State::set_complete(&self.state);

        if prev.is_closed() {
            return false;
        }

        if prev.is_rx_task_set() {
            // TODO: Consume waker?
            // SAFETY: `RX_TASK_SET` was observed in the state, so `rx_task` has
            // been initialized. NOTE(review): this also relies on the receiver
            // not dropping or replacing the waker concurrently; `poll_recv`
            // re-checks `is_complete()` after clearing the flag before it
            // drops the waker. Confirm that this covers every path.
            unsafe {
                self.rx_task.with_task(Waker::wake_by_ref);
            }
        }

        true
    }

    /// Polls the channel on behalf of the receiver.
    ///
    /// Returns `Ready(Ok(value))` once a value has been sent,
    /// `Ready(Err(RecvError))` if the state is complete but the cell holds no
    /// value or if the channel is closed, and `Pending` otherwise. Before
    /// returning `Pending`, the waker from `cx` is stored in `rx_task` (unless
    /// the stored waker would already wake the same task).
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        ready!(crate::trace::trace_leaf(cx));
        // Keep track of task budget
        let coop = ready!(crate::task::coop::poll_proceed(cx));

        // Load the state
        let mut state = State::load(&self.state, Acquire);

        if state.is_complete() {
            coop.made_progress();
            // SAFETY: `VALUE_SENT` is set, so only the receiver may access the
            // value cell (see `consume_value`).
            match unsafe { self.consume_value() } {
                Some(value) => Ready(Ok(value)),
                None => Ready(Err(RecvError(()))),
            }
        } else if state.is_closed() {
            coop.made_progress();
            Ready(Err(RecvError(())))
        } else {
            if state.is_rx_task_set() {
                // SAFETY: `RX_TASK_SET` is set in the state just loaded, so
                // `rx_task` has been initialized.
                let will_notify = unsafe { self.rx_task.will_wake(cx) };

                // Check if the task is still the same
                if !will_notify {
                    // Unset the task
                    state = State::unset_rx_task(&self.state);
                    if state.is_complete() {
                        // Set the flag again so that the waker is released in drop
                        State::set_rx_task(&self.state);

                        coop.made_progress();
                        // SAFETY: If `state.is_complete()` returns true, then the
                        // `VALUE_SENT` bit has been set and the sender side of the
                        // channel will no longer attempt to access the inner
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
                        // cell.
                        return match unsafe { self.consume_value() } {
                            Some(value) => Ready(Ok(value)),
                            None => Ready(Err(RecvError(()))),
                        };
                    } else {
                        // SAFETY: the stored waker was initialized (the flag was
                        // set a moment ago). The flag was cleared while
                        // `VALUE_SENT` was still unset, so a later `complete`
                        // will observe `RX_TASK_SET` unset and leave `rx_task`
                        // alone.
                        unsafe { self.rx_task.drop_task() };
                    }
                }
            }

            // `state` now reflects any flag change made above: the flag is
            // unset either because no waker was ever stored or because the
            // stale one has just been dropped.
            if !state.is_rx_task_set() {
                // Attempt to set the task
                // SAFETY: `RX_TASK_SET` is not set, so `complete` will not read
                // `rx_task` while it is being written. NOTE(review): assumes
                // only the receiver ever writes this slot.
                unsafe {
                    self.rx_task.set_task(cx);
                }

                // Update the state
                state = State::set_rx_task(&self.state);

                // The sender may have completed between the initial load and
                // the flag update; in that case it may not have seen the
                // waker, so the value is taken here instead of waiting.
                if state.is_complete() {
                    coop.made_progress();
                    // SAFETY: `VALUE_SENT` is set, so only the receiver may
                    // access the value cell.
                    match unsafe { self.consume_value() } {
                        Some(value) => Ready(Ok(value)),
                        None => Ready(Err(RecvError(()))),
                    }
                } else {
                    Pending
                }
            } else {
                // The stored waker already wakes the current task.
                Pending
            }
        }
    }

    /// Called by `Receiver` to indicate that the value will never be received.
    ///
    /// Sets the `CLOSED` bit and returns the state from immediately before it
    /// was set. The sender's waker is woken only if one was registered and no
    /// value had been sent at that point.
    fn close(&self) -> State {
        let prev = State::set_closed(&self.state);

        if prev.is_tx_task_set() && !prev.is_complete() {
            // SAFETY: `TX_TASK_SET` was observed in the state, so `tx_task` has
            // been initialized.
            unsafe {
                self.tx_task.with_task(Waker::wake_by_ref);
            }
        }

        prev
    }

    /// Consumes the value. This function does not check `state`.
    ///
    /// The cell is left holding `None`, so a second call returns `None`.
    ///
    /// # Safety
    ///
    /// Calling this method concurrently on multiple threads will result in a
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
    /// sender *or* the receiver will call this method at a given point in time.
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
    /// if it is set, then only the receiver may call this method.
    unsafe fn consume_value(&self) -> Option<T> {
        // SAFETY: exclusive access to the cell is guaranteed by the caller, as
        // required by this function's contract.
        self.value.with_mut(|ptr| unsafe { (*ptr).take() })
    }

    /// Returns true if there is a value. This function does not check `state`.
    ///
    /// # Safety
    ///
    /// Calling this method concurrently on multiple threads will result in a
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
    /// sender *or* the receiver will call this method at a given point in time.
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
    /// if it is set, then only the receiver may call this method.
    unsafe fn has_value(&self) -> bool {
        // SAFETY: the caller guarantees that no other thread is accessing the
        // cell, as required by this function's contract.
        self.value.with(|ptr| unsafe { (*ptr).is_some() })
    }
}
1423
// SAFETY (NOTE(review): rationale inferred from this file; confirm against the
// `Inner` struct definition): access to the value cell is handed from the
// sender to the receiver through the `VALUE_SENT` bit, and the waker slots are
// only read while their `RX_TASK_SET` / `TX_TASK_SET` bit is set. The value
// itself may move between threads, hence the `T: Send` bound on both impls.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
1426
/// Reads the current value of `this` through `with_mut`.
///
/// The `&mut` receiver means no other reference to the atomic exists, so the
/// closure simply copies the integer out.
fn mut_load(this: &mut AtomicUsize) -> usize {
    this.with_mut(|v| *v)
}
1430
1431impl<T> Drop for Inner<T> {
1432    fn drop(&mut self) {
1433        let state = State(mut_load(&mut self.state));
1434
1435        if state.is_rx_task_set() {
1436            unsafe {
1437                self.rx_task.drop_task();
1438            }
1439        }
1440
1441        if state.is_tx_task_set() {
1442            unsafe {
1443                self.tx_task.drop_task();
1444            }
1445        }
1446
1447        // SAFETY: we have `&mut self`, and therefore we have
1448        // exclusive access to the value.
1449        unsafe {
1450            // Note: the assertion holds because if the value has been sent by sender,
1451            // we must ensure that the value must have been consumed by the receiver before
1452            // dropping the `Inner`.
1453            debug_assert!(self.consume_value().is_none());
1454        }
1455    }
1456}
1457
1458impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1459    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1460        use std::sync::atomic::Ordering::Relaxed;
1461
1462        fmt.debug_struct("Inner")
1463            .field("state", &State::load(&self.state, Relaxed))
1464            .finish()
1465    }
1466}
1467
/// Flag bit: the receiving task has registered a waker.
///
/// # Safety
///
/// While this bit is clear, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 1 << 0;
/// Flag bit: a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit decides which side of the channel is permitted to access the
/// `UnsafeCell`. While it is clear, the `UnsafeCell` may ONLY be accessed by
/// the sender. Once it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver.
const VALUE_SENT: usize = 1 << 1;
/// Flag bit: the channel has been closed (set by `State::set_closed`).
///
/// `State::set_complete` refuses to set `VALUE_SENT` once this bit is set.
const CLOSED: usize = 1 << 2;

/// Flag bit: the sending task has registered a waker.
///
/// # Safety
///
/// While this bit is clear, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 1 << 3;
1491
1492impl State {
1493    fn new() -> State {
1494        State(0)
1495    }
1496
1497    fn is_complete(self) -> bool {
1498        self.0 & VALUE_SENT == VALUE_SENT
1499    }
1500
1501    fn set_complete(cell: &AtomicUsize) -> State {
1502        // This method is a compare-and-swap loop rather than a fetch-or like
1503        // other `set_$WHATEVER` methods on `State`. This is because we must
1504        // check if the state has been closed before setting the `VALUE_SENT`
1505        // bit.
1506        //
1507        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1508        // bit is already set, because `VALUE_SENT` will tell the receiver that
1509        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1510        // `set_complete`, if the channel was closed, the sender will _also_
1511        // access the `UnsafeCell` to take the value back out, so if a
1512        // `poll_recv` or `try_recv` call is occurring concurrently, both
1513        // threads may try to access the `UnsafeCell` if we were to set the
1514        // `VALUE_SENT` bit on a closed channel.
1515        let mut state = cell.load(Ordering::Relaxed);
1516        loop {
1517            if State(state).is_closed() {
1518                break;
1519            }
1520            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1521            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1522            // fences yet.
1523            match cell.compare_exchange_weak(
1524                state,
1525                state | VALUE_SENT,
1526                Ordering::AcqRel,
1527                Ordering::Acquire,
1528            ) {
1529                Ok(_) => break,
1530                Err(actual) => state = actual,
1531            }
1532        }
1533        State(state)
1534    }
1535
1536    fn is_rx_task_set(self) -> bool {
1537        self.0 & RX_TASK_SET == RX_TASK_SET
1538    }
1539
1540    fn set_rx_task(cell: &AtomicUsize) -> State {
1541        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1542        State(val | RX_TASK_SET)
1543    }
1544
1545    fn unset_rx_task(cell: &AtomicUsize) -> State {
1546        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1547        State(val & !RX_TASK_SET)
1548    }
1549
1550    fn is_closed(self) -> bool {
1551        self.0 & CLOSED == CLOSED
1552    }
1553
1554    fn set_closed(cell: &AtomicUsize) -> State {
1555        // Acquire because we want all later writes (attempting to poll) to be
1556        // ordered after this.
1557        let val = cell.fetch_or(CLOSED, Acquire);
1558        State(val)
1559    }
1560
1561    fn set_tx_task(cell: &AtomicUsize) -> State {
1562        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1563        State(val | TX_TASK_SET)
1564    }
1565
1566    fn unset_tx_task(cell: &AtomicUsize) -> State {
1567        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1568        State(val & !TX_TASK_SET)
1569    }
1570
1571    fn is_tx_task_set(self) -> bool {
1572        self.0 & TX_TASK_SET == TX_TASK_SET
1573    }
1574
1575    fn as_usize(self) -> usize {
1576        self.0
1577    }
1578
1579    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1580        let val = cell.load(order);
1581        State(val)
1582    }
1583}
1584
1585impl fmt::Debug for State {
1586    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1587        fmt.debug_struct("State")
1588            .field("is_complete", &self.is_complete())
1589            .field("is_closed", &self.is_closed())
1590            .field("is_rx_task_set", &self.is_rx_task_set())
1591            .field("is_tx_task_set", &self.is_tx_task_set())
1592            .finish()
1593    }
1594}