Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  udp.rs   Sprache: unbekannt

 
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind or UDP

use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
use tokio_test::assert_ok;

const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();

#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    sender.connect(receiver.local_addr()?).await?;
    receiver.connect(sender.local_addr()?).await?;

    sender.send(MSG).await?;

    let mut recv_buf = [0u8; 32];
    let len = receiver.recv(&mut recv_buf[..]).await?;

    assert_eq!(&recv_buf[..len], MSG);
    Ok(())
}

#[tokio::test]
async fn send_recv_poll() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    sender.connect(receiver.local_addr()?).await?;
    receiver.connect(sender.local_addr()?).await?;

    poll_fn(|cx| sender.poll_send(cx, MSG)).await?;

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;

    assert_eq!(read.filled(), MSG);
    Ok(())
}

#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let receiver_addr = receiver.local_addr()?;
    sender.send_to(MSG, &receiver_addr).await?;

    let mut recv_buf = [0u8; 32];
    let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;

    assert_eq!(&recv_buf[..len], MSG);
    assert_eq!(addr, sender.local_addr()?);
    Ok(())
}

#[tokio::test]
async fn send_to_recv_from_poll() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let receiver_addr = receiver.local_addr()?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;

    assert_eq!(read.filled(), MSG);
    assert_eq!(addr, sender.local_addr()?);
    Ok(())
}

#[tokio::test]
async fn send_to_peek_from() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let receiver_addr = receiver.local_addr()?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;

    // peek
    let mut recv_buf = [0u8; 32];
    let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..n], MSG);
    assert_eq!(addr, sender.local_addr()?);

    // peek
    let mut recv_buf = [0u8; 32];
    let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..n], MSG);
    assert_eq!(addr, sender.local_addr()?);

    let mut recv_buf = [0u8; 32];
    let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..n], MSG);
    assert_eq!(addr, sender.local_addr()?);

    Ok(())
}

#[tokio::test]
async fn send_to_try_peek_from() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let receiver_addr = receiver.local_addr()?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;

    // peek
    let mut recv_buf = [0u8; 32];

    loop {
        match receiver.try_peek_from(&mut recv_buf) {
            Ok((n, addr)) => {
                assert_eq!(&recv_buf[..n], MSG);
                assert_eq!(addr, sender.local_addr()?);
                break;
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                receiver.readable().await?;
            }
            Err(e) => return Err(e),
        }
    }

    // peek
    let mut recv_buf = [0u8; 32];
    let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..n], MSG);
    assert_eq!(addr, sender.local_addr()?);

    let mut recv_buf = [0u8; 32];
    let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..n], MSG);
    assert_eq!(addr, sender.local_addr()?);

    Ok(())
}

#[tokio::test]
async fn send_to_peek_from_poll() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let receiver_addr = receiver.local_addr()?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;

    assert_eq!(read.filled(), MSG);
    assert_eq!(addr, sender.local_addr()?);

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;

    assert_eq!(read.filled(), MSG);
    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);

    poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
    assert_eq!(read.filled(), MSG);
    Ok(())
}

#[tokio::test]
async fn peek_sender() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let sender_addr = sender.local_addr()?;
    let receiver_addr = receiver.local_addr()?;

    let msg = b"Hello, world!";
    sender.send_to(msg, receiver_addr).await?;

    let peeked_sender = receiver.peek_sender().await?;
    assert_eq!(peeked_sender, sender_addr);

    // Assert that `peek_sender()` returns the right sender but
    // doesn't remove from the receive queue.
    let mut recv_buf = [0u8; 32];
    let (read, received_sender) = receiver.recv_from(&mut recv_buf).await?;

    assert_eq!(&recv_buf[..read], msg);
    assert_eq!(received_sender, peeked_sender);

    Ok(())
}

#[tokio::test]
async fn poll_peek_sender() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let sender_addr = sender.local_addr()?;
    let receiver_addr = receiver.local_addr()?;

    let msg = b"Hello, world!";
    poll_fn(|cx| sender.poll_send_to(cx, msg, receiver_addr)).await?;

    let peeked_sender = poll_fn(|cx| receiver.poll_peek_sender(cx)).await?;
    assert_eq!(peeked_sender, sender_addr);

    // Assert that `poll_peek_sender()` returns the right sender but
    // doesn't remove from the receive queue.
    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    let received_sender = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;

    assert_eq!(read.filled(), msg);
    assert_eq!(received_sender, peeked_sender);

    Ok(())
}

#[tokio::test]
async fn try_peek_sender() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    let sender_addr = sender.local_addr()?;
    let receiver_addr = receiver.local_addr()?;

    let msg = b"Hello, world!";
    sender.send_to(msg, receiver_addr).await?;

    let peeked_sender = loop {
        match receiver.try_peek_sender() {
            Ok(peeked_sender) => break peeked_sender,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                receiver.readable().await?;
            }
            Err(e) => return Err(e),
        }
    };

    assert_eq!(peeked_sender, sender_addr);

    // Assert that `try_peek_sender()` returns the right sender but
    // didn't remove from the receive queue.
    let mut recv_buf = [0u8; 32];
    // We already peeked the sender so there must be data in the receive queue.
    let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap();

    assert_eq!(&recv_buf[..read], msg);
    assert_eq!(received_sender, peeked_sender);

    Ok(())
}

#[tokio::test]
async fn split() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    let s = Arc::new(socket);
    let r = s.clone();

    let addr = s.local_addr()?;
    tokio::spawn(async move {
        s.send_to(MSG, &addr).await.unwrap();
    });
    let mut recv_buf = [0u8; 32];
    let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
    assert_eq!(&recv_buf[..len], MSG);
    Ok(())
}

#[tokio::test]
async fn split_chan() -> std::io::Result<()> {
    // setup UdpSocket that will echo all sent items
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    let addr = socket.local_addr().unwrap();
    let s = Arc::new(socket);
    let r = s.clone();

    let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
    tokio::spawn(async move {
        while let Some((bytes, addr)) = rx.recv().await {
            s.send_to(&bytes, &addr).await.unwrap();
        }
    });

    tokio::spawn(async move {
        let mut buf = [0u8; 32];
        loop {
            let (len, addr) = r.recv_from(&mut buf).await.unwrap();
            tx.send((buf[..len].to_vec(), addr)).await.unwrap();
        }
    });

    // test that we can send a value and get back some response
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    sender.send_to(MSG, addr).await?;
    let mut recv_buf = [0u8; 32];
    let (len, _) = sender.recv_from(&mut recv_buf).await?;
    assert_eq!(&recv_buf[..len], MSG);
    Ok(())
}

#[tokio::test]
async fn split_chan_poll() -> std::io::Result<()> {
    // setup UdpSocket that will echo all sent items
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    let addr = socket.local_addr().unwrap();
    let s = Arc::new(socket);
    let r = s.clone();

    let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
    tokio::spawn(async move {
        while let Some((bytes, addr)) = rx.recv().await {
            poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
                .await
                .unwrap();
        }
    });

    tokio::spawn(async move {
        let mut recv_buf = [0u8; 32];
        let mut read = ReadBuf::new(&mut recv_buf);
        loop {
            let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
            tx.send((read.filled().to_vec(), addr)).await.unwrap();
        }
    });

    // test that we can send a value and get back some response
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;

    let mut recv_buf = [0u8; 32];
    let mut read = ReadBuf::new(&mut recv_buf);
    let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
    assert_eq!(read.filled(), MSG);
    Ok(())
}

// # Note
//
// This test is purposely written such that each time `sender` sends data on
// the socket, `receiver` awaits the data. On Unix, it would be okay waiting
// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because it's resources are completion based (via IOCP).
// If data is sent and not yet received, attempting to send more data will
// result in `ErrorKind::WouldBlock` until the first operation completes.
#[tokio::test]
async fn try_send_spawn() {
    const MSG2: &[u8] = b"world!";
    const MSG2_LEN: usize = MSG2.len();

    let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    receiver
        .connect(sender.local_addr().unwrap())
        .await
        .unwrap();

    sender.writable().await.unwrap();

    let sent = &sender
        .try_send_to(MSG, receiver.local_addr().unwrap())
        .unwrap();
    assert_eq!(sent, &MSG_LEN);
    let mut buf = [0u8; 32];
    let mut received = receiver.recv(&mut buf[..]).await.unwrap();

    sender
        .connect(receiver.local_addr().unwrap())
        .await
        .unwrap();
    let sent = &sender.try_send(MSG2).unwrap();
    assert_eq!(sent, &MSG2_LEN);
    received += receiver.recv(&mut buf[..]).await.unwrap();

    std::thread::spawn(move || {
        let sent = &sender.try_send(MSG).unwrap();
        assert_eq!(sent, &MSG_LEN);
    })
    .join()
    .unwrap();
    received += receiver.recv(&mut buf[..]).await.unwrap();

    assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
}

#[tokio::test]
async fn try_send_recv() {
    // Create listener
    let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    // Create socket pair
    let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    // Connect the two
    client.connect(server.local_addr().unwrap()).await.unwrap();
    server.connect(client.local_addr().unwrap()).await.unwrap();

    for _ in 0..5 {
        loop {
            client.writable().await.unwrap();

            match client.try_send(b"hello world") {
                Ok(n) => {
                    assert_eq!(n, 11);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }

        loop {
            server.readable().await.unwrap();

            let mut buf = [0; 512];

            match server.try_recv(&mut buf) {
                Ok(n) => {
                    assert_eq!(n, 11);
                    assert_eq!(&buf[0..11], &b"hello world"[..]);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }
    }
}

#[tokio::test]
async fn try_send_to_recv_from() {
    // Create listener
    let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let saddr = server.local_addr().unwrap();

    // Create socket pair
    let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let caddr = client.local_addr().unwrap();

    for _ in 0..5 {
        loop {
            client.writable().await.unwrap();

            match client.try_send_to(b"hello world", saddr) {
                Ok(n) => {
                    assert_eq!(n, 11);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }

        loop {
            server.readable().await.unwrap();

            let mut buf = [0; 512];

            match server.try_recv_from(&mut buf) {
                Ok((n, addr)) => {
                    assert_eq!(n, 11);
                    assert_eq!(addr, caddr);
                    assert_eq!(&buf[0..11], &b"hello world"[..]);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }
    }
}

#[tokio::test]
async fn try_recv_buf() {
    // Create listener
    let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    // Create socket pair
    let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();

    // Connect the two
    client.connect(server.local_addr().unwrap()).await.unwrap();
    server.connect(client.local_addr().unwrap()).await.unwrap();

    for _ in 0..5 {
        loop {
            client.writable().await.unwrap();

            match client.try_send(b"hello world") {
                Ok(n) => {
                    assert_eq!(n, 11);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }

        loop {
            server.readable().await.unwrap();

            let mut buf = Vec::with_capacity(512);

            match server.try_recv_buf(&mut buf) {
                Ok(n) => {
                    assert_eq!(n, 11);
                    assert_eq!(&buf[0..11], &b"hello world"[..]);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }
    }
}

#[tokio::test]
async fn recv_buf() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    sender.connect(receiver.local_addr()?).await?;
    receiver.connect(sender.local_addr()?).await?;

    sender.send(MSG).await?;
    let mut recv_buf = Vec::with_capacity(32);
    let len = receiver.recv_buf(&mut recv_buf).await?;

    assert_eq!(len, MSG_LEN);
    assert_eq!(&recv_buf[..len], MSG);
    Ok(())
}

#[tokio::test]
async fn try_recv_buf_from() {
    // Create listener
    let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let saddr = server.local_addr().unwrap();

    // Create socket pair
    let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let caddr = client.local_addr().unwrap();

    for _ in 0..5 {
        loop {
            client.writable().await.unwrap();

            match client.try_send_to(b"hello world", saddr) {
                Ok(n) => {
                    assert_eq!(n, 11);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }

        loop {
            server.readable().await.unwrap();

            let mut buf = Vec::with_capacity(512);

            match server.try_recv_buf_from(&mut buf) {
                Ok((n, addr)) => {
                    assert_eq!(n, 11);
                    assert_eq!(addr, caddr);
                    assert_eq!(&buf[0..11], &b"hello world"[..]);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }
    }
}

#[tokio::test]
async fn recv_buf_from() -> std::io::Result<()> {
    let sender = UdpSocket::bind("127.0.0.1:0").await?;
    let receiver = UdpSocket::bind("127.0.0.1:0").await?;

    sender.connect(receiver.local_addr()?).await?;

    sender.send(MSG).await?;
    let mut recv_buf = Vec::with_capacity(32);
    let (len, caddr) = receiver.recv_buf_from(&mut recv_buf).await?;

    assert_eq!(len, MSG_LEN);
    assert_eq!(&recv_buf[..len], MSG);
    assert_eq!(caddr, sender.local_addr()?);
    Ok(())
}

#[tokio::test]
async fn poll_ready() {
    // Create listener
    let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let saddr = server.local_addr().unwrap();

    // Create socket pair
    let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let caddr = client.local_addr().unwrap();

    for _ in 0..5 {
        loop {
            assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);

            match client.try_send_to(b"hello world", saddr) {
                Ok(n) => {
                    assert_eq!(n, 11);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }

        loop {
            assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);

            let mut buf = Vec::with_capacity(512);

            match server.try_recv_buf_from(&mut buf) {
                Ok((n, addr)) => {
                    assert_eq!(n, 11);
                    assert_eq!(addr, caddr);
                    assert_eq!(&buf[0..11], &b"hello world"[..]);
                    break;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => panic!("{:?}", e),
            }
        }
    }
}

[ Dauer der Verarbeitung: 0.29 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge