Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  rpccore.rs   Sprache: unbekannt

 
// Copyright © 2021 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use crossbeam_queue::ArrayQueue;
use mio::Token;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io::{self, Error, ErrorKind, Result};
use std::marker::PhantomPinned;
use std::mem::ManuallyDrop;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};

use crate::ipccore::EventLoopHandle;

// This provides a safe-ish method for a thread to allocate
// stack storage space for a result, then pass a (wrapped)
// pointer to that location to another thread via
// a CompletionWriter to eventually store a result into.
struct Completion<T> {
    item: UnsafeCell<Option<T>>,
    writer: AtomicBool,
    _pin: PhantomPinned, // disable rustc's no-alias
}

impl<T> Completion<T> {
    fn new() -> Self {
        Completion {
            item: UnsafeCell::new(None),
            writer: AtomicBool::new(false),
            _pin: PhantomPinned,
        }
    }

    // Wait until the writer completes, then return the result.
    // This is intended to be a single-use function, once the writer
    // has completed any further attempts to wait will return None.
    fn wait(&self) -> Option<T> {
        // Wait for the writer to complete or be dropped.
        while self.writer.load(Ordering::Acquire) {
            std::thread::park();
        }
        unsafe { (*self.item.get()).take() }
    }

    // Create a writer for the other thread to store the
    // expected result into.
    fn writer(&self) -> CompletionWriter<T> {
        assert!(!self.writer.load(Ordering::Relaxed));
        self.writer.store(true, Ordering::Release);
        CompletionWriter {
            ptr: self as *const _ as *mut _,
            waiter: std::thread::current(),
        }
    }
}

impl<T> Drop for Completion<T> {
    fn drop(&mut self) {
        // Wait for the outstanding writer to complete before
        // dropping, since the CompletionWriter references
        // memory owned by this object.
        while self.writer.load(Ordering::Acquire) {
            std::thread::park();
        }
    }
}

struct CompletionWriter<T> {
    ptr: *mut Completion<T>, // Points to a Completion on another thread's stack
    waiter: std::thread::Thread, // Identifies thread waiting for completion
}

impl<T> CompletionWriter<T> {
    fn set(self, value: T) {
        // Store the result into the Completion's memory.
        // Since `set` consumes `self`, rely on `Drop` to
        // mark the writer as done and wake the Completion's
        // thread.
        unsafe {
            assert!((*self.ptr).writer.load(Ordering::Relaxed));
            *(*self.ptr).item.get() = Some(value);
        }
    }
}

impl<T> Drop for CompletionWriter<T> {
    fn drop(&mut self) {
        // Mark writer as complete - if `set` was not called,
        // the waiter will receive `None`.
        unsafe {
            (*self.ptr).writer.store(false, Ordering::Release);
        }
        // Wake the Completion's thread.
        self.waiter.unpark();
    }
}

// Safety: CompletionWriter holds a pointer to a Completion
// residing on another thread's stack.  The Completion always
// waits for an outstanding writer if present, and CompletionWriter
// releases the waiter and wakes the Completion's thread on drop,
// so this pointer will always be live for the duration of a
// CompletionWriter.
unsafe impl<T> Send for CompletionWriter<T> {}

// RPC message handler.  Implemented by ClientHandler (for Client)
// and ServerHandler (for Server).
pub(crate) trait Handler {
    type In;
    type Out;

    // Consume a request
    fn consume(&mut self, request: Self::In) -> Result<()>;

    // Produce a response
    fn produce(&mut self) -> Result<Option<Self::Out>>;
}

// Client RPC definition.  This supplies the expected message
// request and response types.
pub trait Client {
    type ServerMessage;
    type ClientMessage;
}

// Server RPC definition.  This supplies the expected message
// request and response types.  `process` is passed inbound RPC
// requests by the ServerHandler to be responded to by the server.
pub trait Server {
    type ServerMessage;
    type ClientMessage;

    fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage;
}

// RPC Client Proxy implementation.
type ProxyRequest<Request, Response> = (Request, CompletionWriter<Response>);

// RPC Proxy that may be `clone`d for use by multiple owners/threads.
// A Proxy `call` arranges for the supplied request to be transmitted
// to the associated Server via RPC and blocks awaiting the response
// via the associated `Completion`.
// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler
// encounters an internal error, `requests` will fail to upgrade, allowing
// the proxy to report an error.
#[derive(Debug)]
pub struct Proxy<Request, Response> {
    handle: Option<(EventLoopHandle, Token)>,
    requests: ManuallyDrop<RequestQueueSender<ProxyRequest<Request, Response>>>,
}

impl<Request, Response> Proxy<Request, Response> {
    fn new(requests: RequestQueueSender<ProxyRequest<Request, Response>>) -> Self {
        Self {
            handle: None,
            requests: ManuallyDrop::new(requests),
        }
    }

    pub fn call(&self, request: Request) -> Result<Response> {
        let response = Completion::new();
        self.requests.push((request, response.writer()))?;
        self.wake_connection();
        match response.wait() {
            Some(resp) => Ok(resp),
            None => Err(Error::new(ErrorKind::Other, "proxy recv error")),
        }
    }

    pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) {
        self.handle = Some((handle, token));
    }

    fn wake_connection(&self) {
        let (handle, token) = self
            .handle
            .as_ref()
            .expect("proxy not connected to event loop");
        handle.wake_connection(*token);
    }
}

impl<Request, Response> Clone for Proxy<Request, Response> {
    fn clone(&self) -> Self {
        let mut clone = Self::new((*self.requests).clone());
        let (handle, token) = self
            .handle
            .as_ref()
            .expect("proxy not connected to event loop");
        clone.connect_event_loop(handle.clone(), *token);
        clone
    }
}

impl<Request, Response> Drop for Proxy<Request, Response> {
    fn drop(&mut self) {
        trace!("Proxy drop, waking EventLoop");
        // Must drop `requests` before waking the connection, otherwise
        // the wake may be processed before the (last) weak reference is
        // dropped.
        let last_proxy = self.requests.live_proxies();
        unsafe {
            ManuallyDrop::drop(&mut self.requests);
        }
        if last_proxy == 1 && self.handle.is_some() {
            self.wake_connection()
        }
    }
}

const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client.

// Client-specific Handler implementation.
// The IPC EventLoop Driver calls this to execute client-specific
// RPC handling.  Serialized messages sent via a Proxy are queued
// for transmission when `produce` is called.
// Deserialized messages are passed via `consume` to
// trigger response completion by sending the response via a channel
// connected to a ProxyResponse.
pub(crate) struct ClientHandler<C: Client> {
    in_flight: VecDeque<CompletionWriter<C::ClientMessage>>,
    requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
}

impl<C: Client> ClientHandler<C> {
    fn new(
        requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
    ) -> ClientHandler<C> {
        ClientHandler::<C> {
            in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES),
            requests,
        }
    }
}

impl<C: Client> Handler for ClientHandler<C> {
    type In = C::ClientMessage;
    type Out = C::ServerMessage;

    fn consume(&mut self, response: Self::In) -> Result<()> {
        trace!("ClientHandler::consume");
        if let Some(response_writer) = self.in_flight.pop_front() {
            response_writer.set(response);
        } else {
            return Err(Error::new(ErrorKind::Other, "request/response mismatch"));
        }

        Ok(())
    }

    fn produce(&mut self) -> Result<Option<Self::Out>> {
        trace!("ClientHandler::produce");

        // If the weak count is zero, no proxies are attached and
        // no further proxies can be attached since every proxy
        // after the initial one is cloned from an existing instance.
        self.requests.check_live_proxies()?;
        // Try to get a new message
        match self.requests.pop() {
            Some((request, response_writer)) => {
                trace!("  --> received request");
                self.in_flight.push_back(response_writer);
                Ok(Some(request))
            }
            None => {
                trace!("  --> no request");
                Ok(None)
            }
        }
    }
}

#[derive(Debug)]
pub(crate) struct RequestQueue<T> {
    queue: ArrayQueue<T>,
}

impl<T> RequestQueue<T> {
    pub(crate) fn new(size: usize) -> Self {
        RequestQueue {
            queue: ArrayQueue::new(size),
        }
    }

    pub(crate) fn pop(&self) -> Option<T> {
        self.queue.pop()
    }

    pub(crate) fn new_sender(self: &Arc<Self>) -> RequestQueueSender<T> {
        RequestQueueSender {
            inner: Arc::downgrade(self),
        }
    }

    pub(crate) fn check_live_proxies(self: &Arc<Self>) -> Result<()> {
        if Arc::weak_count(self) == 0 {
            return Err(io::ErrorKind::ConnectionAborted.into());
        }
        Ok(())
    }
}

pub(crate) struct RequestQueueSender<T> {
    inner: Weak<RequestQueue<T>>,
}

impl<T> RequestQueueSender<T> {
    pub(crate) fn push(&self, request: T) -> Result<()> {
        if let Some(consumer) = self.inner.upgrade() {
            if consumer.queue.push(request).is_err() {
                debug!("Proxy[{:p}]: call failed - CH::requests full", self);
                return Err(io::ErrorKind::ConnectionAborted.into());
            }
            return Ok(());
        }
        debug!("Proxy[{:p}]: call failed - CH::requests dropped", self);
        Err(Error::new(ErrorKind::Other, "proxy send error"))
    }

    pub(crate) fn live_proxies(&self) -> usize {
        Weak::weak_count(&self.inner)
    }
}

impl<T> Clone for RequestQueueSender<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

impl<T> std::fmt::Debug for RequestQueueSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RequestQueueProducer")
            .field("inner", &self.inner.as_ptr())
            .finish()
    }
}

#[allow(clippy::type_complexity)]
pub(crate) fn make_client<C: Client>(
) -> Result<(ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>)> {
    let requests = Arc::new(RequestQueue::new(RPC_CLIENT_INITIAL_PROXIES));
    let proxy_req = requests.new_sender();
    let handler = ClientHandler::new(requests);

    Ok((handler, Proxy::new(proxy_req)))
}

// Server-specific Handler implementation.
// The IPC EventLoop Driver calls this to execute server-specific
// RPC handling.  Deserialized messages are passed via `consume` to the
// associated `server` for processing.  Server responses are then queued
// for RPC to the associated client when `produce` is called.
pub(crate) struct ServerHandler<S: Server> {
    server: S,
    in_flight: VecDeque<S::ClientMessage>,
}

impl<S: Server> Handler for ServerHandler<S> {
    type In = S::ServerMessage;
    type Out = S::ClientMessage;

    fn consume(&mut self, message: Self::In) -> Result<()> {
        trace!("ServerHandler::consume");
        let response = self.server.process(message);
        self.in_flight.push_back(response);
        Ok(())
    }

    fn produce(&mut self) -> Result<Option<Self::Out>> {
        trace!("ServerHandler::produce");

        // Return the ready response
        match self.in_flight.pop_front() {
            Some(res) => {
                trace!("  --> received response");
                Ok(Some(res))
            }
            None => {
                trace!("  --> no response ready");
                Ok(None)
            }
        }
    }
}

const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server.

pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> {
    ServerHandler::<S> {
        server,
        in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS),
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn basic() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        assert!(queue.pop().is_none());
        producer.push(1).unwrap();
        assert!(queue.pop().is_some());
        assert!(queue.pop().is_none());
    }

    #[test]
    fn queue_dropped() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        drop(queue);
        assert!(producer.push(1).is_err());
    }

    #[test]
    fn queue_full() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        producer.push(1).unwrap();
        assert!(producer.push(2).is_err());
    }

    #[test]
    fn queue_producer_clone() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        producer.push(1).unwrap();
        assert!(producer2.push(2).is_err());
    }

    #[test]
    fn queue_producer_drop() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        drop(producer);
        assert!(producer2.push(2).is_ok());
    }

    #[test]
    fn queue_producer_weak() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        drop(queue);
        assert!(producer2.push(2).is_err());
    }

    #[test]
    fn queue_producer_shutdown() {
        let queue = Arc::new(RequestQueue::new(1));
        let producer = queue.new_sender();
        let producer2 = producer.clone();
        producer.push(1).unwrap();
        assert!(Arc::weak_count(&queue) == 2);
        drop(producer);
        assert!(Arc::weak_count(&queue) == 1);
        drop(producer2);
        assert!(Arc::weak_count(&queue) == 0);
    }
}

[ Dauer der Verarbeitung: 0.4 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge