Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  tcp_stream.rs   Sprache: unbekannt

 
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support bind

use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
use tokio::net::{TcpListener, TcpStream};
use tokio::try_join;
use tokio_test::task;
use tokio_test::{assert_ok, assert_pending, assert_ready_ok};

use std::io;
use std::task::Poll;
use std::time::Duration;

use futures::future::poll_fn;

#[tokio::test]
async fn set_linger() {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    let stream = TcpStream::connect(listener.local_addr().unwrap())
        .await
        .unwrap();

    assert_ok!(stream.set_linger(Some(Duration::from_secs(1))));
    assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1);

    assert_ok!(stream.set_linger(None));
    assert!(stream.linger().unwrap().is_none());
}

#[tokio::test]
async fn try_read_write() {
    const DATA: &[u8] = b"this is some data to write to the socket";

    // Create listener
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    // Create socket pair
    let client = TcpStream::connect(listener.local_addr().unwrap())
        .await
        .unwrap();
    let (server, _) = listener.accept().await.unwrap();
    let mut written = DATA.to_vec();

    // Track the server receiving data
    let mut readable = task::spawn(server.readable());
    assert_pending!(readable.poll());

    // Write data.
    client.writable().await.unwrap();
    assert_eq!(DATA.len(), client.try_write(DATA).unwrap());

    // The task should be notified
    while !readable.is_woken() {
        tokio::task::yield_now().await;
    }

    // Fill the write buffer using non-vectored I/O
    loop {
        // Still ready
        let mut writable = task::spawn(client.writable());
        assert_ready_ok!(writable.poll());

        match client.try_write(DATA) {
            Ok(n) => written.extend(&DATA[..n]),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                break;
            }
            Err(e) => panic!("error = {:?}", e),
        }
    }

    {
        // Write buffer full
        let mut writable = task::spawn(client.writable());
        assert_pending!(writable.poll());

        // Drain the socket from the server end using non-vectored I/O
        let mut read = vec![0; written.len()];
        let mut i = 0;

        while i < read.len() {
            server.readable().await.unwrap();

            match server.try_read(&mut read[i..]) {
                Ok(n) => i += n,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("error = {:?}", e),
            }
        }

        assert_eq!(read, written);
    }

    written.clear();
    client.writable().await.unwrap();

    // Fill the write buffer using vectored I/O
    let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
    loop {
        // Still ready
        let mut writable = task::spawn(client.writable());
        assert_ready_ok!(writable.poll());

        match client.try_write_vectored(&data_bufs) {
            Ok(n) => written.extend(&DATA[..n]),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                break;
            }
            Err(e) => panic!("error = {:?}", e),
        }
    }

    {
        // Write buffer full
        let mut writable = task::spawn(client.writable());
        assert_pending!(writable.poll());

        // Drain the socket from the server end using vectored I/O
        let mut read = vec![0; written.len()];
        let mut i = 0;

        while i < read.len() {
            server.readable().await.unwrap();

            let mut bufs: Vec<_> = read[i..]
                .chunks_mut(0x10000)
                .map(io::IoSliceMut::new)
                .collect();
            match server.try_read_vectored(&mut bufs) {
                Ok(n) => i += n,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("error = {:?}", e),
            }
        }

        assert_eq!(read, written);
    }

    // Now, we listen for shutdown
    drop(client);

    loop {
        let ready = server.ready(Interest::READABLE).await.unwrap();

        if ready.is_read_closed() {
            return;
        } else {
            tokio::task::yield_now().await;
        }
    }
}

#[test]
fn buffer_not_included_in_future() {
    use std::mem;

    const N: usize = 4096;

    let fut = async {
        let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();

        loop {
            stream.readable().await.unwrap();

            let mut buf = [0; N];
            let n = stream.try_read(&mut buf[..]).unwrap();

            if n == 0 {
                break;
            }
        }
    };

    let n = mem::size_of_val(&fut);
    assert!(n < 1000);
}

macro_rules! assert_readable_by_polling {
    ($stream:expr) => {
        assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await);
    };
}

macro_rules! assert_not_readable_by_polling {
    ($stream:expr) => {
        poll_fn(|cx| {
            assert_pending!($stream.poll_read_ready(cx));
            Poll::Ready(())
        })
        .await;
    };
}

macro_rules! assert_writable_by_polling {
    ($stream:expr) => {
        assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await);
    };
}

macro_rules! assert_not_writable_by_polling {
    ($stream:expr) => {
        poll_fn(|cx| {
            assert_pending!($stream.poll_write_ready(cx));
            Poll::Ready(())
        })
        .await;
    };
}

#[tokio::test]
async fn poll_read_ready() {
    let (mut client, mut server) = create_pair().await;

    // Initial state - not readable.
    assert_not_readable_by_polling!(server);

    // There is data in the buffer - readable.
    assert_ok!(client.write_all(b"ping").await);
    assert_readable_by_polling!(server);

    // Readable until calls to `poll_read` return `Poll::Pending`.
    let mut buf = [0u8; 4];
    assert_ok!(server.read_exact(&mut buf).await);
    assert_readable_by_polling!(server);
    read_until_pending(&mut server);
    assert_not_readable_by_polling!(server);

    // Detect the client disconnect.
    drop(client);
    assert_readable_by_polling!(server);
}

#[tokio::test]
async fn poll_write_ready() {
    let (mut client, server) = create_pair().await;

    // Initial state - writable.
    assert_writable_by_polling!(client);

    // No space to write - not writable.
    write_until_pending(&mut client);
    assert_not_writable_by_polling!(client);

    // Detect the server disconnect.
    drop(server);
    assert_writable_by_polling!(client);
}

async fn create_pair() -> (TcpStream, TcpStream) {
    let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
    let addr = assert_ok!(listener.local_addr());
    let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept()));
    (client, server)
}

fn read_until_pending(stream: &mut TcpStream) -> usize {
    let mut buf = vec![0u8; 1024 * 1024];
    let mut total = 0;
    loop {
        match stream.try_read(&mut buf) {
            Ok(n) => total += n,
            Err(err) => {
                assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
                break;
            }
        }
    }
    total
}

fn write_until_pending(stream: &mut TcpStream) -> usize {
    let buf = vec![0u8; 1024 * 1024];
    let mut total = 0;
    loop {
        match stream.try_write(&buf) {
            Ok(n) => total += n,
            Err(err) => {
                assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
                break;
            }
        }
    }
    total
}

#[tokio::test]
async fn try_read_buf() {
    const DATA: &[u8] = b"this is some data to write to the socket";

    // Create listener
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    // Create socket pair
    let client = TcpStream::connect(listener.local_addr().unwrap())
        .await
        .unwrap();
    let (server, _) = listener.accept().await.unwrap();
    let mut written = DATA.to_vec();

    // Track the server receiving data
    let mut readable = task::spawn(server.readable());
    assert_pending!(readable.poll());

    // Write data.
    client.writable().await.unwrap();
    assert_eq!(DATA.len(), client.try_write(DATA).unwrap());

    // The task should be notified
    while !readable.is_woken() {
        tokio::task::yield_now().await;
    }

    // Fill the write buffer
    loop {
        // Still ready
        let mut writable = task::spawn(client.writable());
        assert_ready_ok!(writable.poll());

        match client.try_write(DATA) {
            Ok(n) => written.extend(&DATA[..n]),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                break;
            }
            Err(e) => panic!("error = {:?}", e),
        }
    }

    {
        // Write buffer full
        let mut writable = task::spawn(client.writable());
        assert_pending!(writable.poll());

        // Drain the socket from the server end
        let mut read = Vec::with_capacity(written.len());
        let mut i = 0;

        while i < read.capacity() {
            server.readable().await.unwrap();

            match server.try_read_buf(&mut read) {
                Ok(n) => i += n,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("error = {:?}", e),
            }
        }

        assert_eq!(read, written);
    }

    // Now, we listen for shutdown
    drop(client);

    loop {
        let ready = server.ready(Interest::READABLE).await.unwrap();

        if ready.is_read_closed() {
            return;
        } else {
            tokio::task::yield_now().await;
        }
    }
}

// read_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn read_closed() {
    let (client, mut server) = create_pair().await;

    let mut ready_fut = task::spawn(client.ready(Interest::READABLE));
    assert_pending!(ready_fut.poll());

    assert_ok!(server.write_all(b"ping").await);

    let ready_event = assert_ok!(ready_fut.await);

    assert!(!ready_event.is_read_closed());
}

// write_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn write_closed() {
    let (mut client, mut server) = create_pair().await;

    // Fill the write buffer.
    let write_size = write_until_pending(&mut client);
    let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE));
    assert_pending!(ready_fut.poll());

    // Drain the socket to make client writable.
    let mut read_size = 0;
    while read_size < write_size {
        server.readable().await.unwrap();
        read_size += read_until_pending(&mut server);
    }

    let ready_event = assert_ok!(ready_fut.await);

    assert!(!ready_event.is_write_closed());
}

[ Dauer der Verarbeitung: 0.25 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge