Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


SSL ipccore.rs   Sprache: unbekannt

 
// Copyright © 2021 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use std::io::{self, Result};
use std::sync::{mpsc, Arc};
use std::thread;

use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker};
use slab::Slab;

use crate::messages::AssociateHandleForMessage;
use crate::rpccore::{
    make_client, make_server, Client, Handler, Proxy, RequestQueue, RequestQueueSender, Server,
};
use crate::{
    codec::Codec,
    codec::LengthDelimitedCodec,
    sys::{self, RecvMsg, SendMsg},
};

use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

const WAKE_TOKEN: Token = Token(!0);

thread_local!(static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> = const { std::cell::RefCell::new(None) });

fn assert_not_in_event_loop_thread() {
    IN_EVENTLOOP.with(|b| {
        assert_ne!(*b.borrow(), Some(thread::current().id()));
    });
}

// Requests sent by an EventLoopHandle to be serviced by
// the handle's associated EventLoop.
enum Request {
    // See EventLoop::add_connection
    AddConnection(
        sys::Pipe,
        Box<dyn Driver + Send>,
        mpsc::Sender<Result<Token>>,
    ),
    // See EventLoop::shutdown
    Shutdown,
    // See EventLoop::wake_connection
    WakeConnection(Token),
}

// EventLoopHandle is a cloneable external reference
// to a running EventLoop, allowing registration of
// new client and server connections, in addition to
// requesting the EventLoop shut down cleanly.
#[derive(Clone, Debug)]
pub struct EventLoopHandle {
    waker: Arc<Waker>,
    requests: RequestQueueSender<Request>,
}

impl EventLoopHandle {
    pub fn bind_client<C: Client + 'static>(
        &self,
        connection: sys::Pipe,
    ) -> Result<Proxy<<C as Client>::ServerMessage, <C as Client>::ClientMessage>>
    where
        <C as Client>::ServerMessage: Serialize + Debug + AssociateHandleForMessage + Send,
        <C as Client>::ClientMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
    {
        let (handler, mut proxy) = make_client::<C>()?;
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_client {:?}", r);
        r.map(|token| {
            proxy.connect_event_loop(self.clone(), token);
            proxy
        })
    }

    pub fn bind_server<S: Server + Send + 'static>(
        &self,
        server: S,
        connection: sys::Pipe,
    ) -> Result<()>
    where
        <S as Server>::ServerMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
        <S as Server>::ClientMessage: Serialize + Debug + AssociateHandleForMessage + Send,
    {
        let handler = make_server::<S>(server);
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_server {:?}", r);
        r.map(|_| ())
    }

    // Register a new connection with associated driver on the EventLoop.
    // TODO: Since this is called from a Gecko main thread, make this non-blocking wrt. the EventLoop.
    fn add_connection(
        &self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        assert_not_in_event_loop_thread();
        let (tx, rx) = mpsc::channel();
        self.requests
            .push(Request::AddConnection(connection, driver, tx))
            .map_err(|_| {
                debug!("EventLoopHandle::add_connection send failed");
                io::ErrorKind::ConnectionAborted
            })?;
        self.waker.wake()?;
        rx.recv().map_err(|_| {
            debug!("EventLoopHandle::add_connection recv failed");
            io::ErrorKind::ConnectionAborted
        })?
    }

    // Signal EventLoop to shutdown.  Causes EventLoop::poll to return Ok(false).
    fn shutdown(&self) -> Result<()> {
        self.requests.push(Request::Shutdown).map_err(|_| {
            debug!("EventLoopHandle::shutdown send failed");
            io::ErrorKind::ConnectionAborted
        })?;
        self.waker.wake()
    }

    // Signal EventLoop to wake connection specified by `token` for processing.
    pub(crate) fn wake_connection(&self, token: Token) {
        if self.requests.push(Request::WakeConnection(token)).is_ok() {
            self.waker.wake().expect("wake failed");
        }
    }
}

// EventLoop owns all registered connections, and is responsible for calling each connection's
// `handle_event` or `handle_wake` any time a readiness or wake event associated with that connection is
// produced.
struct EventLoop {
    poll: Poll,
    events: Events,
    waker: Arc<Waker>,
    name: String,
    connections: Slab<Connection>,
    requests: Arc<RequestQueue<Request>>,
}

const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow.
const EVENT_LOOP_EVENTS_PER_ITERATION: usize = 256; // Number of events per poll() step, arbitrary limit.

impl EventLoop {
    fn new(name: String) -> Result<EventLoop> {
        let poll = Poll::new()?;
        let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?);
        let eventloop = EventLoop {
            poll,
            events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION),
            waker,
            name,
            connections: Slab::with_capacity(EVENT_LOOP_INITIAL_CLIENTS),
            requests: Arc::new(RequestQueue::new(EVENT_LOOP_INITIAL_CLIENTS)),
        };

        Ok(eventloop)
    }

    // Return a cloneable handle for controlling the EventLoop externally.
    fn handle(&mut self) -> EventLoopHandle {
        EventLoopHandle {
            waker: self.waker.clone(),
            requests: self.requests.new_sender(),
        }
    }

    // Register a connection and driver.
    fn add_connection(
        &mut self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        if self.connections.len() == self.connections.capacity() {
            trace!("{}: connection slab full, insert will allocate", self.name);
        }
        let entry = self.connections.vacant_entry();
        let token = Token(entry.key());
        assert_ne!(token, WAKE_TOKEN);
        let connection = Connection::new(connection, token, driver, self.poll.registry())?;
        debug!("{}: {:?}: new connection", self.name, token);
        entry.insert(connection);
        Ok(token)
    }

    // Step EventLoop once.  Call this in a loop from a dedicated thread.
    // Returns false if EventLoop is shutting down.
    // Each step may call `handle_event` on any registered connection that
    // has received readiness events from the poll wakeup.
    fn poll(&mut self) -> Result<bool> {
        loop {
            let r = self.poll.poll(&mut self.events, None);
            match r {
                Ok(()) => break,
                Err(ref e) if interrupted(e) => continue,
                Err(e) => return Err(e),
            }
        }

        for event in self.events.iter() {
            match event.token() {
                WAKE_TOKEN => {
                    debug!("{}: WAKE: wake event, will process requests", self.name);
                }
                token => {
                    debug!("{}: {:?}: connection event: {:?}", self.name, token, event);
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_event(event, self.poll.registry()) {
                            Ok(done) => {
                                debug!("{}: connection {:?} done={}", self.name, token, done);
                                done
                            }
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious event, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: {:?}",
                            self.name, token, event
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done, removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        // If the waker was signalled there may be pending requests to process.
        while let Some(req) = self.requests.pop() {
            match req {
                Request::AddConnection(pipe, driver, tx) => {
                    debug!("{}: EventLoop: handling add_connection", self.name);
                    let r = self.add_connection(pipe, driver);
                    tx.send(r).expect("EventLoop::add_connection");
                }
                Request::Shutdown => {
                    debug!("{}: EventLoop: handling shutdown", self.name);
                    return Ok(false);
                }
                Request::WakeConnection(token) => {
                    debug!(
                        "{}: EventLoop: handling wake_connection {:?}",
                        self.name, token
                    );
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_wake(self.poll.registry()) {
                            Ok(done) => done,
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious wake, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: wake_connection",
                            self.name, token
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done (wake), removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        Ok(true)
    }
}

impl Drop for EventLoop {
    fn drop(&mut self) {
        debug!("{}: EventLoop drop", self.name);
        for (token, connection) in &mut self.connections {
            debug!(
                "{}: EventLoop drop - closing connection for {:?}",
                self.name, token
            );
            if let Err(e) = connection.shutdown(self.poll.registry()) {
                debug!(
                    "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                    self.name, token, e
                );
            }
        }
        debug!("{}: EventLoop drop done", self.name);
    }
}

// Connection wraps an interprocess connection (Pipe) and manages
// receiving inbound and sending outbound buffers (and associated handles, if any).
// The associated driver is responsible for message framing and serialization.
struct Connection {
    io: sys::Pipe,
    token: Token,
    interest: Option<Interest>,
    inbound: sys::ConnectionBuffer,
    outbound: sys::ConnectionBuffer,
    driver: Box<dyn Driver + Send>,
}

pub(crate) const IPC_CLIENT_BUFFER_SIZE: usize = 16384;

impl Connection {
    fn new(
        mut io: sys::Pipe,
        token: Token,
        driver: Box<dyn Driver + Send>,
        registry: &Registry,
    ) -> Result<Connection> {
        let interest = Interest::READABLE;
        registry.register(&mut io, token, interest)?;
        Ok(Connection {
            io,
            token,
            interest: Some(interest),
            inbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            outbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            driver,
        })
    }

    fn shutdown(&mut self, registry: &Registry) -> Result<()> {
        trace!(
            "{:?}: connection shutdown interest={:?}",
            self.token,
            self.interest
        );
        let r = self.io.shutdown();
        trace!("{:?}: connection shutdown r={:?}", self.token, r);
        self.interest = None;
        registry.deregister(&mut self.io)
    }

    // Connections are always interested in READABLE.  clear_readable is only
    // called when the connection is in the process of shutting down.
    fn clear_readable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::READABLE)),
        )
    }

    // Connections toggle WRITABLE based on the state of the `outbound` buffer.
    fn set_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            Some(
                self.interest
                    .map_or_else(|| Interest::WRITABLE, |i| i.add(Interest::WRITABLE)),
            ),
        )
    }

    fn clear_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::WRITABLE)),
        )
    }

    // Update connection registration with the current readiness event interests.
    fn update_registration(
        &mut self,
        registry: &Registry,
        new_interest: Option<Interest>,
    ) -> Result<()> {
        // Note: Updating registration always triggers a writable event with NamedPipes, so
        // it's important to skip updating registration when the set of interests hasn't changed.
        if new_interest != self.interest {
            trace!(
                "{:?}: updating readiness registration old={:?} new={:?}",
                self.token,
                self.interest,
                new_interest
            );
            self.interest = new_interest;
            if let Some(interest) = self.interest {
                registry.reregister(&mut self.io, self.token, interest)?;
            } else {
                registry.deregister(&mut self.io)?;
            }
        }
        Ok(())
    }

    // Handle readiness event.  Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
    // The EventLoop will call this for any connection that has received an event.
    fn handle_event(&mut self, event: &Event, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling event {:?}", self.token, event);
        assert_eq!(self.token, event.token());
        let done = if event.is_readable() {
            self.recv_inbound()?
        } else {
            trace!("{:?}: not readable", self.token);
            false
        };
        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send
            return Ok(true);
        }
        debug!(
            "{:?}: handling event done (recv done={}, outbound={})",
            self.token,
            done,
            self.outbound.is_empty()
        );
        let done = done && self.outbound.is_empty();
        // If driver is done and outbound is clear, unregister connection.
        if done {
            trace!("{:?}: driver done, clearing read interest", self.token);
            self.clear_readable(registry)?;
        }
        Ok(done)
    }

    // Handle wake event.  Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
    // The EventLoop will call this to clear the outbound buffer for any connection that has received a wake event.
    fn handle_wake(&mut self, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling wake", self.token);
        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send
            return Ok(true);
        }
        debug!("{:?}: handling wake done", self.token);
        Ok(false)
    }

    fn recv_inbound(&mut self) -> Result<bool> {
        // If the connection is readable, read into inbound and pass to driver for processing until all ready data
        // has been consumed.
        loop {
            trace!("{:?}: pre-recv inbound: {:?}", self.token, self.inbound);
            let r = self.io.recv_msg(&mut self.inbound);
            match r {
                Ok(0) => {
                    trace!(
                        "{:?}: recv EOF unprocessed inbound={}",
                        self.token,
                        self.inbound.is_empty()
                    );
                    return Ok(true);
                }
                Ok(n) => {
                    trace!("{:?}: recv bytes: {}, process_inbound", self.token, n);
                    let r = self.driver.process_inbound(&mut self.inbound);
                    trace!("{:?}: process_inbound done: {:?}", self.token, r);
                    match r {
                        Ok(done) => {
                            if done {
                                return Ok(true);
                            }
                        }
                        Err(e) => {
                            debug!(
                                "{:?}: process_inbound error: {:?} unprocessed inbound={}",
                                self.token,
                                e,
                                self.inbound.is_empty()
                            );
                            return Err(e);
                        }
                    }
                }
                Err(ref e) if would_block(e) => {
                    trace!("{:?}: recv would_block: {:?}", self.token, e);
                    return Ok(false);
                }
                Err(ref e) if interrupted(e) => {
                    trace!("{:?}: recv interrupted: {:?}", self.token, e);
                    continue;
                }
                Err(e) => {
                    debug!("{:?}: recv error: {:?}", self.token, e);
                    return Err(e);
                }
            }
        }
    }

    fn flush_outbound(&mut self) -> Result<()> {
        // Enqueue outbound messages to the outbound buffer, then try to write out to connection.
        // There may be outbound messages even if there was no inbound processing, so always attempt
        // to enqueue and flush.
        trace!("{:?}: flush_outbound", self.token);
        let r = self.driver.flush_outbound(&mut self.outbound);
        trace!("{:?}: flush_outbound done: {:?}", self.token, r);
        if let Err(e) = r {
            debug!("{:?}: flush_outbound error: {:?}", self.token, e);
            return Err(e);
        }
        Ok(())
    }

    fn send_outbound(&mut self, registry: &Registry) -> Result<bool> {
        // Attempt to flush outbound buffer.  If the connection's write buffer is full, register for WRITABLE
        // and complete flushing when associated notitication arrives later.
        while !self.outbound.is_empty() {
            let r = self.io.send_msg(&mut self.outbound);
            match r {
                Ok(0) => {
                    trace!("{:?}: send EOF", self.token);
                    return Ok(true);
                }
                Ok(n) => {
                    trace!("{:?}: send bytes: {}", self.token, n);
                }
                Err(ref e) if would_block(e) => {
                    trace!(
                        "{:?}: send would_block: {:?}, setting write interest",
                        self.token,
                        e
                    );
                    // Register for write events.
                    self.set_writable(registry)?;
                    break;
                }
                Err(ref e) if interrupted(e) => {
                    trace!("{:?}: send interrupted: {:?}", self.token, e);
                    continue;
                }
                Err(e) => {
                    debug!("{:?}: send error: {:?}", self.token, e);
                    return Err(e);
                }
            }
            trace!("{:?}: post-send: outbound {:?}", self.token, self.outbound);
        }
        // Outbound buffer flushed, clear registration for WRITABLE.
        if self.outbound.is_empty() {
            trace!("{:?}: outbound empty, clearing write interest", self.token);
            self.clear_writable(registry)?;
        }
        Ok(false)
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        debug!("{:?}: Connection drop", self.token);
    }
}

fn would_block(err: &std::io::Error) -> bool {
    err.kind() == std::io::ErrorKind::WouldBlock
}

fn interrupted(err: &std::io::Error) -> bool {
    err.kind() == std::io::ErrorKind::Interrupted
}

// Driver only has a single implementation, but must be hidden behind a Trait object to
// hide the varying FramedDriver sizes (due to different `T` values).
trait Driver {
    // Handle inbound messages.  Returns true if Driver is done; this will trigger Connection removal and cleanup.
    fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool>;

    // Write outbound messages to `outbound`.
    fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()>;
}

// Length-delimited connection framing and (de)serialization is handled by the inbound and outbound processing.
// Handlers can then process message Requests and Responses without knowledge of serialization or
// handle remoting.
impl<T> Driver for FramedDriver<T>
where
    T: Handler,
    T::In: DeserializeOwned + Debug + AssociateHandleForMessage,
    T::Out: Serialize + Debug + AssociateHandleForMessage,
{
    // Caller passes `inbound` data, this function will trim any complete messages from `inbound` and pass them to the handler for processing.
    fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool> {
        debug!("process_inbound: {:?}", inbound);

        // Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action.
        #[allow(unused_mut)]
        while let Some(mut item) = self.codec.decode(&mut inbound.buf)? {
            if item.has_associated_handle() {
                // On Unix, dequeue a handle from the connection and update the item's handle.
                #[cfg(unix)]
                {
                    let new = inbound
                        .pop_handle()
                        .expect("inbound handle expected for item");
                    unsafe { item.set_local_handle(new.take()) };
                }
                // On Windows, the deserialized item contains the correct handle value, so
                // convert it to an owned handle on the item.
                #[cfg(windows)]
                {
                    assert!(inbound.pop_handle().is_none());
                    unsafe { item.set_local_handle() };
                }
            }

            self.handler.consume(item)?;
        }

        Ok(false)
    }

    // Caller will try to write `outbound` to associated connection, queuing any data that can't be transmitted immediately.
    fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()> {
        debug!("flush_outbound: {:?}", outbound.buf);

        // Repeatedly grab outgoing items from the handler, passing each to `encode` for serialization into `outbound`.
        while let Some(mut item) = self.handler.produce()? {
            let handle = if item.has_associated_handle() {
                #[allow(unused_mut)]
                let mut handle = item.take_handle();
                // On Windows, the handle is transferred by duplicating it into the target remote process.
                #[cfg(windows)]
                unsafe {
                    item.set_remote_handle(handle.send_to_target()?);
                }
                Some(handle)
            } else {
                None
            };

            self.codec.encode(item, &mut outbound.buf)?;
            if let Some(handle) = handle {
                // `outbound` retains ownership of the handle until the associated
                // encoded item in `outbound.buf` is sent to the remote process.
                outbound.push_handle(handle);
            }
        }
        Ok(())
    }
}

struct FramedDriver<T: Handler> {
    codec: LengthDelimitedCodec<T::Out, T::In>,
    handler: T,
}

impl<T: Handler> FramedDriver<T> {
    fn new(handler: T) -> FramedDriver<T> {
        FramedDriver {
            codec: Default::default(),
            handler,
        }
    }
}

#[derive(Debug)]
pub struct EventLoopThread {
    thread: Option<thread::JoinHandle<Result<()>>>,
    name: String,
    handle: EventLoopHandle,
}

impl EventLoopThread {
    pub fn new<F1, F2>(
        name: String,
        stack_size: Option<usize>,
        after_start: F1,
        before_stop: F2,
    ) -> Result<Self>
    where
        F1: Fn() + Send + 'static,
        F2: Fn() + Send + 'static,
    {
        let mut event_loop = EventLoop::new(name.clone())?;
        let handle = event_loop.handle();

        let builder = thread::Builder::new()
            .name(name.clone())
            .stack_size(stack_size.unwrap_or(64 * 4096));

        let thread = builder.spawn(move || {
            trace!("{}: event loop thread enter", event_loop.name);
            after_start();
            let _thread_exit_guard = scopeguard::guard((), |_| before_stop());

            let r = loop {
                let start = std::time::Instant::now();
                let r = event_loop.poll();
                trace!(
                    "{}: event loop poll r={:?}, took={}μs",
                    event_loop.name,
                    r,
                    start.elapsed().as_micros()
                );
                match r {
                    Ok(true) => continue,
                    _ => break r,
                }
            };

            trace!("{}: event loop thread exit", event_loop.name);
            r.map(|_| ())
        })?;

        Ok(EventLoopThread {
            thread: Some(thread),
            name,
            handle,
        })
    }

    pub fn handle(&self) -> &EventLoopHandle {
        &self.handle
    }
}

impl Drop for EventLoopThread {
    // Shut down event loop and executor thread.  Blocks until complete.
    fn drop(&mut self) {
        trace!("{}: EventLoopThread shutdown", self.name);
        if let Err(e) = self.handle.shutdown() {
            debug!("{}: initiating shutdown failed: {:?}", self.name, e);
        }
        let thread = self.thread.take().expect("event loop thread");
        if let Err(e) = thread.join() {
            error!("{}: EventLoopThread failed: {:?}", self.name, e);
        }
        trace!("{}: EventLoopThread shutdown done", self.name);
    }
}

#[cfg(test)]
mod test {
    use serde_derive::{Deserialize, Serialize};

    use super::*;

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestServerMessage {
        TestRequest,
    }
    impl AssociateHandleForMessage for TestServerMessage {}

    struct TestServerImpl {}

    impl Server for TestServerImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;

        fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
            assert_eq!(req, TestServerMessage::TestRequest);
            TestClientMessage::TestResponse
        }
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestClientMessage {
        TestResponse,
    }

    impl AssociateHandleForMessage for TestClientMessage {}

    struct TestClientImpl {}

    impl Client for TestClientImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;
    }

    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    fn setup() -> (
        EventLoopThread,
        EventLoopThread,
        Proxy<TestServerMessage, TestClientMessage>,
    ) {
        // Server setup and registration.
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle();

        let (server_pipe, client_pipe) = sys::make_pipe_pair().expect("server make_pipe_pair");
        server_handle
            .bind_server(TestServerImpl {}, server_pipe)
            .expect("server bind_server");

        // Client setup and registration.
        let client = EventLoopThread::new("test-client".to_string(), None, || {}, || {})
            .expect("client EventLoopThread");
        let client_handle = client.handle();

        let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) };
        let client_proxy = client_handle
            .bind_client::<TestClientImpl>(client_pipe)
            .expect("client bind_client");

        (server, client, client_proxy)
    }

    // Verify basic EventLoopThread functionality works.  Create a server and client EventLoopThread, then send
    // a single message from the client to the server and wait for the expected response.
    #[test]
    fn basic() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(client);
        drop(server);
    }

    // Same as `basic`, but shut down server before client.
    #[test]
    fn basic_reverse_drop_order() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(server);
        drop(client);
    }

    #[test]
    fn dead_server() {
        init();
        let (server, _client, client_proxy) = setup();
        drop(server);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on closed channel");
    }

    #[test]
    fn dead_client() {
        init();
        let (_server, client, client_proxy) = setup();
        drop(client);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on a closed channel");
    }

    #[test]
    fn disconnected_handle() {
        init();
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle().clone();
        drop(server);

        server_handle
            .shutdown()
            .expect_err("sending on closed channel");
    }

    #[test]
    fn clone_after_drop() {
        init();
        let (server, client, client_proxy) = setup();
        drop(server);
        drop(client);

        let clone = client_proxy.clone();
        let response = clone.call(TestServerMessage::TestRequest);
        response.expect_err("sending to a dropped ClientHandler");
    }

    #[test]
    fn basic_event_loop_thread_callbacks() {
        init();
        let (start_tx, start_rx) = mpsc::channel();
        let (stop_tx, stop_rx) = mpsc::channel();

        let elt = EventLoopThread::new(
            "test-thread-callbacks".to_string(),
            None,
            move || start_tx.send(()).unwrap(),
            move || stop_tx.send(()).unwrap(),
        )
        .expect("server EventLoopThread");

        start_rx.recv().expect("after_start callback done");

        drop(elt);

        stop_rx.recv().expect("before_stop callback done");
    }
}

[ Verzeichnis aufwärts0.51unsichere Verbindung  Übersetzung europäischer Sprachen durch Browser  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge