Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/audioipc2/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 14 kB image not shown  

Quelle  rpccore.rs   Sprache: unbekannt

 
// Copyright © 2021 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use crossbeam_queue::ArrayQueue;
use mio::Token;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io::{self, Error, ErrorKind, Result};
use std::marker::PhantomPinned;
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};

use crate::ipccore::EventLoopHandle;

// This provides a safe-ish method for a thread to allocate
// stack storage space for a result, then pass a (wrapped)
// pointer to that location to another thread via
// a CompletionWriter to eventually store a result into.
struct Completion<T> {
    item: UnsafeCell<Option<T>>,
    writer: AtomicBool,
    _pin: PhantomPinned, // disable rustc's no-alias
}

impl<T> Completion<T> {
    fn new() -> Self {
        Completion {
            item: UnsafeCell::new(None),
            writer: AtomicBool::new(false),
            _pin: PhantomPinned,
        }
    }

    // Wait until the writer completes, then return the result.
    // This is intended to be a single-use function, once the writer
    // has completed any further attempts to wait will return None.
    fn wait(&self) -> Option<T> {
        // Wait for the writer to complete or be dropped.
        while self.writer.load(Ordering::Acquire) {
            std::thread::park();
        }
        unsafe { (*self.item.get()).take() }
    }

    // Create a writer for the other thread to store the
    // expected result into.
    fn writer(&self) -> CompletionWriter<T> {
        assert!(!self.writer.load(Ordering::Relaxed));
        self.writer.store(true, Ordering::Release);
        CompletionWriter {
            ptr: self as *const _ as *mut _,
            waiter: std::thread::current(),
        }
    }
}

impl<T> Drop for Completion<T> {
    fn drop(&mut self) {
        // Wait for the outstanding writer to complete before
        // dropping, since the CompletionWriter references
        // memory owned by this object.
        while self.writer.load(Ordering::Acquire) {
            std::thread::park();
        }
    }
}

struct CompletionWriter<T> {
    ptr: *mut Completion<T>, // Points to a Completion on another thread's stack
    waiter: std::thread::Thread, // Identifies thread waiting for completion
}

impl<T> CompletionWriter<T> {
    fn set(self, value: T) {
        // Store the result into the Completion's memory.
        // Since `set` consumes `self`, rely on `Drop` to
        // mark the writer as done and wake the Completion's
        // thread.
        unsafe {
            assert!((*self.ptr).writer.load(Ordering::Relaxed));
            *(*self.ptr).item.get() = Some(value);
        }
    }
}

impl<T> Drop for CompletionWriter<T> {
    fn drop(&mut self) {
        // Mark writer as complete - if `set` was not called,
        // the waiter will receive `None`.
        unsafe {
            (*self.ptr).writer.store(false, Ordering::Release);
        }
        // Wake the Completion's thread.
        self.waiter.unpark();
    }
}

// Safety: CompletionWriter holds a pointer to a Completion
// residing on another thread's stack.  The Completion always
// waits for an outstanding writer if present, and CompletionWriter
// releases the waiter and wakes the Completion's thread on drop,
// so this pointer will always be live for the duration of a
// CompletionWriter.
unsafe impl<T> Send for CompletionWriter<T> {}

// RPC message handler.  Implemented by ClientHandler (for Client)
// and ServerHandler (for Server).
pub(crate) trait Handler {
    type In;
    type Out;

    // Consume a request
    fn consume(&mut self, request: Self::In) -> Result<()>;

    // Produce a response
    fn produce(&mut self) -> Result<Option<Self::Out>>;
}

// Client RPC definition.  This supplies the expected message
// request and response types.
pub trait Client {
    type ServerMessage;
    type ClientMessage;
}

// Server RPC definition.  This supplies the expected message
// request and response types.  `process` is passed inbound RPC
// requests by the ServerHandler to be responded to by the server.
pub trait Server {
    type ServerMessage;
    type ClientMessage;

    fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage;
}

// RPC Client Proxy implementation.
type ProxyRequest<Request, Response> = (Request, CompletionWriter<Response>);

// RPC Proxy that may be `clone`d for use by multiple owners/threads.
// A Proxy `call` arranges for the supplied request to be transmitted
// to the associated Server via RPC and blocks awaiting the response
// via the associated `Completion`.
// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler
// encounters an internal error, `requests` will fail to upgrade, allowing
// the proxy to report an error.
#[derive(Debug)]
pub struct Proxy<Request, Response> {
    handle: Option<(EventLoopHandle, Token)>,
    requests: ManuallyDrop<RequestQueueSender<ProxyRequest<Request, Response>>>,
}

impl<Request, Response> Proxy<Request, Response> {
    fn new(requests: RequestQueueSender<ProxyRequest<Request, Response>>) -> Self {
        Self {
            handle: None,
            requests: ManuallyDrop::new(requests),
        }
    }

    pub fn call(&self, request: Request) -> Result<Response> {
        let response = Completion::new();
        self.requests.push((request, response.writer()))?;
        self.wake_connection();
        match response.wait() {
            Some(resp) => Ok(resp),
            None => Err(Error::new(ErrorKind::Other, "proxy recv error")),
        }
    }

    pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) {
        self.handle = Some((handle, token));
    }

    fn wake_connection(&self) {
        let (handle, token) = self
            .handle
            .as_ref()
            .expect("proxy not connected to event loop");
        handle.wake_connection(*token);
    }
}

impl<Request, Response> Clone for Proxy<Request, Response> {
    fn clone(&self) -> Self {
        let mut clone = Self::new((*self.requests).clone());
        let (handle, token) = self
            .handle
            .as_ref()
            .expect("proxy not connected to event loop");
        clone.connect_event_loop(handle.clone(), *token);
        clone
    }
}

impl<Request, Response> Drop for Proxy<Request, Response> {
    fn drop(&mut self) {
        trace!("Proxy drop, waking EventLoop");
        // Must drop `requests` before waking the connection, otherwise
        // the wake may be processed before the (last) weak reference is
        // dropped.
        let last_proxy = self.requests.live_proxies();
        unsafe {
            ManuallyDrop::drop(&mut self.requests);
        }
        if last_proxy == 1 && self.handle.is_some() {
            self.wake_connection()
        }
    }
}

const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client.

// Client-specific Handler implementation.
// The IPC EventLoop Driver calls this to execute client-specific
// RPC handling.  Serialized messages sent via a Proxy are queued
// for transmission when `produce` is called.
// Deserialized messages are passed via `consume` to
// trigger response completion by sending the response via a channel
// connected to a ProxyResponse.
pub(crate) struct ClientHandler<C: Client> {
    in_flight: VecDeque<CompletionWriter<C::ClientMessage>>,
    requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
}

impl<C: Client> ClientHandler<C> {
    fn new(
        requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
    ) -> ClientHandler<C> {
        ClientHandler::<C> {
            in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES),
            requests,
        }
    }
}

impl<C: Client> Handler for ClientHandler<C> {
    type In = C::ClientMessage;
    type Out = C::ServerMessage;

    fn consume(&mut self, response: Self::In) -> Result<()> {
        trace!("ClientHandler::consume");
        if let Some(response_writer) = self.in_flight.pop_front() {
            response_writer.set(response);
        } else {
            return Err(Error::new(ErrorKind::Other, "request/response mismatch"));
        }

        Ok(())
    }

    fn produce(&mut self) -> Result<Option<Self::Out>> {
        trace!("ClientHandler::produce");

        // If the weak count is zero, no proxies are attached and
        // no further proxies can be attached since every proxy
        // after the initial one is cloned from an existing instance.
        self.requests.check_live_proxies()?;
        // Try to get a new message
        match self.requests.pop() {
            Some((request, response_writer)) => {
                trace!("  --> received request");
                self.in_flight.push_back(response_writer);
                Ok(Some(request))
            }
            None => {
                trace!("  --> no request");
                Ok(None)
            }
        }
    }
}

#[derive(Debug)]
pub(crate) struct RequestQueue<T> {
    queue: ArrayQueue<T>,
}

impl<T> RequestQueue<T> {
    pub(crate) fn new(size: usize) -> Self {
        RequestQueue {
            queue: ArrayQueue::new(size),
        }
    }

    pub(crate) fn pop(&self) -> Option<T> {
        self.queue.pop()
    }

    pub(crate) fn new_sender(self: &Arc<Self>) -> RequestQueueSender<T> {
        RequestQueueSender {
            inner: Arc::downgrade(self),
        }
    }

    pub(crate) fn check_live_proxies(self: &Arc<Self>) -> Result<()> {
        if Arc::weak_count(self) == 0 {
            return Err(io::ErrorKind::ConnectionAborted.into());
        }
        Ok(())
    }
}

pub(crate) struct RequestQueueSender<T> {
    inner: Weak<RequestQueue<T>>,
}

impl<T> RequestQueueSender<T> {
    pub(crate) fn push(&self, request: T) -> Result<()> {
        if let Some(consumer) = self.inner.upgrade() {
            if consumer.queue.push(request).is_err() {
                debug!("Proxy[{:p}]: call failed - CH::requests full", self);
                return Err(io::ErrorKind::ConnectionAborted.into());
            }
            return Ok(());
        }
        debug!("Proxy[{:p}]: call failed - CH::requests dropped", self);
        Err(Error::new(ErrorKind::Other, "proxy send error"))
    }

    pub(crate) fn live_proxies(&self) -> usize {
        Weak::weak_count(&self.inner)
    }
}

impl<T> Clone for RequestQueueSender<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<T> std::fmt::Debug for RequestQueueSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RequestQueueProducer")
            .field("inner", &self.inner.as_ptr())
            .finish()
    }
}

#[allow(clippy::type_complexity)]
pub(crate) fn make_client<C: Client>(
) -> Result<(ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>)> {
    let requests = Arc::new(RequestQueue::new(RPC_CLIENT_INITIAL_PROXIES));
    let proxy_req = requests.new_sender();
    let handler = ClientHandler::new(requests);

    Ok((handler, Proxy::new(proxy_req)))
}

// Server-specific Handler implementation.
// The IPC EventLoop Driver calls this to execute server-specific
// RPC handling.  Deserialized messages are passed via `consume` to the
// associated `server` for processing.  Server responses are then queued
// for RPC to the associated client when `produce` is called.
pub(crate) struct ServerHandler<S: Server> {
    server: S,
    in_flight: VecDeque<S::ClientMessage>,
}

impl<S: Server> Handler for ServerHandler<S> {
    type In = S::ServerMessage;
    type Out = S::ClientMessage;

    fn consume(&mut self, message: Self::In) -> Result<()> {
        trace!("ServerHandler::consume");
        let response = self.server.process(message);
        self.in_flight.push_back(response);
        Ok(())
    }

    fn produce(&mut self) -> Result<Option<Self::Out>> {
        trace!("ServerHandler::produce");

        // Return the ready response
        match self.in_flight.pop_front() {
            Some(res) => {
                trace!("  --> received response");
                Ok(Some(res))
            }
            None => {
                trace!("  --> no response ready");
                Ok(None)
            }
        }
    }
}

const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server.

pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> {
    ServerHandler::<S> {
        server,
        in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn basic() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        assert!(queue.pop().is_none());
        producer.push(1).unwrap();
        assert!(queue.pop().is_some());
        assert!(queue.pop().is_none());
    }

    #[test]
    fn queue_dropped() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        drop(queue);
        assert!(producer.push(1).is_err());
    }

    #[test]
    fn queue_full() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        producer.push(1).unwrap();
        assert!(producer.push(2).is_err());
    }

    #[test]
    fn queue_producer_clone() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        producer.push(1).unwrap();
        assert!(producer2.push(2).is_err());
    }

    #[test]
    fn queue_producer_drop() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        drop(producer);
        assert!(producer2.push(2).is_ok());
    }

    #[test]
    fn queue_producer_weak() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        drop(queue);
        assert!(producer2.push(2).is_err());
    }

    #[test]
    fn queue_producer_shutdown() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        producer.push(1).unwrap();
        assert!(Arc::weak_count(&queue) == 2);
        drop(producer);
        assert!(Arc::weak_count(&queue) == 1);
        drop(producer2);
        assert!(Arc::weak_count(&queue) == 0);
    }
}

[ Dauer der Verarbeitung: 0.24 Sekunden  (vorverarbeitet)  ]