tokio/sync/oneshot.rs
1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() {
26//! let (tx, rx) = oneshot::channel();
27//!
28//! tokio::spawn(async move {
29//! if let Err(_) = tx.send(3) {
30//! println!("the receiver dropped");
31//! }
32//! });
33//!
34//! match rx.await {
35//! Ok(v) => println!("got = {:?}", v),
36//! Err(_) => println!("the sender dropped"),
37//! }
38//! # }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! # #[tokio::main(flavor = "current_thread")]
48//! # async fn main() {
49//! let (tx, rx) = oneshot::channel::<u32>();
50//!
51//! tokio::spawn(async move {
52//! drop(tx);
53//! });
54//!
55//! match rx.await {
56//! Ok(_) => panic!("This doesn't happen"),
57//! Err(_) => println!("the sender dropped"),
58//! }
59//! # }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! # #[tokio::main(flavor = "current_thread")]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! # async fn main() {
73//! let (send, mut recv) = oneshot::channel();
74//! let mut interval = interval(Duration::from_millis(100));
75//!
76//! # let handle =
77//! tokio::spawn(async move {
78//! sleep(Duration::from_secs(1)).await;
79//! send.send("shut down").unwrap();
80//! });
81//!
82//! loop {
83//! tokio::select! {
84//! _ = interval.tick() => println!("Another 100ms"),
85//! msg = &mut recv => {
86//! println!("Got message: {}", msg.unwrap());
87//! break;
88//! }
89//! }
90//! }
91//! # handle.await.unwrap();
92//! # }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//! sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//! fn drop(&mut self) {
106//! if let Some(sender) = self.sender.take() {
107//! // Using `let _ =` to ignore send errors.
108//! let _ = sender.send("I got dropped!");
109//! }
110//! }
111//! }
112//!
113//! # #[tokio::main(flavor = "current_thread")]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! # async fn main() {
117//! let (send, recv) = oneshot::channel();
118//!
119//! let send_on_drop = SendOnDrop { sender: Some(send) };
120//! drop(send_on_drop);
121//!
122//! assert_eq!(recv.await, Ok("I got dropped!"));
123//! # }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
142/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
143/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// # #[tokio::main(flavor = "current_thread")]
151/// # async fn main() {
152/// let (tx, rx) = oneshot::channel();
153///
154/// tokio::spawn(async move {
155/// if let Err(_) = tx.send(3) {
156/// println!("the receiver dropped");
157/// }
158/// });
159///
160/// match rx.await {
161/// Ok(v) => println!("got = {:?}", v),
162/// Err(_) => println!("the sender dropped"),
163/// }
164/// # }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// # #[tokio::main(flavor = "current_thread")]
174/// # async fn main() {
175/// let (tx, rx) = oneshot::channel::<u32>();
176///
177/// tokio::spawn(async move {
178/// drop(tx);
179/// });
180///
181/// match rx.await {
182/// Ok(_) => panic!("This doesn't happen"),
183/// Err(_) => println!("the sender dropped"),
184/// }
185/// # }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195/// sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198/// fn drop(&mut self) {
199/// if let Some(sender) = self.sender.take() {
200/// // Using `let _ =` to ignore send errors.
201/// let _ = sender.send("I got dropped!");
202/// }
203/// }
204/// }
205///
206/// # #[tokio::main(flavor = "current_thread")]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// # async fn main() {
210/// let (send, recv) = oneshot::channel();
211///
212/// let send_on_drop = SendOnDrop { sender: Some(send) };
213/// drop(send_on_drop);
214///
215/// assert_eq!(recv.await, Ok("I got dropped!"));
216/// # }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
221#[derive(Debug)]
222pub struct Sender<T> {
223 inner: Option<Arc<Inner<T>>>,
224 #[cfg(all(tokio_unstable, feature = "tracing"))]
225 resource_span: tracing::Span,
226}
227
228/// Receives a value from the associated [`Sender`].
229///
230/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
231/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Cancellation safety
247///
248/// The `Receiver` is cancel safe. If it is used as the event in a
249/// [`tokio::select!`](crate::select) statement and some other branch
250/// completes first, it is guaranteed that no message was received on this
251/// channel.
252///
253/// # Examples
254///
255/// ```
256/// use tokio::sync::oneshot;
257///
258/// # #[tokio::main(flavor = "current_thread")]
259/// # async fn main() {
260/// let (tx, rx) = oneshot::channel();
261///
262/// tokio::spawn(async move {
263/// if let Err(_) = tx.send(3) {
264/// println!("the receiver dropped");
265/// }
266/// });
267///
268/// match rx.await {
269/// Ok(v) => println!("got = {:?}", v),
270/// Err(_) => println!("the sender dropped"),
271/// }
272/// # }
273/// ```
274///
275/// If the sender is dropped without sending, the receiver will fail with
276/// [`error::RecvError`]:
277///
278/// ```
279/// use tokio::sync::oneshot;
280///
281/// # #[tokio::main(flavor = "current_thread")]
282/// # async fn main() {
283/// let (tx, rx) = oneshot::channel::<u32>();
284///
285/// tokio::spawn(async move {
286/// drop(tx);
287/// });
288///
289/// match rx.await {
290/// Ok(_) => panic!("This doesn't happen"),
291/// Err(_) => println!("the sender dropped"),
292/// }
293/// # }
294/// ```
295///
296/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
297/// channel.
298///
299/// ```
300/// use tokio::sync::oneshot;
301/// use tokio::time::{interval, sleep, Duration};
302///
303/// # #[tokio::main(flavor = "current_thread")]
304/// # async fn _doc() {}
305/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
306/// # async fn main() {
307/// let (send, mut recv) = oneshot::channel();
308/// let mut interval = interval(Duration::from_millis(100));
309///
310/// # let handle =
311/// tokio::spawn(async move {
312/// sleep(Duration::from_secs(1)).await;
313/// send.send("shut down").unwrap();
314/// });
315///
316/// loop {
317/// tokio::select! {
318/// _ = interval.tick() => println!("Another 100ms"),
319/// msg = &mut recv => {
320/// println!("Got message: {}", msg.unwrap());
321/// break;
322/// }
323/// }
324/// }
325/// # handle.await.unwrap();
326/// # }
327/// ```
328#[derive(Debug)]
329pub struct Receiver<T> {
330 inner: Option<Arc<Inner<T>>>,
331 #[cfg(all(tokio_unstable, feature = "tracing"))]
332 resource_span: tracing::Span,
333 #[cfg(all(tokio_unstable, feature = "tracing"))]
334 async_op_span: tracing::Span,
335 #[cfg(all(tokio_unstable, feature = "tracing"))]
336 async_op_poll_span: tracing::Span,
337}
338
339pub mod error {
340 //! `Oneshot` error types.
341
342 use std::fmt;
343
344 /// Error returned by the `Future` implementation for `Receiver`.
345 ///
346 /// This error is returned by the receiver when the sender is dropped without sending.
347 #[derive(Debug, Eq, PartialEq, Clone)]
348 pub struct RecvError(pub(super) ());
349
350 /// Error returned by the `try_recv` function on `Receiver`.
351 #[derive(Debug, Eq, PartialEq, Clone)]
352 pub enum TryRecvError {
353 /// The send half of the channel has not yet sent a value.
354 Empty,
355
356 /// The send half of the channel was dropped without sending a value.
357 Closed,
358 }
359
360 // ===== impl RecvError =====
361
362 impl fmt::Display for RecvError {
363 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
364 write!(fmt, "channel closed")
365 }
366 }
367
368 impl std::error::Error for RecvError {}
369
370 // ===== impl TryRecvError =====
371
372 impl fmt::Display for TryRecvError {
373 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
374 match self {
375 TryRecvError::Empty => write!(fmt, "channel empty"),
376 TryRecvError::Closed => write!(fmt, "channel closed"),
377 }
378 }
379 }
380
381 impl std::error::Error for TryRecvError {}
382}
383
384use self::error::*;
385
386struct Inner<T> {
387 /// Manages the state of the inner cell.
388 state: AtomicUsize,
389
390 /// The value. This is set by `Sender` and read by `Receiver`. The state of
391 /// the cell is tracked by `state`.
392 value: UnsafeCell<Option<T>>,
393
394 /// The task to notify when the receiver drops without consuming the value.
395 ///
396 /// ## Safety
397 ///
398 /// The `TX_TASK_SET` bit in the `state` field is set if this field is
399 /// initialized. If that bit is unset, this field may be uninitialized.
400 tx_task: Task,
401
402 /// The task to notify when the value is sent.
403 ///
404 /// ## Safety
405 ///
406 /// The `RX_TASK_SET` bit in the `state` field is set if this field is
407 /// initialized. If that bit is unset, this field may be uninitialized.
408 rx_task: Task,
409}
410
411struct Task(UnsafeCell<MaybeUninit<Waker>>);
412
413impl Task {
414 /// # Safety
415 ///
416 /// The caller must do the necessary synchronization to ensure that
417 /// the [`Self::0`] contains the valid [`Waker`] during the call.
418 unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
419 unsafe { self.with_task(|w| w.will_wake(cx.waker())) }
420 }
421
422 /// # Safety
423 ///
424 /// The caller must do the necessary synchronization to ensure that
425 /// the [`Self::0`] contains the valid [`Waker`] during the call.
426 unsafe fn with_task<F, R>(&self, f: F) -> R
427 where
428 F: FnOnce(&Waker) -> R,
429 {
430 self.0.with(|ptr| {
431 let waker: *const Waker = unsafe { (*ptr).as_ptr() };
432 f(unsafe { &*waker })
433 })
434 }
435
436 /// # Safety
437 ///
438 /// The caller must do the necessary synchronization to ensure that
439 /// the [`Self::0`] contains the valid [`Waker`] during the call.
440 unsafe fn drop_task(&self) {
441 self.0.with_mut(|ptr| {
442 let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
443 unsafe {
444 ptr.drop_in_place();
445 }
446 });
447 }
448
449 /// # Safety
450 ///
451 /// The caller must do the necessary synchronization to ensure that
452 /// the [`Self::0`] contains the valid [`Waker`] during the call.
453 unsafe fn set_task(&self, cx: &mut Context<'_>) {
454 self.0.with_mut(|ptr| {
455 let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
456 unsafe {
457 ptr.write(cx.waker().clone());
458 }
459 });
460 }
461}
462
463#[derive(Clone, Copy)]
464struct State(usize);
465
466/// Creates a new one-shot channel for sending single values across asynchronous
467/// tasks.
468///
469/// The function returns separate "send" and "receive" handles. The `Sender`
470/// handle is used by the producer to send the value. The `Receiver` handle is
471/// used by the consumer to receive the value.
472///
473/// Each handle can be used on separate tasks.
474///
475/// # Examples
476///
477/// ```
478/// use tokio::sync::oneshot;
479///
480/// # #[tokio::main(flavor = "current_thread")]
481/// # async fn main() {
482/// let (tx, rx) = oneshot::channel();
483///
484/// tokio::spawn(async move {
485/// if let Err(_) = tx.send(3) {
486/// println!("the receiver dropped");
487/// }
488/// });
489///
490/// match rx.await {
491/// Ok(v) => println!("got = {:?}", v),
492/// Err(_) => println!("the sender dropped"),
493/// }
494/// # }
495/// ```
496#[track_caller]
497pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
498 #[cfg(all(tokio_unstable, feature = "tracing"))]
499 let resource_span = {
500 let location = std::panic::Location::caller();
501
502 let resource_span = tracing::trace_span!(
503 parent: None,
504 "runtime.resource",
505 concrete_type = "Sender|Receiver",
506 kind = "Sync",
507 loc.file = location.file(),
508 loc.line = location.line(),
509 loc.col = location.column(),
510 );
511
512 resource_span.in_scope(|| {
513 tracing::trace!(
514 target: "runtime::resource::state_update",
515 tx_dropped = false,
516 tx_dropped.op = "override",
517 )
518 });
519
520 resource_span.in_scope(|| {
521 tracing::trace!(
522 target: "runtime::resource::state_update",
523 rx_dropped = false,
524 rx_dropped.op = "override",
525 )
526 });
527
528 resource_span.in_scope(|| {
529 tracing::trace!(
530 target: "runtime::resource::state_update",
531 value_sent = false,
532 value_sent.op = "override",
533 )
534 });
535
536 resource_span.in_scope(|| {
537 tracing::trace!(
538 target: "runtime::resource::state_update",
539 value_received = false,
540 value_received.op = "override",
541 )
542 });
543
544 resource_span
545 };
546
547 let inner = Arc::new(Inner {
548 state: AtomicUsize::new(State::new().as_usize()),
549 value: UnsafeCell::new(None),
550 tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
551 rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
552 });
553
554 let tx = Sender {
555 inner: Some(inner.clone()),
556 #[cfg(all(tokio_unstable, feature = "tracing"))]
557 resource_span: resource_span.clone(),
558 };
559
560 #[cfg(all(tokio_unstable, feature = "tracing"))]
561 let async_op_span = resource_span
562 .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
563
564 #[cfg(all(tokio_unstable, feature = "tracing"))]
565 let async_op_poll_span =
566 async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
567
568 let rx = Receiver {
569 inner: Some(inner),
570 #[cfg(all(tokio_unstable, feature = "tracing"))]
571 resource_span,
572 #[cfg(all(tokio_unstable, feature = "tracing"))]
573 async_op_span,
574 #[cfg(all(tokio_unstable, feature = "tracing"))]
575 async_op_poll_span,
576 };
577
578 (tx, rx)
579}
580
581impl<T> Sender<T> {
582 /// Attempts to send a value on this channel, returning it back if it could
583 /// not be sent.
584 ///
585 /// This method consumes `self` as only one value may ever be sent on a `oneshot`
586 /// channel. It is not marked async because sending a message to an `oneshot`
587 /// channel never requires any form of waiting. Because of this, the `send`
588 /// method can be used in both synchronous and asynchronous code without
589 /// problems.
590 ///
591 /// A successful send occurs when it is determined that the other end of the
592 /// channel has not hung up already. An unsuccessful send would be one where
593 /// the corresponding receiver has already been deallocated. Note that a
594 /// return value of `Err` means that the data will never be received, but
595 /// a return value of `Ok` does *not* mean that the data will be received.
596 /// It is possible for the corresponding receiver to hang up immediately
597 /// after this function returns `Ok`.
598 ///
599 /// # Examples
600 ///
601 /// Send a value to another task
602 ///
603 /// ```
604 /// use tokio::sync::oneshot;
605 ///
606 /// # #[tokio::main(flavor = "current_thread")]
607 /// # async fn main() {
608 /// let (tx, rx) = oneshot::channel();
609 ///
610 /// tokio::spawn(async move {
611 /// if let Err(_) = tx.send(3) {
612 /// println!("the receiver dropped");
613 /// }
614 /// });
615 ///
616 /// match rx.await {
617 /// Ok(v) => println!("got = {:?}", v),
618 /// Err(_) => println!("the sender dropped"),
619 /// }
620 /// # }
621 /// ```
622 pub fn send(mut self, t: T) -> Result<(), T> {
623 let inner = self.inner.take().unwrap();
624
625 inner.value.with_mut(|ptr| unsafe {
626 // SAFETY: The receiver will not access the `UnsafeCell` unless the
627 // channel has been marked as "complete" (the `VALUE_SENT` state bit
628 // is set).
629 // That bit is only set by the sender later on in this method, and
630 // calling this method consumes `self`. Therefore, if it was possible to
631 // call this method, we know that the `VALUE_SENT` bit is unset, and
632 // the receiver is not currently accessing the `UnsafeCell`.
633 *ptr = Some(t);
634 });
635
636 if !inner.complete() {
637 unsafe {
638 // SAFETY: The receiver will not access the `UnsafeCell` unless
639 // the channel has been marked as "complete". Calling
640 // `complete()` will return true if this bit is set, and false
641 // if it is not set. Thus, if `complete()` returned false, it is
642 // safe for us to access the value, because we know that the
643 // receiver will not.
644 return Err(inner.consume_value().unwrap());
645 }
646 }
647
648 #[cfg(all(tokio_unstable, feature = "tracing"))]
649 self.resource_span.in_scope(|| {
650 tracing::trace!(
651 target: "runtime::resource::state_update",
652 value_sent = true,
653 value_sent.op = "override",
654 )
655 });
656
657 Ok(())
658 }
659
660 /// Waits for the associated [`Receiver`] handle to close.
661 ///
662 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
663 /// [`Receiver`] value is dropped.
664 ///
665 /// This function is useful when paired with `select!` to abort a
666 /// computation when the receiver is no longer interested in the result.
667 ///
668 /// # Return
669 ///
670 /// Returns a `Future` which must be awaited on.
671 ///
672 /// [`Receiver`]: Receiver
673 /// [`close`]: Receiver::close
674 ///
675 /// # Examples
676 ///
677 /// Basic usage
678 ///
679 /// ```
680 /// use tokio::sync::oneshot;
681 ///
682 /// # #[tokio::main(flavor = "current_thread")]
683 /// # async fn main() {
684 /// let (mut tx, rx) = oneshot::channel::<()>();
685 ///
686 /// tokio::spawn(async move {
687 /// drop(rx);
688 /// });
689 ///
690 /// tx.closed().await;
691 /// println!("the receiver dropped");
692 /// # }
693 /// ```
694 ///
695 /// Paired with select
696 ///
697 /// ```
698 /// use tokio::sync::oneshot;
699 /// use tokio::time::{self, Duration};
700 ///
701 /// async fn compute() -> String {
702 /// // Complex computation returning a `String`
703 /// # "hello".to_string()
704 /// }
705 ///
706 /// # #[tokio::main(flavor = "current_thread")]
707 /// # async fn main() {
708 /// let (mut tx, rx) = oneshot::channel();
709 ///
710 /// tokio::spawn(async move {
711 /// tokio::select! {
712 /// _ = tx.closed() => {
713 /// // The receiver dropped, no need to do any further work
714 /// }
715 /// value = compute() => {
716 /// // The send can fail if the channel was closed at the exact same
717 /// // time as when compute() finished, so just ignore the failure.
718 /// let _ = tx.send(value);
719 /// }
720 /// }
721 /// });
722 ///
723 /// // Wait for up to 10 seconds
724 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
725 /// # }
726 /// ```
727 pub async fn closed(&mut self) {
728 use std::future::poll_fn;
729
730 #[cfg(all(tokio_unstable, feature = "tracing"))]
731 let resource_span = self.resource_span.clone();
732 #[cfg(all(tokio_unstable, feature = "tracing"))]
733 let closed = trace::async_op(
734 || poll_fn(|cx| self.poll_closed(cx)),
735 resource_span,
736 "Sender::closed",
737 "poll_closed",
738 false,
739 );
740 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
741 let closed = poll_fn(|cx| self.poll_closed(cx));
742
743 closed.await;
744 }
745
746 /// Returns `true` if the associated [`Receiver`] handle has been dropped.
747 ///
748 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
749 /// [`Receiver`] value is dropped.
750 ///
751 /// If `true` is returned, a call to `send` will always result in an error.
752 ///
753 /// [`Receiver`]: Receiver
754 /// [`close`]: Receiver::close
755 ///
756 /// # Examples
757 ///
758 /// ```
759 /// use tokio::sync::oneshot;
760 ///
761 /// # #[tokio::main(flavor = "current_thread")]
762 /// # async fn main() {
763 /// let (tx, rx) = oneshot::channel();
764 ///
765 /// assert!(!tx.is_closed());
766 ///
767 /// drop(rx);
768 ///
769 /// assert!(tx.is_closed());
770 /// assert!(tx.send("never received").is_err());
771 /// # }
772 /// ```
773 pub fn is_closed(&self) -> bool {
774 let inner = self.inner.as_ref().unwrap();
775
776 let state = State::load(&inner.state, Acquire);
777 state.is_closed()
778 }
779
780 /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
781 /// `Waker` in the provided `Context` to receive a notification when the channel is
782 /// closed.
783 ///
784 /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
785 /// [`Receiver`] value is dropped.
786 ///
787 /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
788 /// to the most recent call will be scheduled to receive a wakeup.
789 ///
790 /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
791 /// [`close`]: fn@crate::sync::oneshot::Receiver::close
792 ///
793 /// # Return value
794 ///
795 /// This function returns:
796 ///
797 /// * `Poll::Pending` if the channel is still open.
798 /// * `Poll::Ready(())` if the channel is closed.
799 ///
800 /// # Examples
801 ///
802 /// ```
803 /// use tokio::sync::oneshot;
804 ///
805 /// use std::future::poll_fn;
806 ///
807 /// # #[tokio::main(flavor = "current_thread")]
808 /// # async fn main() {
809 /// let (mut tx, mut rx) = oneshot::channel::<()>();
810 ///
811 /// tokio::spawn(async move {
812 /// rx.close();
813 /// });
814 ///
815 /// poll_fn(|cx| tx.poll_closed(cx)).await;
816 ///
817 /// println!("the receiver dropped");
818 /// # }
819 /// ```
820 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
821 ready!(crate::trace::trace_leaf(cx));
822
823 // Keep track of task budget
824 let coop = ready!(crate::task::coop::poll_proceed(cx));
825
826 let inner = self.inner.as_ref().unwrap();
827
828 let mut state = State::load(&inner.state, Acquire);
829
830 if state.is_closed() {
831 coop.made_progress();
832 return Ready(());
833 }
834
835 if state.is_tx_task_set() {
836 let will_notify = unsafe { inner.tx_task.will_wake(cx) };
837
838 if !will_notify {
839 state = State::unset_tx_task(&inner.state);
840
841 if state.is_closed() {
842 // Set the flag again so that the waker is released in drop
843 State::set_tx_task(&inner.state);
844 coop.made_progress();
845 return Ready(());
846 } else {
847 unsafe { inner.tx_task.drop_task() };
848 }
849 }
850 }
851
852 if !state.is_tx_task_set() {
853 // Attempt to set the task
854 unsafe {
855 inner.tx_task.set_task(cx);
856 }
857
858 // Update the state
859 state = State::set_tx_task(&inner.state);
860
861 if state.is_closed() {
862 coop.made_progress();
863 return Ready(());
864 }
865 }
866
867 Pending
868 }
869}
870
871impl<T> Drop for Sender<T> {
872 fn drop(&mut self) {
873 if let Some(inner) = self.inner.as_ref() {
874 inner.complete();
875 #[cfg(all(tokio_unstable, feature = "tracing"))]
876 self.resource_span.in_scope(|| {
877 tracing::trace!(
878 target: "runtime::resource::state_update",
879 tx_dropped = true,
880 tx_dropped.op = "override",
881 )
882 });
883 }
884 }
885}
886
887impl<T> Receiver<T> {
888 /// Prevents the associated [`Sender`] handle from sending a value.
889 ///
890 /// Any `send` operation which happens after calling `close` is guaranteed
891 /// to fail. After calling `close`, [`try_recv`] should be called to
892 /// receive a value if one was sent **before** the call to `close`
893 /// completed.
894 ///
895 /// This function is useful to perform a graceful shutdown and ensure that a
896 /// value will not be sent into the channel and never received.
897 ///
898 /// `close` is no-op if a message is already received or the channel
899 /// is already closed.
900 ///
901 /// [`Sender`]: Sender
902 /// [`try_recv`]: Receiver::try_recv
903 ///
904 /// # Examples
905 ///
906 /// Prevent a value from being sent
907 ///
908 /// ```
909 /// use tokio::sync::oneshot;
910 /// use tokio::sync::oneshot::error::TryRecvError;
911 ///
912 /// # #[tokio::main(flavor = "current_thread")]
913 /// # async fn main() {
914 /// let (tx, mut rx) = oneshot::channel();
915 ///
916 /// assert!(!tx.is_closed());
917 ///
918 /// rx.close();
919 ///
920 /// assert!(tx.is_closed());
921 /// assert!(tx.send("never received").is_err());
922 ///
923 /// match rx.try_recv() {
924 /// Err(TryRecvError::Closed) => {}
925 /// _ => unreachable!(),
926 /// }
927 /// # }
928 /// ```
929 ///
930 /// Receive a value sent **before** calling `close`
931 ///
932 /// ```
933 /// use tokio::sync::oneshot;
934 ///
935 /// # #[tokio::main(flavor = "current_thread")]
936 /// # async fn main() {
937 /// let (tx, mut rx) = oneshot::channel();
938 ///
939 /// assert!(tx.send("will receive").is_ok());
940 ///
941 /// rx.close();
942 ///
943 /// let msg = rx.try_recv().unwrap();
944 /// assert_eq!(msg, "will receive");
945 /// # }
946 /// ```
947 pub fn close(&mut self) {
948 if let Some(inner) = self.inner.as_ref() {
949 inner.close();
950 #[cfg(all(tokio_unstable, feature = "tracing"))]
951 self.resource_span.in_scope(|| {
952 tracing::trace!(
953 target: "runtime::resource::state_update",
954 rx_dropped = true,
955 rx_dropped.op = "override",
956 )
957 });
958 }
959 }
960
961 /// Checks if this receiver is terminated.
962 ///
963 /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
964 /// If so, this receiver should no longer be polled.
965 ///
966 /// # Examples
967 ///
968 /// Sending a value and polling it.
969 ///
970 /// ```
971 /// use tokio::sync::oneshot;
972 ///
973 /// use std::task::Poll;
974 ///
975 /// # #[tokio::main(flavor = "current_thread")]
976 /// # async fn main() {
977 /// let (tx, mut rx) = oneshot::channel();
978 ///
979 /// // A receiver is not terminated when it is initialized.
980 /// assert!(!rx.is_terminated());
981 ///
982 /// // A receiver is not terminated it is polled and is still pending.
983 /// let poll = futures::poll!(&mut rx);
984 /// assert_eq!(poll, Poll::Pending);
985 /// assert!(!rx.is_terminated());
986 ///
987 /// // A receiver is not terminated if a value has been sent, but not yet read.
988 /// tx.send(0).unwrap();
989 /// assert!(!rx.is_terminated());
990 ///
991 /// // A receiver *is* terminated after it has been polled and yielded a value.
992 /// assert_eq!((&mut rx).await, Ok(0));
993 /// assert!(rx.is_terminated());
994 /// # }
995 /// ```
996 ///
997 /// Dropping the sender.
998 ///
999 /// ```
1000 /// use tokio::sync::oneshot;
1001 ///
1002 /// # #[tokio::main(flavor = "current_thread")]
1003 /// # async fn main() {
1004 /// let (tx, mut rx) = oneshot::channel::<()>();
1005 ///
1006 /// // A receiver is not immediately terminated when the sender is dropped.
1007 /// drop(tx);
1008 /// assert!(!rx.is_terminated());
1009 ///
1010 /// // A receiver *is* terminated after it has been polled and yielded an error.
1011 /// let _ = (&mut rx).await.unwrap_err();
1012 /// assert!(rx.is_terminated());
1013 /// # }
1014 /// ```
1015 pub fn is_terminated(&self) -> bool {
1016 self.inner.is_none()
1017 }
1018
1019 /// Checks if a channel is empty.
1020 ///
1021 /// This method returns `true` if the channel has no messages.
1022 ///
1023 /// It is not necessarily safe to poll an empty receiver, which may have
1024 /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
1025 /// to check whether or not a receiver can be safely polled, instead.
1026 ///
1027 /// # Examples
1028 ///
1029 /// Sending a value.
1030 ///
1031 /// ```
1032 /// use tokio::sync::oneshot;
1033 ///
1034 /// # #[tokio::main(flavor = "current_thread")]
1035 /// # async fn main() {
1036 /// let (tx, mut rx) = oneshot::channel();
1037 /// assert!(rx.is_empty());
1038 ///
1039 /// tx.send(0).unwrap();
1040 /// assert!(!rx.is_empty());
1041 ///
1042 /// let _ = (&mut rx).await;
1043 /// assert!(rx.is_empty());
1044 /// # }
1045 /// ```
1046 ///
1047 /// Dropping the sender.
1048 ///
1049 /// ```
1050 /// use tokio::sync::oneshot;
1051 ///
1052 /// # #[tokio::main(flavor = "current_thread")]
1053 /// # async fn main() {
1054 /// let (tx, mut rx) = oneshot::channel::<()>();
1055 ///
1056 /// // A channel is empty if the sender is dropped.
1057 /// drop(tx);
1058 /// assert!(rx.is_empty());
1059 ///
1060 /// // A closed channel still yields an error, however.
1061 /// (&mut rx).await.expect_err("should yield an error");
1062 /// assert!(rx.is_empty());
1063 /// # }
1064 /// ```
1065 ///
1066 /// Terminated channels are empty.
1067 ///
1068 /// ```should_panic,ignore-wasm
1069 /// use tokio::sync::oneshot;
1070 ///
1071 /// #[tokio::main]
1072 /// async fn main() {
1073 /// let (tx, mut rx) = oneshot::channel();
1074 /// tx.send(0).unwrap();
1075 /// let _ = (&mut rx).await;
1076 ///
1077 /// // NB: an empty channel is not necessarily safe to poll!
1078 /// assert!(rx.is_empty());
1079 /// let _ = (&mut rx).await;
1080 /// }
1081 /// ```
1082 pub fn is_empty(&self) -> bool {
1083 let Some(inner) = self.inner.as_ref() else {
1084 // The channel has already terminated.
1085 return true;
1086 };
1087
1088 let state = State::load(&inner.state, Acquire);
1089 if state.is_complete() {
1090 // SAFETY: If `state.is_complete()` returns true, then the
1091 // `VALUE_SENT` bit has been set and the sender side of the
1092 // channel will no longer attempt to access the inner
1093 // `UnsafeCell`. Therefore, it is now safe for us to access the
1094 // cell.
1095 //
1096 // The channel is empty if it does not have a value.
1097 unsafe { !inner.has_value() }
1098 } else {
1099 // The receiver closed the channel or no value has been sent yet.
1100 true
1101 }
1102 }
1103
1104 /// Attempts to receive a value.
1105 ///
1106 /// If a pending value exists in the channel, it is returned. If no value
1107 /// has been sent, the current task **will not** be registered for
1108 /// future notification.
1109 ///
1110 /// This function is useful to call from outside the context of an
1111 /// asynchronous task.
1112 ///
1113 /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1114 /// spuriously. Any send or close event that happens before this call to
1115 /// `try_recv` will be correctly returned to the caller.
1116 ///
1117 /// # Return
1118 ///
1119 /// - `Ok(T)` if a value is pending in the channel.
1120 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1121 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1122 /// a value, or if the message has already been received.
1123 ///
1124 /// # Examples
1125 ///
1126 /// `try_recv` before a value is sent, then after.
1127 ///
1128 /// ```
1129 /// use tokio::sync::oneshot;
1130 /// use tokio::sync::oneshot::error::TryRecvError;
1131 ///
1132 /// # #[tokio::main(flavor = "current_thread")]
1133 /// # async fn main() {
1134 /// let (tx, mut rx) = oneshot::channel();
1135 ///
1136 /// match rx.try_recv() {
1137 /// // The channel is currently empty
1138 /// Err(TryRecvError::Empty) => {}
1139 /// _ => unreachable!(),
1140 /// }
1141 ///
1142 /// // Send a value
1143 /// tx.send("hello").unwrap();
1144 ///
1145 /// match rx.try_recv() {
1146 /// Ok(value) => assert_eq!(value, "hello"),
1147 /// _ => unreachable!(),
1148 /// }
1149 /// # }
1150 /// ```
1151 ///
1152 /// `try_recv` when the sender dropped before sending a value
1153 ///
1154 /// ```
1155 /// use tokio::sync::oneshot;
1156 /// use tokio::sync::oneshot::error::TryRecvError;
1157 ///
1158 /// # #[tokio::main(flavor = "current_thread")]
1159 /// # async fn main() {
1160 /// let (tx, mut rx) = oneshot::channel::<()>();
1161 ///
1162 /// drop(tx);
1163 ///
1164 /// match rx.try_recv() {
1165 /// // The channel will never receive a value.
1166 /// Err(TryRecvError::Closed) => {}
1167 /// _ => unreachable!(),
1168 /// }
1169 /// # }
1170 /// ```
1171 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1172 let result = if let Some(inner) = self.inner.as_ref() {
1173 let state = State::load(&inner.state, Acquire);
1174
1175 if state.is_complete() {
1176 // SAFETY: If `state.is_complete()` returns true, then the
1177 // `VALUE_SENT` bit has been set and the sender side of the
1178 // channel will no longer attempt to access the inner
1179 // `UnsafeCell`. Therefore, it is now safe for us to access the
1180 // cell.
1181 match unsafe { inner.consume_value() } {
1182 Some(value) => {
1183 #[cfg(all(tokio_unstable, feature = "tracing"))]
1184 self.resource_span.in_scope(|| {
1185 tracing::trace!(
1186 target: "runtime::resource::state_update",
1187 value_received = true,
1188 value_received.op = "override",
1189 )
1190 });
1191 Ok(value)
1192 }
1193 None => Err(TryRecvError::Closed),
1194 }
1195 } else if state.is_closed() {
1196 Err(TryRecvError::Closed)
1197 } else {
1198 // Not ready, this does not clear `inner`
1199 return Err(TryRecvError::Empty);
1200 }
1201 } else {
1202 Err(TryRecvError::Closed)
1203 };
1204
1205 self.inner = None;
1206 result
1207 }
1208
1209 /// Blocking receive to call outside of asynchronous contexts.
1210 ///
1211 /// # Panics
1212 ///
1213 /// This function panics if called within an asynchronous execution
1214 /// context.
1215 ///
1216 /// # Examples
1217 ///
1218 /// ```
1219 /// # #[cfg(not(target_family = "wasm"))]
1220 /// # {
1221 /// use std::thread;
1222 /// use tokio::sync::oneshot;
1223 ///
1224 /// #[tokio::main]
1225 /// async fn main() {
1226 /// let (tx, rx) = oneshot::channel::<u8>();
1227 ///
1228 /// let sync_code = thread::spawn(move || {
1229 /// assert_eq!(Ok(10), rx.blocking_recv());
1230 /// });
1231 ///
1232 /// let _ = tx.send(10);
1233 /// sync_code.join().unwrap();
1234 /// }
1235 /// # }
1236 /// ```
1237 #[track_caller]
1238 #[cfg(feature = "sync")]
1239 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1240 pub fn blocking_recv(self) -> Result<T, RecvError> {
1241 crate::future::block_on(self)
1242 }
1243}
1244
1245impl<T> Drop for Receiver<T> {
1246 fn drop(&mut self) {
1247 if let Some(inner) = self.inner.as_ref() {
1248 let state = inner.close();
1249
1250 if state.is_complete() {
1251 // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1252 // so only the receiver can access the value.
1253 drop(unsafe { inner.consume_value() });
1254 }
1255
1256 #[cfg(all(tokio_unstable, feature = "tracing"))]
1257 self.resource_span.in_scope(|| {
1258 tracing::trace!(
1259 target: "runtime::resource::state_update",
1260 rx_dropped = true,
1261 rx_dropped.op = "override",
1262 )
1263 });
1264 }
1265 }
1266}
1267
1268impl<T> Future for Receiver<T> {
1269 type Output = Result<T, RecvError>;
1270
1271 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1272 // If `inner` is `None`, then `poll()` has already completed.
1273 #[cfg(all(tokio_unstable, feature = "tracing"))]
1274 let _res_span = self.resource_span.clone().entered();
1275 #[cfg(all(tokio_unstable, feature = "tracing"))]
1276 let _ao_span = self.async_op_span.clone().entered();
1277 #[cfg(all(tokio_unstable, feature = "tracing"))]
1278 let _ao_poll_span = self.async_op_poll_span.clone().entered();
1279
1280 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1281 #[cfg(all(tokio_unstable, feature = "tracing"))]
1282 let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1283
1284 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1285 let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1286
1287 res
1288 } else {
1289 panic!("called after complete");
1290 };
1291
1292 self.inner = None;
1293 Ready(ret)
1294 }
1295}
1296
1297impl<T> Inner<T> {
1298 fn complete(&self) -> bool {
1299 let prev = State::set_complete(&self.state);
1300
1301 if prev.is_closed() {
1302 return false;
1303 }
1304
1305 if prev.is_rx_task_set() {
1306 // TODO: Consume waker?
1307 unsafe {
1308 self.rx_task.with_task(Waker::wake_by_ref);
1309 }
1310 }
1311
1312 true
1313 }
1314
1315 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1316 ready!(crate::trace::trace_leaf(cx));
1317 // Keep track of task budget
1318 let coop = ready!(crate::task::coop::poll_proceed(cx));
1319
1320 // Load the state
1321 let mut state = State::load(&self.state, Acquire);
1322
1323 if state.is_complete() {
1324 coop.made_progress();
1325 match unsafe { self.consume_value() } {
1326 Some(value) => Ready(Ok(value)),
1327 None => Ready(Err(RecvError(()))),
1328 }
1329 } else if state.is_closed() {
1330 coop.made_progress();
1331 Ready(Err(RecvError(())))
1332 } else {
1333 if state.is_rx_task_set() {
1334 let will_notify = unsafe { self.rx_task.will_wake(cx) };
1335
1336 // Check if the task is still the same
1337 if !will_notify {
1338 // Unset the task
1339 state = State::unset_rx_task(&self.state);
1340 if state.is_complete() {
1341 // Set the flag again so that the waker is released in drop
1342 State::set_rx_task(&self.state);
1343
1344 coop.made_progress();
1345 // SAFETY: If `state.is_complete()` returns true, then the
1346 // `VALUE_SENT` bit has been set and the sender side of the
1347 // channel will no longer attempt to access the inner
1348 // `UnsafeCell`. Therefore, it is now safe for us to access the
1349 // cell.
1350 return match unsafe { self.consume_value() } {
1351 Some(value) => Ready(Ok(value)),
1352 None => Ready(Err(RecvError(()))),
1353 };
1354 } else {
1355 unsafe { self.rx_task.drop_task() };
1356 }
1357 }
1358 }
1359
1360 if !state.is_rx_task_set() {
1361 // Attempt to set the task
1362 unsafe {
1363 self.rx_task.set_task(cx);
1364 }
1365
1366 // Update the state
1367 state = State::set_rx_task(&self.state);
1368
1369 if state.is_complete() {
1370 coop.made_progress();
1371 match unsafe { self.consume_value() } {
1372 Some(value) => Ready(Ok(value)),
1373 None => Ready(Err(RecvError(()))),
1374 }
1375 } else {
1376 Pending
1377 }
1378 } else {
1379 Pending
1380 }
1381 }
1382 }
1383
1384 /// Called by `Receiver` to indicate that the value will never be received.
1385 fn close(&self) -> State {
1386 let prev = State::set_closed(&self.state);
1387
1388 if prev.is_tx_task_set() && !prev.is_complete() {
1389 unsafe {
1390 self.tx_task.with_task(Waker::wake_by_ref);
1391 }
1392 }
1393
1394 prev
1395 }
1396
1397 /// Consumes the value. This function does not check `state`.
1398 ///
1399 /// # Safety
1400 ///
1401 /// Calling this method concurrently on multiple threads will result in a
1402 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1403 /// sender *or* the receiver will call this method at a given point in time.
1404 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1405 /// if it is set, then only the receiver may call this method.
1406 unsafe fn consume_value(&self) -> Option<T> {
1407 self.value.with_mut(|ptr| unsafe { (*ptr).take() })
1408 }
1409
1410 /// Returns true if there is a value. This function does not check `state`.
1411 ///
1412 /// # Safety
1413 ///
1414 /// Calling this method concurrently on multiple threads will result in a
1415 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1416 /// sender *or* the receiver will call this method at a given point in time.
1417 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1418 /// if it is set, then only the receiver may call this method.
1419 unsafe fn has_value(&self) -> bool {
1420 self.value.with(|ptr| unsafe { (*ptr).is_some() })
1421 }
1422}
1423
1424unsafe impl<T: Send> Send for Inner<T> {}
1425unsafe impl<T: Send> Sync for Inner<T> {}
1426
1427fn mut_load(this: &mut AtomicUsize) -> usize {
1428 this.with_mut(|v| *v)
1429}
1430
1431impl<T> Drop for Inner<T> {
1432 fn drop(&mut self) {
1433 let state = State(mut_load(&mut self.state));
1434
1435 if state.is_rx_task_set() {
1436 unsafe {
1437 self.rx_task.drop_task();
1438 }
1439 }
1440
1441 if state.is_tx_task_set() {
1442 unsafe {
1443 self.tx_task.drop_task();
1444 }
1445 }
1446
1447 // SAFETY: we have `&mut self`, and therefore we have
1448 // exclusive access to the value.
1449 unsafe {
1450 // Note: the assertion holds because if the value has been sent by sender,
1451 // we must ensure that the value must have been consumed by the receiver before
1452 // dropping the `Inner`.
1453 debug_assert!(self.consume_value().is_none());
1454 }
1455 }
1456}
1457
1458impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1459 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1460 use std::sync::atomic::Ordering::Relaxed;
1461
1462 fmt.debug_struct("Inner")
1463 .field("state", &State::load(&self.state, Relaxed))
1464 .finish()
1465 }
1466}
1467
1468/// Indicates that a waker for the receiving task has been set.
1469///
1470/// # Safety
1471///
1472/// If this bit is not set, the `rx_task` field may be uninitialized.
1473const RX_TASK_SET: usize = 0b00001;
1474/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1475///
1476/// # Safety
1477///
1478/// This bit controls which side of the channel is permitted to access the
1479/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1480/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1481/// the sender.
1482const VALUE_SENT: usize = 0b00010;
1483const CLOSED: usize = 0b00100;
1484
1485/// Indicates that a waker for the sending task has been set.
1486///
1487/// # Safety
1488///
1489/// If this bit is not set, the `tx_task` field may be uninitialized.
1490const TX_TASK_SET: usize = 0b01000;
1491
1492impl State {
1493 fn new() -> State {
1494 State(0)
1495 }
1496
1497 fn is_complete(self) -> bool {
1498 self.0 & VALUE_SENT == VALUE_SENT
1499 }
1500
1501 fn set_complete(cell: &AtomicUsize) -> State {
1502 // This method is a compare-and-swap loop rather than a fetch-or like
1503 // other `set_$WHATEVER` methods on `State`. This is because we must
1504 // check if the state has been closed before setting the `VALUE_SENT`
1505 // bit.
1506 //
1507 // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1508 // bit is already set, because `VALUE_SENT` will tell the receiver that
1509 // it's okay to access the inner `UnsafeCell`. Immediately after calling
1510 // `set_complete`, if the channel was closed, the sender will _also_
1511 // access the `UnsafeCell` to take the value back out, so if a
1512 // `poll_recv` or `try_recv` call is occurring concurrently, both
1513 // threads may try to access the `UnsafeCell` if we were to set the
1514 // `VALUE_SENT` bit on a closed channel.
1515 let mut state = cell.load(Ordering::Relaxed);
1516 loop {
1517 if State(state).is_closed() {
1518 break;
1519 }
1520 // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1521 // the `RX_TASK_SET` flag is set. However, `loom` does not support
1522 // fences yet.
1523 match cell.compare_exchange_weak(
1524 state,
1525 state | VALUE_SENT,
1526 Ordering::AcqRel,
1527 Ordering::Acquire,
1528 ) {
1529 Ok(_) => break,
1530 Err(actual) => state = actual,
1531 }
1532 }
1533 State(state)
1534 }
1535
1536 fn is_rx_task_set(self) -> bool {
1537 self.0 & RX_TASK_SET == RX_TASK_SET
1538 }
1539
1540 fn set_rx_task(cell: &AtomicUsize) -> State {
1541 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1542 State(val | RX_TASK_SET)
1543 }
1544
1545 fn unset_rx_task(cell: &AtomicUsize) -> State {
1546 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1547 State(val & !RX_TASK_SET)
1548 }
1549
1550 fn is_closed(self) -> bool {
1551 self.0 & CLOSED == CLOSED
1552 }
1553
1554 fn set_closed(cell: &AtomicUsize) -> State {
1555 // Acquire because we want all later writes (attempting to poll) to be
1556 // ordered after this.
1557 let val = cell.fetch_or(CLOSED, Acquire);
1558 State(val)
1559 }
1560
1561 fn set_tx_task(cell: &AtomicUsize) -> State {
1562 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1563 State(val | TX_TASK_SET)
1564 }
1565
1566 fn unset_tx_task(cell: &AtomicUsize) -> State {
1567 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1568 State(val & !TX_TASK_SET)
1569 }
1570
1571 fn is_tx_task_set(self) -> bool {
1572 self.0 & TX_TASK_SET == TX_TASK_SET
1573 }
1574
1575 fn as_usize(self) -> usize {
1576 self.0
1577 }
1578
1579 fn load(cell: &AtomicUsize, order: Ordering) -> State {
1580 let val = cell.load(order);
1581 State(val)
1582 }
1583}
1584
1585impl fmt::Debug for State {
1586 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1587 fmt.debug_struct("State")
1588 .field("is_complete", &self.is_complete())
1589 .field("is_closed", &self.is_closed())
1590 .field("is_rx_task_set", &self.is_rx_task_set())
1591 .field("is_tx_task_set", &self.is_tx_task_set())
1592 .finish()
1593 }
1594}