Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  io_async_fd.rs   Sprache: unbekannt

 
#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{read, write};

use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio::io::Interest;
use tokio_test::{assert_err, assert_pending};

struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

#[derive(Debug)]
struct FileDescriptor {
    fd: std::os::fd::OwnedFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(&self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
}

fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd.as_raw_fd());
    set_nonblocking(fds.1.fd.as_raw_fd());

    fds
}

fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];
    #[allow(clippy::unused_io_amount)]
    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}

#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}

#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}

#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}

fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_then_clear_readiness() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let mut write_ready = rt.block_on(afd_a.writable()).unwrap();

    std::mem::drop(rt);

    write_ready.clear_ready();
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}

#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
    use std::net::SocketAddr;

    use tokio::io::Interest;

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let listener = std::net::TcpListener::bind(addr).unwrap();
    let addr = listener.local_addr().unwrap();

    let client = std::net::TcpStream::connect(addr).unwrap();
    let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();

    let (stream, _) = listener.accept().unwrap();

    // Sending out of band data should trigger priority event.
    send_oob_data(&stream, b"hello").unwrap();

    let _ = client.ready(Interest::PRIORITY).await.unwrap();
}

#[cfg(any(target_os = "linux", target_os = "android"))]
fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
    unsafe {
        let res = libc::send(
            stream.as_raw_fd(),
            data.as_ptr().cast(),
            data.len(),
            libc::MSG_OOB,
        );
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res as usize)
        }
    }
}

#[tokio::test]
async fn clear_ready_matching_clears_ready() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
async fn clear_ready_matching_clears_ready_mut() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let mut afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready_mut(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_timestamping() {
    use std::net::{Ipv4Addr, SocketAddr};

    use tokio::io::{Interest, Ready};

    let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

    let socket = std::net::UdpSocket::bind(address_a).unwrap();

    socket.set_nonblocking(true).unwrap();

    // configure send timestamps
    configure_timestamping_socket(&socket).unwrap();

    socket.connect(address_b).unwrap();

    let fd = AsyncFd::new(socket).unwrap();

    tokio::select! {
        _ = fd.ready(Interest::ERROR) => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    let buf = b"hello there";
    fd.get_ref().send(buf).unwrap();

    // the send timestamp should now be in the error queue
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

#[cfg(target_os = "linux")]
fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
    // enable software timestamping, and specifically software send timestamping
    let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;

    let res = unsafe {
        libc::setsockopt(
            udp_socket.as_raw_fd(),
            libc::SOL_SOCKET,
            libc::SO_TIMESTAMP,
            &options as *const _ as *const libc::c_void,
            std::mem::size_of_val(&options) as libc::socklen_t,
        )
    };

    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
    }
}

#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate thread for sending messages
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context. the OS will notice
        // that nobody is listening on port this port. Normally this is ignored (UDP is "fire and forget"),
        // but because IP_RECVERR is enabled, the error will actually be reported to the sending socket
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            panic!("{:?}", std::io::Error::last_os_error())
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
    fn as_raw_fd(&self) -> RawFd {
        -1
    }
}

#[tokio::test]
async fn try_new() {
    let original = Arc::new(InvalidSource);

    let error = AsyncFd::try_new(original.clone()).unwrap_err();
    let (returned, _cause) = error.into_parts();

    assert!(Arc::ptr_eq(&original, &returned));
}

#[tokio::test]
async fn try_with_interest() {
    let original = Arc::new(InvalidSource);

    let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
    let (returned, _cause) = error.into_parts();

    assert!(Arc::ptr_eq(&original, &returned));
}

[ Dauer der Verarbeitung: 0.4 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge