Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/audioipc2/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 32 kB image not shown  

Quelle  ipccore.rs   Sprache: unbekannt

 
// Copyright © 2021 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use std::io::{self, Result};
use std::sync::{mpsc, Arc};
use std::thread;

use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker};
use slab::Slab;

use crate::messages::AssociateHandleForMessage;
use crate::rpccore::{
    make_client, make_server, Client, Handler, Proxy, RequestQueue, RequestQueueSender, Server,
};
use crate::{
    codec::Codec,
    codec::LengthDelimitedCodec,
    sys::{self, RecvMsg, SendMsg},
};

use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

const WAKE_TOKEN: Token = Token(!0);

thread_local!(static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> = const { std::cell::RefCell::new(None) });

fn assert_not_in_event_loop_thread() {
    IN_EVENTLOOP.with(|b| {
        assert_ne!(*b.borrow(), Some(thread::current().id()));
    });
}

// Requests sent by an EventLoopHandle to be serviced by
// the handle's associated EventLoop.
enum Request {
    // See EventLoop::add_connection
    AddConnection(
        sys::Pipe,
        Box<dyn Driver + Send>,
        mpsc::Sender<Result<Token>>,
    ),
    // See EventLoop::shutdown
    Shutdown,
    // See EventLoop::wake_connection
    WakeConnection(Token),
}

// EventLoopHandle is a cloneable external reference
// to a running EventLoop, allowing registration of
// new client and server connections, in addition to
// requesting the EventLoop shut down cleanly.
#[derive(Clone, Debug)]
pub struct EventLoopHandle {
    waker: Arc<Waker>,
    requests: RequestQueueSender<Request>,
}

impl EventLoopHandle {
    pub fn bind_client<C: Client + 'static>(
        &self,
        connection: sys::Pipe,
    ) -> Result<Proxy<<C as Client>::ServerMessage, <C as Client>::ClientMessage>>
    where
        <C as Client>::ServerMessage: Serialize + Debug + AssociateHandleForMessage + Send,
        <C as Client>::ClientMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
    {
        let (handler, mut proxy) = make_client::<C>()?;
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_client {:?}", r);
        r.map(|token| {
            proxy.connect_event_loop(self.clone(), token);
            proxy
        })
    }

    pub fn bind_server<S: Server + Send + 'static>(
        &self,
        server: S,
        connection: sys::Pipe,
    ) -> Result<()>
    where
        <S as Server>::ServerMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
        <S as Server>::ClientMessage: Serialize + Debug + AssociateHandleForMessage + Send,
    {
        let handler = make_server::<S>(server);
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_server {:?}", r);
        r.map(|_| ())
    }

    // Register a new connection with associated driver on the EventLoop.
    // TODO: Since this is called from a Gecko main thread, make this non-blocking wrt. the EventLoop.
    fn add_connection(
        &self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        assert_not_in_event_loop_thread();
        let (tx, rx) = mpsc::channel();
        self.requests
            .push(Request::AddConnection(connection, driver, tx))
            .map_err(|_| {
                debug!("EventLoopHandle::add_connection send failed");
                io::ErrorKind::ConnectionAborted
            })?;
        self.waker.wake()?;
        rx.recv().map_err(|_| {
            debug!("EventLoopHandle::add_connection recv failed");
            io::ErrorKind::ConnectionAborted
        })?
    }

    // Signal EventLoop to shutdown.  Causes EventLoop::poll to return Ok(false).
    fn shutdown(&self) -> Result<()> {
        self.requests.push(Request::Shutdown).map_err(|_| {
            debug!("EventLoopHandle::shutdown send failed");
            io::ErrorKind::ConnectionAborted
        })?;
        self.waker.wake()
    }

    // Signal EventLoop to wake connection specified by `token` for processing.
    pub(crate) fn wake_connection(&self, token: Token) {
        if self.requests.push(Request::WakeConnection(token)).is_ok() {
            self.waker.wake().expect("wake failed");
        }
    }
}

// EventLoop owns all registered connections, and is responsible for calling each connection's
// `handle_event` or `handle_wake` any time a readiness or wake event associated with that connection is
// produced.
struct EventLoop {
    poll: Poll,
    events: Events,
    waker: Arc<Waker>,
    name: String,
    connections: Slab<Connection>,
    requests: Arc<RequestQueue<Request>>,
}

const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow.
const EVENT_LOOP_EVENTS_PER_ITERATION: usize = 256; // Number of events per poll() step, arbitrary limit.

impl EventLoop {
    fn new(name: String) -> Result<EventLoop> {
        let poll = Poll::new()?;
        let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?);
        let eventloop = EventLoop {
            poll,
            events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION),
            waker,
            name,
            connections: Slab::with_capacity(EVENT_LOOP_INITIAL_CLIENTS),
            requests: Arc::new(RequestQueue::new(EVENT_LOOP_INITIAL_CLIENTS)),
        };

        Ok(eventloop)
    }

    // Return a cloneable handle for controlling the EventLoop externally.
    fn handle(&mut self) -> EventLoopHandle {
        EventLoopHandle {
            waker: self.waker.clone(),
            requests: self.requests.new_sender(),
        }
    }

    // Register a connection and driver.
    fn add_connection(
        &mut self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        if self.connections.len() == self.connections.capacity() {
            trace!("{}: connection slab full, insert will allocate", self.name);
        }
        let entry = self.connections.vacant_entry();
        let token = Token(entry.key());
        assert_ne!(token, WAKE_TOKEN);
        let connection = Connection::new(connection, token, driver, self.poll.registry())?;
        debug!("{}: {:?}: new connection", self.name, token);
        entry.insert(connection);
        Ok(token)
    }

    // Step EventLoop once.  Call this in a loop from a dedicated thread.
    // Returns false if EventLoop is shutting down.
    // Each step may call `handle_event` on any registered connection that
    // has received readiness events from the poll wakeup.
    fn poll(&mut self) -> Result<bool> {
        loop {
            let r = self.poll.poll(&mut self.events, None);
            match r {
                Ok(()) => break,
                Err(ref e) if interrupted(e) => continue,
                Err(e) => return Err(e),
            }
        }

        for event in self.events.iter() {
            match event.token() {
                WAKE_TOKEN => {
                    debug!("{}: WAKE: wake event, will process requests", self.name);
                }
                token => {
                    debug!("{}: {:?}: connection event: {:?}", self.name, token, event);
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_event(event, self.poll.registry()) {
                            Ok(done) => {
                                debug!("{}: connection {:?} done={}", self.name, token, done);
                                done
                            }
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious event, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: {:?}",
                            self.name, token, event
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done, removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        // If the waker was signalled there may be pending requests to process.
        while let Some(req) = self.requests.pop() {
            match req {
                Request::AddConnection(pipe, driver, tx) => {
                    debug!("{}: EventLoop: handling add_connection", self.name);
                    let r = self.add_connection(pipe, driver);
                    tx.send(r).expect("EventLoop::add_connection");
                }
                Request::Shutdown => {
                    debug!("{}: EventLoop: handling shutdown", self.name);
                    return Ok(false);
                }
                Request::WakeConnection(token) => {
                    debug!(
                        "{}: EventLoop: handling wake_connection {:?}",
                        self.name, token
                    );
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_wake(self.poll.registry()) {
                            Ok(done) => done,
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious wake, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: wake_connection",
                            self.name, token
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done (wake), removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        Ok(true)
    }
}

impl Drop for EventLoop {
    fn drop(&mut self) {
        debug!("{}: EventLoop drop", self.name);
        for (token, connection) in &mut self.connections {
            debug!(
                "{}: EventLoop drop - closing connection for {:?}",
                self.name, token
            );
            if let Err(e) = connection.shutdown(self.poll.registry()) {
                debug!(
                    "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                    self.name, token, e
                );
            }
        }
        debug!("{}: EventLoop drop done", self.name);
    }
}

// Connection wraps an interprocess connection (Pipe) and manages
// receiving inbound and sending outbound buffers (and associated handles, if any).
// The associated driver is responsible for message framing and serialization.
struct Connection {
    io: sys::Pipe,
    token: Token,
    interest: Option<Interest>,
    inbound: sys::ConnectionBuffer,
    outbound: sys::ConnectionBuffer,
    driver: Box<dyn Driver + Send>,
}

pub(crate) const IPC_CLIENT_BUFFER_SIZE: usize = 16384;

impl Connection {
    fn new(
        mut io: sys::Pipe,
        token: Token,
        driver: Box<dyn Driver + Send>,
        registry: &Registry,
    ) -> Result<Connection> {
        let interest = Interest::READABLE;
        registry.register(&mut io, token, interest)?;
        Ok(Connection {
            io,
            token,
            interest: Some(interest),
            inbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            outbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            driver,
        })
    }

    fn shutdown(&mut self, registry: &Registry) -> Result<()> {
        trace!(
            "{:?}: connection shutdown interest={:?}",
            self.token,
            self.interest
        );
        let r = self.io.shutdown();
        trace!("{:?}: connection shutdown r={:?}", self.token, r);
        self.interest = None;
        registry.deregister(&mut self.io)
    }

    // Connections are always interested in READABLE.  clear_readable is only
    // called when the connection is in the process of shutting down.
    fn clear_readable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::READABLE)),
        )
    }

    // Connections toggle WRITABLE based on the state of the `outbound` buffer.
    fn set_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            Some(
                self.interest
                    .map_or_else(|| Interest::WRITABLE, |i| i.add(Interest::WRITABLE)),
            ),
        )
    }

    fn clear_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::WRITABLE)),
        )
    }

    // Update connection registration with the current readiness event interests.
    fn update_registration(
        &mut self,
        registry: &Registry,
        new_interest: Option<Interest>,
    ) -> Result<()> {
        // Note: Updating registration always triggers a writable event with NamedPipes, so
        // it's important to skip updating registration when the set of interests hasn't changed.
        if new_interest != self.interest {
            trace!(
                "{:?}: updating readiness registration old={:?} new={:?}",
                self.token,
                self.interest,
                new_interest
            );
            self.interest = new_interest;
            if let Some(interest) = self.interest {
                registry.reregister(&mut self.io, self.token, interest)?;
            } else {
                registry.deregister(&mut self.io)?;
            }
        }
        Ok(())
    }

    // Handle readiness event.  Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
    // The EventLoop will call this for any connection that has received an event.
    fn handle_event(&mut self, event: &Event, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling event {:?}", self.token, event);
        assert_eq!(self.token, event.token());
        let done = if event.is_readable() {
            self.recv_inbound()?
        } else {
            trace!("{:?}: not readable", self.token);
            false
        };
        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send
            return Ok(true);
        }
        debug!(
            "{:?}: handling event done (recv done={}, outbound={})",
            self.token,
            done,
            self.outbound.is_empty()
        );
        let done = done && self.outbound.is_empty();
        // If driver is done and outbound is clear, unregister connection.
        if done {
            trace!("{:?}: driver done, clearing read interest", self.token);
            self.clear_readable(registry)?;
        }
        Ok(done)
    }

    // Handle wake event.  Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
    // The EventLoop will call this to clear the outbound buffer for any connection that has received a wake event.
    fn handle_wake(&mut self, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling wake", self.token);
        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send
            return Ok(true);
        }
        debug!("{:?}: handling wake done", self.token);
        Ok(false)
    }

    fn recv_inbound(&mut self) -> Result<bool> {
        // If the connection is readable, read into inbound and pass to driver for processing until all ready data
        // has been consumed.
        loop {
            trace!("{:?}: pre-recv inbound: {:?}", self.token, self.inbound);
            let r = self.io.recv_msg(&mut self.inbound);
            match r {
                Ok(0) => {
                    trace!(
                        "{:?}: recv EOF unprocessed inbound={}",
                        self.token,
                        self.inbound.is_empty()
                    );
                    return Ok(true);
                }
                Ok(n) => {
                    trace!("{:?}: recv bytes: {}, process_inbound", self.token, n);
                    let r = self.driver.process_inbound(&mut self.inbound);
                    trace!("{:?}: process_inbound done: {:?}", self.token, r);
                    match r {
                        Ok(done) => {
                            if done {
                                return Ok(true);
                            }
                        }
                        Err(e) => {
                            debug!(
                                "{:?}: process_inbound error: {:?} unprocessed inbound={}",
                                self.token,
                                e,
                                self.inbound.is_empty()
                            );
                            return Err(e);
                        }
                    }
                }
                Err(ref e) if would_block(e) => {
                    trace!("{:?}: recv would_block: {:?}", self.token, e);
                    return Ok(false);
                }
                Err(ref e) if interrupted(e) => {
                    trace!("{:?}: recv interrupted: {:?}", self.token, e);
                    continue;
                }
                Err(e) => {
                    debug!("{:?}: recv error: {:?}", self.token, e);
                    return Err(e);
                }
            }
        }
    }

    fn flush_outbound(&mut self) -> Result<()> {
        // Enqueue outbound messages to the outbound buffer, then try to write out to connection.
        // There may be outbound messages even if there was no inbound processing, so always attempt
        // to enqueue and flush.
        trace!("{:?}: flush_outbound", self.token);
        let r = self.driver.flush_outbound(&mut self.outbound);
        trace!("{:?}: flush_outbound done: {:?}", self.token, r);
        if let Err(e) = r {
            debug!("{:?}: flush_outbound error: {:?}", self.token, e);
            return Err(e);
        }
        Ok(())
    }

    fn send_outbound(&mut self, registry: &Registry) -> Result<bool> {
        // Attempt to flush outbound buffer.  If the connection's write buffer is full, register for WRITABLE
        // and complete flushing when associated notitication arrives later.
        while !self.outbound.is_empty() {
            let r = self.io.send_msg(&mut self.outbound);
            match r {
                Ok(0) => {
                    trace!("{:?}: send EOF", self.token);
                    return Ok(true);
                }
                Ok(n) => {
                    trace!("{:?}: send bytes: {}", self.token, n);
                }
                Err(ref e) if would_block(e) => {
                    trace!(
                        "{:?}: send would_block: {:?}, setting write interest",
                        self.token,
                        e
                    );
                    // Register for write events.
                    self.set_writable(registry)?;
                    break;
                }
                Err(ref e) if interrupted(e) => {
                    trace!("{:?}: send interrupted: {:?}", self.token, e);
                    continue;
                }
                Err(e) => {
                    debug!("{:?}: send error: {:?}", self.token, e);
                    return Err(e);
                }
            }
            trace!("{:?}: post-send: outbound {:?}", self.token, self.outbound);
        }
        // Outbound buffer flushed, clear registration for WRITABLE.
        if self.outbound.is_empty() {
            trace!("{:?}: outbound empty, clearing write interest", self.token);
            self.clear_writable(registry)?;
        }
        Ok(false)
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        debug!("{:?}: Connection drop", self.token);
    }
}

fn would_block(err: &std::io::Error) -> bool {
    err.kind() == std::io::ErrorKind::WouldBlock
}

fn interrupted(err: &std::io::Error) -> bool {
    err.kind() == std::io::ErrorKind::Interrupted
}

// Driver only has a single implementation, but must be hidden behind a Trait object to
// hide the varying FramedDriver sizes (due to different `T` values).
trait Driver {
    // Handle inbound messages.  Returns true if Driver is done; this will trigger Connection removal and cleanup.
    fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool>;

    // Write outbound messages to `outbound`.
    fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()>;
}

// Length-delimited connection framing and (de)serialization is handled by the inbound and outbound processing.
// Handlers can then process message Requests and Responses without knowledge of serialization or
// handle remoting.
impl<T> Driver for FramedDriver<T>
where
    T: Handler,
    T::In: DeserializeOwned + Debug + AssociateHandleForMessage,
    T::Out: Serialize + Debug + AssociateHandleForMessage,
{
    // Caller passes `inbound` data, this function will trim any complete messages from `inbound` and pass them to the handler for processing.
    fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool> {
        debug!("process_inbound: {:?}", inbound);

        // Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action.
        #[allow(unused_mut)]
        while let Some(mut item) = self.codec.decode(&mut inbound.buf)? {
            if item.has_associated_handle() {
                // On Unix, dequeue a handle from the connection and update the item's handle.
                #[cfg(unix)]
                {
                    let new = inbound
                        .pop_handle()
                        .expect("inbound handle expected for item");
                    unsafe { item.set_local_handle(new.take()) };
                }
                // On Windows, the deserialized item contains the correct handle value, so
                // convert it to an owned handle on the item.
                #[cfg(windows)]
                {
                    assert!(inbound.pop_handle().is_none());
                    unsafe { item.set_local_handle() };
                }
            }

            self.handler.consume(item)?;
        }

        Ok(false)
    }

    // Caller will try to write `outbound` to associated connection, queuing any data that can't be transmitted immediately.
    fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()> {
        debug!("flush_outbound: {:?}", outbound.buf);

        // Repeatedly grab outgoing items from the handler, passing each to `encode` for serialization into `outbound`.
        while let Some(mut item) = self.handler.produce()? {
            let handle = if item.has_associated_handle() {
                #[allow(unused_mut)]
                let mut handle = item.take_handle();
                // On Windows, the handle is transferred by duplicating it into the target remote process.
                #[cfg(windows)]
                unsafe {
                    item.set_remote_handle(handle.send_to_target()?);
                }
                Some(handle)
            } else {
                None
            };

            self.codec.encode(item, &mut outbound.buf)?;
            if let Some(handle) = handle {
                // `outbound` retains ownership of the handle until the associated
                // encoded item in `outbound.buf` is sent to the remote process.
                outbound.push_handle(handle);
            }
        }
        Ok(())
    }
}

struct FramedDriver<T: Handler> {
    codec: LengthDelimitedCodec<T::Out, T::In>,
    handler: T,
}

impl<T: Handler> FramedDriver<T> {
    fn new(handler: T) -> FramedDriver<T> {
        FramedDriver {
            codec: Default::default(),
            handler,
        }
    }
}

#[derive(Debug)]
pub struct EventLoopThread {
    thread: Option<thread::JoinHandle<Result<()>>>,
    name: String,
    handle: EventLoopHandle,
}

impl EventLoopThread {
    pub fn new<F1, F2>(
        name: String,
        stack_size: Option<usize>,
        after_start: F1,
        before_stop: F2,
    ) -> Result<Self>
    where
        F1: Fn() + Send + 'static,
        F2: Fn() + Send + 'static,
    {
        let mut event_loop = EventLoop::new(name.clone())?;
        let handle = event_loop.handle();

        let builder = thread::Builder::new()
            .name(name.clone())
            .stack_size(stack_size.unwrap_or(64 * 4096));

        let thread = builder.spawn(move || {
            trace!("{}: event loop thread enter", event_loop.name);
            after_start();
            let _thread_exit_guard = scopeguard::guard((), |_| before_stop());

            let r = loop {
                let start = std::time::Instant::now();
                let r = event_loop.poll();
                trace!(
                    "{}: event loop poll r={:?}, took={}μs",
                    event_loop.name,
                    r,
                    start.elapsed().as_micros()
                );
                match r {
                    Ok(true) => continue,
                    _ => break r,
                }
            };

            trace!("{}: event loop thread exit", event_loop.name);
            r.map(|_| ())
        })?;

        Ok(EventLoopThread {
            thread: Some(thread),
            name,
            handle,
        })
    }

    pub fn handle(&self) -> &EventLoopHandle {
        &self.handle
    }
}

impl Drop for EventLoopThread {
    // Shut down event loop and executor thread.  Blocks until complete.
    fn drop(&mut self) {
        trace!("{}: EventLoopThread shutdown", self.name);
        if let Err(e) = self.handle.shutdown() {
            debug!("{}: initiating shutdown failed: {:?}", self.name, e);
        }
        let thread = self.thread.take().expect("event loop thread");
        if let Err(e) = thread.join() {
            error!("{}: EventLoopThread failed: {:?}", self.name, e);
        }
        trace!("{}: EventLoopThread shutdown done", self.name);
    }
}

#[cfg(test)]
mod test {
    use serde_derive::{Deserialize, Serialize};

    use super::*;

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestServerMessage {
        TestRequest,
    }
    impl AssociateHandleForMessage for TestServerMessage {}

    struct TestServerImpl {}

    impl Server for TestServerImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;

        fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
            assert_eq!(req, TestServerMessage::TestRequest);
            TestClientMessage::TestResponse
        }
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestClientMessage {
        TestResponse,
    }

    impl AssociateHandleForMessage for TestClientMessage {}

    struct TestClientImpl {}

    impl Client for TestClientImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;
    }

    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    fn setup() -> (
        EventLoopThread,
        EventLoopThread,
        Proxy<TestServerMessage, TestClientMessage>,
    ) {
        // Server setup and registration.
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle();

        let (server_pipe, client_pipe) = sys::make_pipe_pair().expect("server make_pipe_pair");
        server_handle
            .bind_server(TestServerImpl {}, server_pipe)
            .expect("server bind_server");

        // Client setup and registration.
        let client = EventLoopThread::new("test-client".to_string(), None, || {}, || {})
            .expect("client EventLoopThread");
        let client_handle = client.handle();

        let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) };
        let client_proxy = client_handle
            .bind_client::<TestClientImpl>(client_pipe)
            .expect("client bind_client");

        (server, client, client_proxy)
    }

    // Verify basic EventLoopThread functionality works.  Create a server and client EventLoopThread, then send
    // a single message from the client to the server and wait for the expected response.
    #[test]
    fn basic() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(client);
        drop(server);
    }

    // Same as `basic`, but shut down server before client.
    #[test]
    fn basic_reverse_drop_order() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(server);
        drop(client);
    }

    #[test]
    fn dead_server() {
        init();
        let (server, _client, client_proxy) = setup();
        drop(server);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on closed channel");
    }

    #[test]
    fn dead_client() {
        init();
        let (_server, client, client_proxy) = setup();
        drop(client);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on a closed channel");
    }

    #[test]
    fn disconnected_handle() {
        init();
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle().clone();
        drop(server);

        server_handle
            .shutdown()
            .expect_err("sending on closed channel");
    }

    #[test]
    fn clone_after_drop() {
        init();
        let (server, client, client_proxy) = setup();
        drop(server);
        drop(client);

        let clone = client_proxy.clone();
        let response = clone.call(TestServerMessage::TestRequest);
        response.expect_err("sending to a dropped ClientHandler");
    }

    #[test]
    fn basic_event_loop_thread_callbacks() {
        init();
        let (start_tx, start_rx) = mpsc::channel();
        let (stop_tx, stop_rx) = mpsc::channel();

        let elt = EventLoopThread::new(
            "test-thread-callbacks".to_string(),
            None,
            move || start_tx.send(()).unwrap(),
            move || stop_tx.send(()).unwrap(),
        )
        .expect("server EventLoopThread");

        start_rx.recv().expect("after_start callback done");

        drop(elt);

        stop_rx.recv().expect("before_stop callback done");
    }
}

[ Dauer der Verarbeitung: 0.35 Sekunden  (vorverarbeitet)  ]