Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  oneshot.rs   Sprache: unbekannt

 
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]

//! A one-shot channel is used for sending a single message between
//! asynchronous tasks. The [`channel`] function is used to create a
//! [`Sender`] and [`Receiver`] handle pair that form the channel.
//!
//! The `Sender` handle is used by the producer to send the value.
//! The `Receiver` handle is used by the consumer to receive the value.
//!
//! Each handle can be used on separate tasks.
//!
//! Since the `send` method is not async, it can be used anywhere. This includes
//! sending between two runtimes, and using it from non-async code.
//!
//! If the [`Receiver`] is closed before receiving a message which has already
//! been sent, the message will remain in the channel until the receiver is
//! dropped, at which point the message will be dropped immediately.
//!
//! # Examples
//!
//! ```
//! use tokio::sync::oneshot;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, rx) = oneshot::channel();
//!
//!     tokio::spawn(async move {
//!         if let Err(_) = tx.send(3) {
//!             println!("the receiver dropped");
//!         }
//!     });
//!
//!     match rx.await {
//!         Ok(v) => println!("got = {:?}", v),
//!         Err(_) => println!("the sender dropped"),
//!     }
//! }
//! ```
//!
//! If the sender is dropped without sending, the receiver will fail with
//! [`error::RecvError`]:
//!
//! ```
//! use tokio::sync::oneshot;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, rx) = oneshot::channel::<u32>();
//!
//!     tokio::spawn(async move {
//!         drop(tx);
//!     });
//!
//!     match rx.await {
//!         Ok(_) => panic!("This doesn't happen"),
//!         Err(_) => println!("the sender dropped"),
//!     }
//! }
//! ```
//!
//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
//! the channel.
//!
//! ```
//! use tokio::sync::oneshot;
//! use tokio::time::{interval, sleep, Duration};
//!
//! #[tokio::main]
//! # async fn _doc() {}
//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
//! async fn main() {
//!     let (send, mut recv) = oneshot::channel();
//!     let mut interval = interval(Duration::from_millis(100));
//!
//!     # let handle =
//!     tokio::spawn(async move {
//!         sleep(Duration::from_secs(1)).await;
//!         send.send("shut down").unwrap();
//!     });
//!
//!     loop {
//!         tokio::select! {
//!             _ = interval.tick() => println!("Another 100ms"),
//!             msg = &mut recv => {
//!                 println!("Got message: {}", msg.unwrap());
//!                 break;
//!             }
//!         }
//!     }
//!     # handle.await.unwrap();
//! }
//! ```
//!
//! To use a `Sender` from a destructor, put it in an [`Option`] and call
//! [`Option::take`].
//!
//! ```
//! use tokio::sync::oneshot;
//!
//! struct SendOnDrop {
//!     sender: Option<oneshot::Sender<&'static str>>,
//! }
//! impl Drop for SendOnDrop {
//!     fn drop(&mut self) {
//!         if let Some(sender) = self.sender.take() {
//!             // Using `let _ =` to ignore send errors.
//!             let _ = sender.send("I got dropped!");
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! # async fn _doc() {}
//! # #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//!     let (send, recv) = oneshot::channel();
//!
//!     let send_on_drop = SendOnDrop { sender: Some(send) };
//!     drop(send_on_drop);
//!
//!     assert_eq!(recv.await, Ok("I got dropped!"));
//! }
//! ```

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;

use std::fmt;
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll, Waker};

/// Sends a value to the associated [`Receiver`].
///
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
/// [`channel`](fn@channel) function.
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = oneshot::channel();
///
///     tokio::spawn(async move {
///         if let Err(_) = tx.send(3) {
///             println!("the receiver dropped");
///         }
///     });
///
///     match rx.await {
///         Ok(v) => println!("got = {:?}", v),
///         Err(_) => println!("the sender dropped"),
///     }
/// }
/// ```
///
/// If the sender is dropped without sending, the receiver will fail with
/// [`error::RecvError`]:
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = oneshot::channel::<u32>();
///
///     tokio::spawn(async move {
///         drop(tx);
///     });
///
///     match rx.await {
///         Ok(_) => panic!("This doesn't happen"),
///         Err(_) => println!("the sender dropped"),
///     }
/// }
/// ```
///
/// To use a `Sender` from a destructor, put it in an [`Option`] and call
/// [`Option::take`].
///
/// ```
/// use tokio::sync::oneshot;
///
/// struct SendOnDrop {
///     sender: Option<oneshot::Sender<&'static str>>,
/// }
/// impl Drop for SendOnDrop {
///     fn drop(&mut self) {
///         if let Some(sender) = self.sender.take() {
///             // Using `let _ =` to ignore send errors.
///             let _ = sender.send("I got dropped!");
///         }
///     }
/// }
///
/// #[tokio::main]
/// # async fn _doc() {}
/// # #[tokio::main(flavor = "current_thread")]
/// async fn main() {
///     let (send, recv) = oneshot::channel();
///
///     let send_on_drop = SendOnDrop { sender: Some(send) };
///     drop(send_on_drop);
///
///     assert_eq!(recv.await, Ok("I got dropped!"));
/// }
/// ```
///
/// [`Option`]: std::option::Option
/// [`Option::take`]: std::option::Option::take
#[derive(Debug)]
pub struct Sender<T> {
    inner: Option<Arc<Inner<T>>>,
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}

/// Receives a value from the associated [`Sender`].
///
/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
/// [`channel`](fn@channel) function.
///
/// This channel has no `recv` method because the receiver itself implements the
/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
///
/// The `poll` method on the `Future` trait is allowed to spuriously return
/// `Poll::Pending` even if the message has been sent. If such a spurious
/// failure happens, then the caller will be woken when the spurious failure has
/// been resolved so that the caller can attempt to receive the message again.
/// Note that receiving such a wakeup does not guarantee that the next call will
/// succeed — it could fail with another spurious failure. (A spurious failure
/// does not mean that the message is lost. It is just delayed.)
///
/// [`Future`]: trait@std::future::Future
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = oneshot::channel();
///
///     tokio::spawn(async move {
///         if let Err(_) = tx.send(3) {
///             println!("the receiver dropped");
///         }
///     });
///
///     match rx.await {
///         Ok(v) => println!("got = {:?}", v),
///         Err(_) => println!("the sender dropped"),
///     }
/// }
/// ```
///
/// If the sender is dropped without sending, the receiver will fail with
/// [`error::RecvError`]:
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = oneshot::channel::<u32>();
///
///     tokio::spawn(async move {
///         drop(tx);
///     });
///
///     match rx.await {
///         Ok(_) => panic!("This doesn't happen"),
///         Err(_) => println!("the sender dropped"),
///     }
/// }
/// ```
///
/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
/// channel.
///
/// ```
/// use tokio::sync::oneshot;
/// use tokio::time::{interval, sleep, Duration};
///
/// #[tokio::main]
/// # async fn _doc() {}
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
///     let (send, mut recv) = oneshot::channel();
///     let mut interval = interval(Duration::from_millis(100));
///
///     # let handle =
///     tokio::spawn(async move {
///         sleep(Duration::from_secs(1)).await;
///         send.send("shut down").unwrap();
///     });
///
///     loop {
///         tokio::select! {
///             _ = interval.tick() => println!("Another 100ms"),
///             msg = &mut recv => {
///                 println!("Got message: {}", msg.unwrap());
///                 break;
///             }
///         }
///     }
///     # handle.await.unwrap();
/// }
/// ```
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Option<Arc<Inner<T>>>,
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_span: tracing::Span,
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_poll_span: tracing::Span,
}

pub mod error {
    //! `Oneshot` error types.

    use std::fmt;

    /// Error returned by the `Future` implementation for `Receiver`.
    ///
    /// This error is returned by the receiver when the sender is dropped without sending.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub struct RecvError(pub(super) ());

    /// Error returned by the `try_recv` function on `Receiver`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum TryRecvError {
        /// The send half of the channel has not yet sent a value.
        Empty,

        /// The send half of the channel was dropped without sending a value.
        Closed,
    }

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(fmt, "channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    // ===== impl TryRecvError =====

    impl fmt::Display for TryRecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                TryRecvError::Empty => write!(fmt, "channel empty"),
                TryRecvError::Closed => write!(fmt, "channel closed"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}

use self::error::*;

struct Inner<T> {
    /// Manages the state of the inner cell.
    state: AtomicUsize,

    /// The value. This is set by `Sender` and read by `Receiver`. The state of
    /// the cell is tracked by `state`.
    value: UnsafeCell<Option<T>>,

    /// The task to notify when the receiver drops without consuming the value.
    ///
    /// ## Safety
    ///
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    tx_task: Task,

    /// The task to notify when the value is sent.
    ///
    /// ## Safety
    ///
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    rx_task: Task,
}

struct Task(UnsafeCell<MaybeUninit<Waker>>);

impl Task {
    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
        self.with_task(|w| w.will_wake(cx.waker()))
    }

    unsafe fn with_task<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&Waker) -> R,
    {
        self.0.with(|ptr| {
            let waker: *const Waker = (*ptr).as_ptr();
            f(&*waker)
        })
    }

    unsafe fn drop_task(&self) {
        self.0.with_mut(|ptr| {
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
            ptr.drop_in_place();
        });
    }

    unsafe fn set_task(&self, cx: &mut Context<'_>) {
        self.0.with_mut(|ptr| {
            let ptr: *mut Waker = (*ptr).as_mut_ptr();
            ptr.write(cx.waker().clone());
        });
    }
}

#[derive(Clone, Copy)]
struct State(usize);

/// Creates a new one-shot channel for sending single values across asynchronous
/// tasks.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
/// handle is used by the producer to send the value. The `Receiver` handle is
/// used by the consumer to receive the value.
///
/// Each handle can be used on separate tasks.
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, rx) = oneshot::channel();
///
///     tokio::spawn(async move {
///         if let Err(_) = tx.send(3) {
///             println!("the receiver dropped");
///         }
///     });
///
///     match rx.await {
///         Ok(v) => println!("got = {:?}", v),
///         Err(_) => println!("the sender dropped"),
///     }
/// }
/// ```
#[track_caller]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let resource_span = {
        let location = std::panic::Location::caller();

        let resource_span = tracing::trace_span!(
            parent: None,
            "runtime.resource",
            concrete_type = "Sender|Receiver",
            kind = "Sync",
            loc.file = location.file(),
            loc.line = location.line(),
            loc.col = location.column(),
        );

        resource_span.in_scope(|| {
            tracing::trace!(
            target: "runtime::resource::state_update",
            tx_dropped = false,
            tx_dropped.op = "override",
            )
        });

        resource_span.in_scope(|| {
            tracing::trace!(
            target: "runtime::resource::state_update",
            rx_dropped = false,
            rx_dropped.op = "override",
            )
        });

        resource_span.in_scope(|| {
            tracing::trace!(
            target: "runtime::resource::state_update",
            value_sent = false,
            value_sent.op = "override",
            )
        });

        resource_span.in_scope(|| {
            tracing::trace!(
            target: "runtime::resource::state_update",
            value_received = false,
            value_received.op = "override",
            )
        });

        resource_span
    };

    let inner = Arc::new(Inner {
        state: AtomicUsize::new(State::new().as_usize()),
        value: UnsafeCell::new(None),
        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
    });

    let tx = Sender {
        inner: Some(inner.clone()),
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        resource_span: resource_span.clone(),
    };

    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let async_op_span = resource_span
        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));

    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let async_op_poll_span =
        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));

    let rx = Receiver {
        inner: Some(inner),
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        resource_span,
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        async_op_span,
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        async_op_poll_span,
    };

    (tx, rx)
}

impl<T> Sender<T> {
    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent.
    ///
    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
    /// channel. It is not marked async because sending a message to an `oneshot`
    /// channel never requires any form of waiting.  Because of this, the `send`
    /// method can be used in both synchronous and asynchronous code without
    /// problems.
    ///
    /// A successful send occurs when it is determined that the other end of the
    /// channel has not hung up already. An unsuccessful send would be one where
    /// the corresponding receiver has already been deallocated. Note that a
    /// return value of `Err` means that the data will never be received, but
    /// a return value of `Ok` does *not* mean that the data will be received.
    /// It is possible for the corresponding receiver to hang up immediately
    /// after this function returns `Ok`.
    ///
    /// # Examples
    ///
    /// Send a value to another task
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = oneshot::channel();
    ///
    ///     tokio::spawn(async move {
    ///         if let Err(_) = tx.send(3) {
    ///             println!("the receiver dropped");
    ///         }
    ///     });
    ///
    ///     match rx.await {
    ///         Ok(v) => println!("got = {:?}", v),
    ///         Err(_) => println!("the sender dropped"),
    ///     }
    /// }
    /// ```
    pub fn send(mut self, t: T) -> Result<(), T> {
        let inner = self.inner.take().unwrap();

        inner.value.with_mut(|ptr| unsafe {
            // SAFETY: The receiver will not access the `UnsafeCell` unless the
            // channel has been marked as "complete" (the `VALUE_SENT` state bit
            // is set).
            // That bit is only set by the sender later on in this method, and
            // calling this method consumes `self`. Therefore, if it was possible to
            // call this method, we know that the `VALUE_SENT` bit is unset, and
            // the receiver is not currently accessing the `UnsafeCell`.
            *ptr = Some(t);
        });

        if !inner.complete() {
            unsafe {
                // SAFETY: The receiver will not access the `UnsafeCell` unless
                // the channel has been marked as "complete". Calling
                // `complete()` will return true if this bit is set, and false
                // if it is not set. Thus, if `complete()` returned false, it is
                // safe for us to access the value, because we know that the
                // receiver will not.
                return Err(inner.consume_value().unwrap());
            }
        }

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
            target: "runtime::resource::state_update",
            value_sent = true,
            value_sent.op = "override",
            )
        });

        Ok(())
    }

    /// Waits for the associated [`Receiver`] handle to close.
    ///
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
    /// [`Receiver`] value is dropped.
    ///
    /// This function is useful when paired with `select!` to abort a
    /// computation when the receiver is no longer interested in the result.
    ///
    /// # Return
    ///
    /// Returns a `Future` which must be awaited on.
    ///
    /// [`Receiver`]: Receiver
    /// [`close`]: Receiver::close
    ///
    /// # Examples
    ///
    /// Basic usage
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (mut tx, rx) = oneshot::channel::<()>();
    ///
    ///     tokio::spawn(async move {
    ///         drop(rx);
    ///     });
    ///
    ///     tx.closed().await;
    ///     println!("the receiver dropped");
    /// }
    /// ```
    ///
    /// Paired with select
    ///
    /// ```
    /// use tokio::sync::oneshot;
    /// use tokio::time::{self, Duration};
    ///
    /// async fn compute() -> String {
    ///     // Complex computation returning a `String`
    /// # "hello".to_string()
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (mut tx, rx) = oneshot::channel();
    ///
    ///     tokio::spawn(async move {
    ///         tokio::select! {
    ///             _ = tx.closed() => {
    ///                 // The receiver dropped, no need to do any further work
    ///             }
    ///             value = compute() => {
    ///                 // The send can fail if the channel was closed at the exact same
    ///                 // time as when compute() finished, so just ignore the failure.
    ///                 let _ = tx.send(value);
    ///             }
    ///         }
    ///     });
    ///
    ///     // Wait for up to 10 seconds
    ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
    /// }
    /// ```
    pub async fn closed(&mut self) {
        use crate::future::poll_fn;

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let closed = trace::async_op(
            || poll_fn(|cx| self.poll_closed(cx)),
            resource_span,
            "Sender::closed",
            "poll_closed",
            false,
        );
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let closed = poll_fn(|cx| self.poll_closed(cx));

        closed.await;
    }

    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
    ///
    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
    /// [`Receiver`] value is dropped.
    ///
    /// If `true` is returned, a call to `send` will always result in an error.
    ///
    /// [`Receiver`]: Receiver
    /// [`close`]: Receiver::close
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = oneshot::channel();
    ///
    ///     assert!(!tx.is_closed());
    ///
    ///     drop(rx);
    ///
    ///     assert!(tx.is_closed());
    ///     assert!(tx.send("never received").is_err());
    /// }
    /// ```
    pub fn is_closed(&self) -> bool {
        let inner = self.inner.as_ref().unwrap();

        let state = State::load(&inner.state, Acquire);
        state.is_closed()
    }

    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
    /// `Waker` in the provided `Context` to receive a notification when the channel is
    /// closed.
    ///
    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
    /// [`Receiver`] value is dropped.
    ///
    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
    /// to the most recent call will be scheduled to receive a wakeup.
    ///
    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
    ///
    /// # Return value
    ///
    /// This function returns:
    ///
    ///  * `Poll::Pending` if the channel is still open.
    ///  * `Poll::Ready(())` if the channel is closed.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// use futures::future::poll_fn;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (mut tx, mut rx) = oneshot::channel::<()>();
    ///
    ///     tokio::spawn(async move {
    ///         rx.close();
    ///     });
    ///
    ///     poll_fn(|cx| tx.poll_closed(cx)).await;
    ///
    ///     println!("the receiver dropped");
    /// }
    /// ```
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        let inner = self.inner.as_ref().unwrap();

        let mut state = State::load(&inner.state, Acquire);

        if state.is_closed() {
            coop.made_progress();
            return Ready(());
        }

        if state.is_tx_task_set() {
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };

            if !will_notify {
                state = State::unset_tx_task(&inner.state);

                if state.is_closed() {
                    // Set the flag again so that the waker is released in drop
                    State::set_tx_task(&inner.state);
                    coop.made_progress();
                    return Ready(());
                } else {
                    unsafe { inner.tx_task.drop_task() };
                }
            }
        }

        if !state.is_tx_task_set() {
            // Attempt to set the task
            unsafe {
                inner.tx_task.set_task(cx);
            }

            // Update the state
            state = State::set_tx_task(&inner.state);

            if state.is_closed() {
                coop.made_progress();
                return Ready(());
            }
        }

        Pending
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.as_ref() {
            inner.complete();
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            self.resource_span.in_scope(|| {
                tracing::trace!(
                target: "runtime::resource::state_update",
                tx_dropped = true,
                tx_dropped.op = "override",
                )
            });
        }
    }
}

impl<T> Receiver<T> {
    /// Prevents the associated [`Sender`] handle from sending a value.
    ///
    /// Any `send` operation which happens after calling `close` is guaranteed
    /// to fail. After calling `close`, [`try_recv`] should be called to
    /// receive a value if one was sent **before** the call to `close`
    /// completed.
    ///
    /// This function is useful to perform a graceful shutdown and ensure that a
    /// value will not be sent into the channel and never received.
    ///
    /// `close` is no-op if a message is already received or the channel
    /// is already closed.
    ///
    /// [`Sender`]: Sender
    /// [`try_recv`]: Receiver::try_recv
    ///
    /// # Examples
    ///
    /// Prevent a value from being sent
    ///
    /// ```
    /// use tokio::sync::oneshot;
    /// use tokio::sync::oneshot::error::TryRecvError;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = oneshot::channel();
    ///
    ///     assert!(!tx.is_closed());
    ///
    ///     rx.close();
    ///
    ///     assert!(tx.is_closed());
    ///     assert!(tx.send("never received").is_err());
    ///
    ///     match rx.try_recv() {
    ///         Err(TryRecvError::Closed) => {}
    ///         _ => unreachable!(),
    ///     }
    /// }
    /// ```
    ///
    /// Receive a value sent **before** calling `close`
    ///
    /// ```
    /// use tokio::sync::oneshot;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = oneshot::channel();
    ///
    ///     assert!(tx.send("will receive").is_ok());
    ///
    ///     rx.close();
    ///
    ///     let msg = rx.try_recv().unwrap();
    ///     assert_eq!(msg, "will receive");
    /// }
    /// ```
    pub fn close(&mut self) {
        if let Some(inner) = self.inner.as_ref() {
            inner.close();
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            self.resource_span.in_scope(|| {
                tracing::trace!(
                target: "runtime::resource::state_update",
                rx_dropped = true,
                rx_dropped.op = "override",
                )
            });
        }
    }

    /// Attempts to receive a value.
    ///
    /// If a pending value exists in the channel, it is returned. If no value
    /// has been sent, the current task **will not** be registered for
    /// future notification.
    ///
    /// This function is useful to call from outside the context of an
    /// asynchronous task.
    ///
    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
    /// spuriously. Any send or close event that happens before this call to
    /// `try_recv` will be correctly returned to the caller.
    ///
    /// # Return
    ///
    /// - `Ok(T)` if a value is pending in the channel.
    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
    ///   a value, or if the message has already been received.
    ///
    /// # Examples
    ///
    /// `try_recv` before a value is sent, then after.
    ///
    /// ```
    /// use tokio::sync::oneshot;
    /// use tokio::sync::oneshot::error::TryRecvError;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = oneshot::channel();
    ///
    ///     match rx.try_recv() {
    ///         // The channel is currently empty
    ///         Err(TryRecvError::Empty) => {}
    ///         _ => unreachable!(),
    ///     }
    ///
    ///     // Send a value
    ///     tx.send("hello").unwrap();
    ///
    ///     match rx.try_recv() {
    ///         Ok(value) => assert_eq!(value, "hello"),
    ///         _ => unreachable!(),
    ///     }
    /// }
    /// ```
    ///
    /// `try_recv` when the sender dropped before sending a value
    ///
    /// ```
    /// use tokio::sync::oneshot;
    /// use tokio::sync::oneshot::error::TryRecvError;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = oneshot::channel::<()>();
    ///
    ///     drop(tx);
    ///
    ///     match rx.try_recv() {
    ///         // The channel will never receive a value.
    ///         Err(TryRecvError::Closed) => {}
    ///         _ => unreachable!(),
    ///     }
    /// }
    /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let result = if let Some(inner) = self.inner.as_ref() {
            let state = State::load(&inner.state, Acquire);

            if state.is_complete() {
                // SAFETY: If `state.is_complete()` returns true, then the
                // `VALUE_SENT` bit has been set and the sender side of the
                // channel will no longer attempt to access the inner
                // `UnsafeCell`. Therefore, it is now safe for us to access the
                // cell.
                match unsafe { inner.consume_value() } {
                    Some(value) => {
                        #[cfg(all(tokio_unstable, feature = "tracing"))]
                        self.resource_span.in_scope(|| {
                            tracing::trace!(
                            target: "runtime::resource::state_update",
                            value_received = true,
                            value_received.op = "override",
                            )
                        });
                        Ok(value)
                    }
                    None => Err(TryRecvError::Closed),
                }
            } else if state.is_closed() {
                Err(TryRecvError::Closed)
            } else {
                // Not ready, this does not clear `inner`
                return Err(TryRecvError::Empty);
            }
        } else {
            Err(TryRecvError::Closed)
        };

        self.inner = None;
        result
    }

    /// Blocking receive to call outside of asynchronous contexts.
    ///
    /// # Panics
    ///
    /// This function panics if called within an asynchronous execution
    /// context.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use tokio::sync::oneshot;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = oneshot::channel::<u8>();
    ///
    ///     let sync_code = thread::spawn(move || {
    ///         assert_eq!(Ok(10), rx.blocking_recv());
    ///     });
    ///
    ///     let _ = tx.send(10);
    ///     sync_code.join().unwrap();
    /// }
    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(self) -> Result<T, RecvError> {
        crate::future::block_on(self)
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.as_ref() {
            let state = inner.close();

            if state.is_complete() {
                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
                // so only the receiver can access the value.
                drop(unsafe { inner.consume_value() });
            }

            #[cfg(all(tokio_unstable, feature = "tracing"))]
            self.resource_span.in_scope(|| {
                tracing::trace!(
                target: "runtime::resource::state_update",
                rx_dropped = true,
                rx_dropped.op = "override",
                )
            });
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // If `inner` is `None`, then `poll()` has already completed.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _res_span = self.resource_span.clone().entered();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _ao_span = self.async_op_span.clone().entered();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _ao_poll_span = self.async_op_poll_span.clone().entered();

        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;

            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
            let res = ready!(inner.poll_recv(cx))?;

            res
        } else {
            panic!("called after complete");
        };

        self.inner = None;
        Ready(Ok(ret))
    }
}

impl<T> Inner<T> {
    fn complete(&self) -> bool {
        let prev = State::set_complete(&self.state);

        if prev.is_closed() {
            return false;
        }

        if prev.is_rx_task_set() {
            // TODO: Consume waker?
            unsafe {
                self.rx_task.with_task(Waker::wake_by_ref);
            }
        }

        true
    }

    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        ready!(crate::trace::trace_leaf(cx));
        // Keep track of task budget
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        // Load the state
        let mut state = State::load(&self.state, Acquire);

        if state.is_complete() {
            coop.made_progress();
            match unsafe { self.consume_value() } {
                Some(value) => Ready(Ok(value)),
                None => Ready(Err(RecvError(()))),
            }
        } else if state.is_closed() {
            coop.made_progress();
            Ready(Err(RecvError(())))
        } else {
            if state.is_rx_task_set() {
                let will_notify = unsafe { self.rx_task.will_wake(cx) };

                // Check if the task is still the same
                if !will_notify {
                    // Unset the task
                    state = State::unset_rx_task(&self.state);
                    if state.is_complete() {
                        // Set the flag again so that the waker is released in drop
                        State::set_rx_task(&self.state);

                        coop.made_progress();
                        // SAFETY: If `state.is_complete()` returns true, then the
                        // `VALUE_SENT` bit has been set and the sender side of the
                        // channel will no longer attempt to access the inner
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
                        // cell.
                        return match unsafe { self.consume_value() } {
                            Some(value) => Ready(Ok(value)),
                            None => Ready(Err(RecvError(()))),
                        };
                    } else {
                        unsafe { self.rx_task.drop_task() };
                    }
                }
            }

            if !state.is_rx_task_set() {
                // Attempt to set the task
                unsafe {
                    self.rx_task.set_task(cx);
                }

                // Update the state
                state = State::set_rx_task(&self.state);

                if state.is_complete() {
                    coop.made_progress();
                    match unsafe { self.consume_value() } {
                        Some(value) => Ready(Ok(value)),
                        None => Ready(Err(RecvError(()))),
                    }
                } else {
                    Pending
                }
            } else {
                Pending
            }
        }
    }

    /// Called by `Receiver` to indicate that the value will never be received.
    fn close(&self) -> State {
        let prev = State::set_closed(&self.state);

        if prev.is_tx_task_set() && !prev.is_complete() {
            unsafe {
                self.tx_task.with_task(Waker::wake_by_ref);
            }
        }

        prev
    }

    /// Consumes the value. This function does not check `state`.
    ///
    /// # Safety
    ///
    /// Calling this method concurrently on multiple threads will result in a
    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
    /// sender *or* the receiver will call this method at a given point in time.
    /// If `VALUE_SENT` is not set, then only the sender may call this method;
    /// if it is set, then only the receiver may call this method.
    unsafe fn consume_value(&self) -> Option<T> {
        self.value.with_mut(|ptr| (*ptr).take())
    }
}

unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

fn mut_load(this: &mut AtomicUsize) -> usize {
    this.with_mut(|v| *v)
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        let state = State(mut_load(&mut self.state));

        if state.is_rx_task_set() {
            unsafe {
                self.rx_task.drop_task();
            }
        }

        if state.is_tx_task_set() {
            unsafe {
                self.tx_task.drop_task();
            }
        }

        // SAFETY: we have `&mut self`, and therefore we have
        // exclusive access to the value.
        unsafe {
            // Note: the assertion holds because if the value has been sent by sender,
            // we must ensure that the value must have been consumed by the receiver before
            // dropping the `Inner`.
            debug_assert!(self.consume_value().is_none());
        }
    }
}

impl<T: fmt::Debug> fmt::Debug for Inner<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        use std::sync::atomic::Ordering::Relaxed;

        fmt.debug_struct("Inner")
            .field("state", &State::load(&self.state, Relaxed))
            .finish()
    }
}

/// Indicates that a waker for the receiving task has been set.
///
/// # Safety
///
/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit controls which side of the channel is permitted to access the
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
/// the sender.
const VALUE_SENT: usize = 0b00010;
const CLOSED: usize = 0b00100;

/// Indicates that a waker for the sending task has been set.
///
/// # Safety
///
/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;

impl State {
    fn new() -> State {
        State(0)
    }

    fn is_complete(self) -> bool {
        self.0 & VALUE_SENT == VALUE_SENT
    }

    fn set_complete(cell: &AtomicUsize) -> State {
        // This method is a compare-and-swap loop rather than a fetch-or like
        // other `set_$WHATEVER` methods on `State`. This is because we must
        // check if the state has been closed before setting the `VALUE_SENT`
        // bit.
        //
        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
        // bit is already set, because `VALUE_SENT` will tell the receiver that
        // it's okay to access the inner `UnsafeCell`. Immediately after calling
        // `set_complete`, if the channel was closed, the sender will _also_
        // access the `UnsafeCell` to take the value back out, so if a
        // `poll_recv` or `try_recv` call is occurring concurrently, both
        // threads may try to access the `UnsafeCell` if we were to set the
        // `VALUE_SENT` bit on a closed channel.
        let mut state = cell.load(Ordering::Relaxed);
        loop {
            if State(state).is_closed() {
                break;
            }
            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
            // the `RX_TASK_SET` flag is set. However, `loom` does not support
            // fences yet.
            match cell.compare_exchange_weak(
                state,
                state | VALUE_SENT,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(actual) => state = actual,
            }
        }
        State(state)
    }

    fn is_rx_task_set(self) -> bool {
        self.0 & RX_TASK_SET == RX_TASK_SET
    }

    fn set_rx_task(cell: &AtomicUsize) -> State {
        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
        State(val | RX_TASK_SET)
    }

    fn unset_rx_task(cell: &AtomicUsize) -> State {
        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
        State(val & !RX_TASK_SET)
    }

    fn is_closed(self) -> bool {
        self.0 & CLOSED == CLOSED
    }

    fn set_closed(cell: &AtomicUsize) -> State {
        // Acquire because we want all later writes (attempting to poll) to be
        // ordered after this.
        let val = cell.fetch_or(CLOSED, Acquire);
        State(val)
    }

    fn set_tx_task(cell: &AtomicUsize) -> State {
        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
        State(val | TX_TASK_SET)
    }

    fn unset_tx_task(cell: &AtomicUsize) -> State {
        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
        State(val & !TX_TASK_SET)
    }

    fn is_tx_task_set(self) -> bool {
        self.0 & TX_TASK_SET == TX_TASK_SET
    }

    fn as_usize(self) -> usize {
        self.0
    }

    fn load(cell: &AtomicUsize, order: Ordering) -> State {
        let val = cell.load(order);
        State(val)
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("State")
            .field("is_complete", &self.is_complete())
            .field("is_closed", &self.is_closed())
            .field("is_rx_task_set", &self.is_rx_task_set())
            .field("is_tx_task_set", &self.is_tx_task_set())
            .finish()
    }
}

[ Dauer der Verarbeitung: 0.33 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge