Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  framed_read.rs   Sprache: unbekannt

 
#![warn(rust_2018_idioms)]

use tokio::io::{AsyncRead, ReadBuf};
use tokio_test::assert_ready;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};

use bytes::{Buf, BytesMut};
use futures::Stream;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

macro_rules! mock {
    ($($x:expr,)*) => {{
        let mut v = VecDeque::new();
        v.extend(vec![$($x),*]);
        Mock { calls: v }
    }};
}

macro_rules! assert_read {
    ($e:expr, $n:expr) => {{
        let val = assert_ready!($e);
        assert_eq!(val.unwrap().unwrap(), $n);
    }};
}

macro_rules! pin {
    ($id:ident) => {
        Pin::new(&mut $id)
    };
}

struct U32Decoder;

impl Decoder for U32Decoder {
    type Item = u32;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
        if buf.len() < 4 {
            return Ok(None);
        }

        let n = buf.split_to(4).get_u32();
        Ok(Some(n))
    }
}

struct U64Decoder;

impl Decoder for U64Decoder {
    type Item = u64;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
        if buf.len() < 8 {
            return Ok(None);
        }

        let n = buf.split_to(8).get_u64();
        Ok(Some(n))
    }
}

#[test]
fn read_multi_frame_in_packet() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_multi_frame_across_packets() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_multi_frame_in_packet_after_codec_changed() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0x04);

        let mut framed = framed.map_decoder(|_| U64Decoder);
        assert_read!(pin!(framed).poll_next(cx), 0x08);

        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_not_ready() {
    let mut task = task::spawn(());
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_partial_then_not_ready() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn read_partial_then_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn read_partial_would_block_then_err() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert!(pin!(framed).poll_next(cx).is_pending());
        assert_eq!(
            io::ErrorKind::Other,
            assert_ready!(pin!(framed).poll_next(cx))
                .unwrap()
                .unwrap_err()
                .kind()
        )
    });
}

#[test]
fn huge_size() {
    let mut task = task::spawn(());
    let data = &[0; 32 * 1024][..];
    let mut framed = FramedRead::new(data, BigDecoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });

    struct BigDecoder;

    impl Decoder for BigDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if buf.len() < 32 * 1024 {
                return Ok(None);
            }
            buf.advance(32 * 1024);
            Ok(Some(0))
        }
    }
}

#[test]
fn data_remaining_is_error() {
    let mut task = task::spawn(());
    let slice = &[0; 5][..];
    let mut framed = FramedRead::new(slice, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
    });
}

#[test]
fn multi_frames_on_eof() {
    let mut task = task::spawn(());
    struct MyDecoder(Vec<u32>);

    impl Decoder for MyDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            unreachable!();
        }

        fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if self.0.is_empty() {
                return Ok(None);
            }

            Ok(Some(self.0.remove(0)))
        }
    }

    let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 0);
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert_read!(pin!(framed).poll_next(cx), 3);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

#[test]
fn read_eof_then_resume() {
    let mut task = task::spawn(());
    let mock = mock! {
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x03".to_vec()),
    };
    let mut framed = FramedRead::new(mock, U32Decoder);

    task.enter(|cx, _| {
        assert_read!(pin!(framed).poll_next(cx), 1);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert_read!(pin!(framed).poll_next(cx), 2);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert_read!(pin!(framed).poll_next(cx), 3);
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}

// ===== Mock ======

struct Mock {
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl AsyncRead for Mock {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        use io::ErrorKind::WouldBlock;

        match self.calls.pop_front() {
            Some(Ok(data)) => {
                debug_assert!(buf.remaining() >= data.len());
                buf.put_slice(&data);
                Ready(Ok(()))
            }
            Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
            Some(Err(e)) => Ready(Err(e)),
            None => Ready(Ok(())),
        }
    }
}

[ Dauer der Verarbeitung: 0.38 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge