Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/want/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 16 kB image not shown  

Quelle  lib.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

#![doc(html_root_url = "https://docs.rs/want/0.3.0")]
#![deny(warnings)]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]

//! A Futures channel-like utility to signal when a value is wanted.
//!
//! Futures are supposed to be lazy, and only starting work if `Future::poll`
//! is called. The same is true of `Stream`s, but when using a channel as
//! a `Stream`, it can be hard to know if the receiver is ready for the next
//! value.
//!
//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
//! work to be produced? Just because there is room in the channel buffer
//! doesn't mean the work would be used by the receiver.
//!
//! This is where something like `want` comes in. Added to a channel, you can
//! make sure that the `tx` only creates the message and sends it when the `rx`
//! has `poll()` for it, and the buffer was empty.
//!
//! # Example
//!
//! ```nightly
//! # //#![feature(async_await)]
//! extern crate want;
//!
//! # fn spawn<T>(_t: T) {}
//! # fn we_still_want_message() -> bool { true }
//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
//! # struct Tx;
//! # impl Tx { fn send<T>(&mut self, _: T) {} }
//! # struct Rx;
//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
//!
//! // Some message that is expensive to produce.
//! struct Expensive;
//!
//! // Some futures-aware MPSC channel...
//! let (mut tx, mut rx) = mpsc_channel();
//!
//! // And our `want` channel!
//! let (mut gv, mut tk) = want::new();
//!
//!
//! // Our receiving task...
//! spawn(async move {
//!     // Maybe something comes up that prevents us from ever
//!     // using the expensive message.
//!     //
//!     // Without `want`, the "send" task may have started to
//!     // produce the expensive message even though we wouldn't
//!     // be able to use it.
//!     if !we_still_want_message() {
//!         return;
//!     }
//!
//!     // But we can use it! So tell the `want` channel.
//!     tk.want();
//!
//!     match rx.recv().await {
//!         Some(_msg) => println!("got a message"),
//!         None => println!("DONE"),
//!     }
//! });
//!
//! // Our sending task
//! spawn(async move {
//!     // It's expensive to create a new message, so we wait until the
//!     // receiving end truly *wants* the message.
//!     if let Err(_closed) = gv.want().await {
//!         // Looks like they will never want it...
//!         return;
//!     }
//!
//!     // They want it, let's go!
//!     tx.send(Expensive);
//! });
//!
//! # fn main() {}
//! ```

#[macro_use]
extern crate log;

use std::fmt;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
// SeqCst is the only ordering used to ensure accessing the state and
// TryLock are never re-ordered.
use std::sync::atomic::Ordering::SeqCst;
use std::task::{self, Poll, Waker};


use try_lock::TryLock;

/// Create a new `want` channel.
pub fn new() -> (Giver, Taker) {
    let inner = Arc::new(Inner {
        state: AtomicUsize::new(State::Idle.into()),
        task: TryLock::new(None),
    });
    let inner2 = inner.clone();
    (
        Giver {
            inner: inner,
        },
        Taker {
            inner: inner2,
        },
    )
}

/// An entity that gives a value when wanted.
pub struct Giver {
    inner: Arc<Inner>,
}

/// An entity that wants a value.
pub struct Taker {
    inner: Arc<Inner>,
}

/// A cloneable `Giver`.
///
/// It differs from `Giver` in that you cannot poll for `want`. It's only
/// usable as a cancellation watcher.
#[derive(Clone)]
pub struct SharedGiver {
    inner: Arc<Inner>,
}

/// The `Taker` has canceled its interest in a value.
pub struct Closed {
    _inner: (),
}

#[derive(Clone, Copy, Debug)]
enum State {
    Idle,
    Want,
    Give,
    Closed,
}

impl From<State> for usize {
    fn from(s: State) -> usize {
        match s {
            State::Idle => 0,
            State::Want => 1,
            State::Give => 2,
            State::Closed => 3,
        }
    }
}

impl From<usize> for State {
    fn from(num: usize) -> State {
        match num {
            0 => State::Idle,
            1 => State::Want,
            2 => State::Give,
            3 => State::Closed,
            _ => unreachable!("unknown state: {}", num),
        }
    }
}

struct Inner {
    state: AtomicUsize,
    task: TryLock<Option<Waker>>,
}

// ===== impl Giver ======

impl Giver {
    /// Returns a `Future` that fulfills when the `Taker` has done some action.
    pub fn want<'a>(&'a mut self) -> impl Future<Output = Result<(), Closed>> + 'a {
        Want(self)
    }

    /// Poll whether the `Taker` has registered interest in another value.
    ///
    /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
    /// - If the `Taker` has not called `want()` since last poll, this
    ///   returns `Async::NotReady`, and parks the current task to be notified
    ///   when the `Taker` does call `want()`.
    /// - If the `Taker` has canceled (or dropped), this returns `Closed`.
    ///
    /// After knowing that the Taker is wanting, the state can be reset by
    /// calling [`give`](Giver::give).
    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
        loop {
            let state = self.inner.state.load(SeqCst).into();
            match state {
                State::Want => {
                    trace!("poll_want: taker wants!");
                    return Poll::Ready(Ok(()));
                },
                State::Closed => {
                    trace!("poll_want: closed");
                    return Poll::Ready(Err(Closed { _inner: () }));
                },
                State::Idle | State::Give => {
                    // Taker doesn't want anything yet, so park.
                    if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) {

                        // While we have the lock, try to set to GIVE.
                        let old = self.inner.state.compare_and_swap(
                            state.into(),
                            State::Give.into(),
                            SeqCst,
                        );
                        // If it's still the first state (Idle or Give), park current task.
                        if old == state.into() {
                            let park = locked.as_ref()
                                .map(|w| !w.will_wake(cx.waker()))
                                .unwrap_or(true);
                            if park {
                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
                                drop(locked);
                                old.map(|prev_task| {
                                    // there was an old task parked here.
                                    // it might be waiting to be notified,
                                    // so poke it before dropping.
                                    prev_task.wake();
                                });
                            }
                            return Poll::Pending;
                        }
                        // Otherwise, something happened! Go around the loop again.
                    } else {
                        // if we couldn't take the lock, then a Taker has it.
                        // The *ONLY* reason is because it is in the process of notifying us
                        // of its want.
                        //
                        // We need to loop again to see what state it was changed to.
                    }
                },
            }
        }
    }

    /// Mark the state as idle, if the Taker currently is wanting.
    ///
    /// Returns true if Taker was wanting, false otherwise.
    #[inline]
    pub fn give(&self) -> bool {
        // only set to IDLE if it is still Want
        self.inner.state.compare_and_swap(
            State::Want.into(),
            State::Idle.into(),
            SeqCst,
        ) == State::Want.into()
    }

    /// Check if the `Taker` has called `want()` without parking a task.
    ///
    /// This is safe to call outside of a futures task context, but other
    /// means of being notified is left to the user.
    #[inline]
    pub fn is_wanting(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Want.into()
    }


    /// Check if the `Taker` has canceled interest without parking a task.
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Closed.into()
    }

    /// Converts this into a `SharedGiver`.
    #[inline]
    pub fn shared(self) -> SharedGiver {
        SharedGiver {
            inner: self.inner,
        }
    }
}

impl fmt::Debug for Giver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Giver")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl SharedGiver ======

impl SharedGiver {
    /// Check if the `Taker` has called `want()` without parking a task.
    ///
    /// This is safe to call outside of a futures task context, but other
    /// means of being notified is left to the user.
    #[inline]
    pub fn is_wanting(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Want.into()
    }


    /// Check if the `Taker` has canceled interest without parking a task.
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Closed.into()
    }
}

impl fmt::Debug for SharedGiver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SharedGiver")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl Taker ======

impl Taker {
    /// Signal to the `Giver` that the want is canceled.
    ///
    /// This is useful to tell that the channel is closed if you cannot
    /// drop the value yet.
    #[inline]
    pub fn cancel(&mut self) {
        trace!("signal: {:?}", State::Closed);
        self.signal(State::Closed)
    }

    /// Signal to the `Giver` that a value is wanted.
    #[inline]
    pub fn want(&mut self) {
        debug_assert!(
            self.inner.state.load(SeqCst) != State::Closed.into(),
            "want called after cancel"
        );
        trace!("signal: {:?}", State::Want);
        self.signal(State::Want)
    }

    #[inline]
    fn signal(&mut self, state: State) {
        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
        match old_state {
            State::Idle | State::Want | State::Closed => (),
            State::Give => {
                loop {
                    if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) {
                        if let Some(task) = locked.take() {
                            drop(locked);
                            trace!("signal found waiting giver, notifying");
                            task.wake();
                        }
                        return;
                    } else {
                        // if we couldn't take the lock, then a Giver has it.
                        // The *ONLY* reason is because it is in the process of parking.
                        //
                        // We need to loop and take the lock so we can notify this task.
                    }
                }
            },
        }
    }
}

impl Drop for Taker {
    #[inline]
    fn drop(&mut self) {
        self.signal(State::Closed);
    }
}

impl fmt::Debug for Taker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Taker")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl Closed ======

impl fmt::Debug for Closed {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Closed")
            .finish()
    }
}

// ===== impl Inner ======

impl Inner {
    #[inline]
    fn state(&self) -> State {
        self.state.load(SeqCst).into()
    }
}

// ===== impl PollFn ======

struct Want<'a>(&'a mut Giver);


impl Future for Want<'_> {
    type Output = Result<(), Closed>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.0.poll_want(cx)
    }
}

#[cfg(test)]
mod tests {
    use std::thread;
    use tokio_sync::oneshot;
    use super::*;

    fn block_on<F: Future>(f: F) -> F::Output {
        tokio_executor::enter()
            .expect("block_on enter")
            .block_on(f)
    }

    #[test]
    fn want_ready() {
        let (mut gv, mut tk) = new();

        tk.want();

        block_on(gv.want()).unwrap();
    }

    #[test]
    fn want_notify_0() {
        let (mut gv, mut tk) = new();
        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            tk.want();
            // use a oneshot to keep this thread alive
            // until other thread was notified of want
            block_on(rx).expect("rx");
        });

        block_on(gv.want()).expect("want");

        assert!(gv.is_wanting(), "still wanting after poll_want success");
        assert!(gv.give(), "give is true when wanting");

        assert!(!gv.is_wanting(), "no longer wanting after give");
        assert!(!gv.is_canceled(), "give doesn't cancel");

        assert!(!gv.give(), "give is false if not wanting");

        tx.send(()).expect("tx");
    }

    /*
    /// This tests that if the Giver moves tasks after parking,
    /// it will still wake up the correct task.
    #[test]
    fn want_notify_moving_tasks() {
        use std::sync::Arc;
        use futures::executor::{spawn, Notify, NotifyHandle};

        struct WantNotify;

        impl Notify for WantNotify {
            fn notify(&self, _id: usize) {
            }
        }

        fn n() -> NotifyHandle {
            Arc::new(WantNotify).into()
        }

        let (mut gv, mut tk) = new();

        let mut s = spawn(poll_fn(move || {
            gv.poll_want()
        }));

        // Register with t1 as the task::current()
        let t1 = n();
        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());

        thread::spawn(move || {
            thread::sleep(::std::time::Duration::from_millis(100));
            tk.want();
        });

        // And now, move to a ThreadNotify task.
        s.into_inner().wait().expect("poll_want");
    }
    */

    #[test]
    fn cancel() {
        // explicit
        let (mut gv, mut tk) = new();

        assert!(!gv.is_canceled());

        tk.cancel();

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // implicit
        let (mut gv, tk) = new();

        assert!(!gv.is_canceled());

        drop(tk);

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // notifies
        let (mut gv, tk) = new();

        thread::spawn(move || {
            let _tk = tk;
            // and dropped
        });

        block_on(gv.want()).unwrap_err();
    }

    /*
    #[test]
    fn stress() {
        let nthreads = 5;
        let nwants = 100;

        for _ in 0..nthreads {
            let (mut gv, mut tk) = new();
            let (mut tx, mut rx) = mpsc::channel(0);

            // rx thread
            thread::spawn(move || {
                let mut cnt = 0;
                poll_fn(move || {
                    while cnt < nwants {
                        let n = match rx.poll().expect("rx poll") {
                            Async::Ready(n) => n.expect("rx opt"),
                            Async::NotReady => {
                                tk.want();
                                return Ok(Async::NotReady);
                            },
                        };
                        assert_eq!(cnt, n);
                        cnt += 1;
                    }
                    Ok::<_, ()>(Async::Ready(()))
                }).wait().expect("rx wait");
            });

            // tx thread
            thread::spawn(move || {
                let mut cnt = 0;
                let nsent = poll_fn(move || {
                    loop {
                        while let Ok(()) = tx.try_send(cnt) {
                            cnt += 1;
                        }
                        match gv.poll_want() {
                            Ok(Async::Ready(_)) => (),
                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
                            Err(_) => return Ok(Async::Ready(cnt)),
                        }
                    }
                }).wait().expect("tx wait");

                assert_eq!(nsent, nwants);
            }).join().expect("thread join");
        }
    }
    */
}

[ Dauer der Verarbeitung: 0.43 Sekunden  ]