Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  coordinator.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! Multi-store coordinator.

use std::{
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    num::NonZeroUsize,
    ops::Bound,
    sync::{Arc, Mutex, OnceLock},
};

// `std::collections::HashMap::extract_if` is unstable [1],
// so we use Hashbrown.
//
// [1]: https://github.com/rust-lang/rust/issues/59618
use hashbrown::hash_map::{Entry, HashMap};

use crate::skv::{
    abort::{AbortController, AbortSignal},
    store::{Store, StorePath},
};

/// Manages access to all persistent stores.
///
/// The coordinator is a singleton that lives for the lifetime of
/// the process. It has two main responsibilities:
///
/// 1. Decoupling the lifecycle of the stores from the
///    lifecycle of the [`crate::skv::interface`] objects.
/// 2. Keeping track of which objects are accessing which stores, and
///    closing stores once they're no longer needed.
///
/// ## Why have a coordinator?
///
/// Interface objects are a little cumbersome to work with in pure Rust:
/// they don't have predictable lifetimes; they can only be held by an
/// [`xpcom::RefPtr`]; and they can unintentionally retain the resources
/// they're managing beyond their intended use.
///
/// The coordinator introduces a layer of indirection to help
/// with these challenges.
///
/// ## Using the coordinator
///
/// When an interface object is instantiated, it either requests a new client
/// from the coordinator, or creates a child of an existing client. The object
/// holds on to the client, and uses it to access any stores it needs
/// during its lifecycle.
///
/// When the object is done accessing its stores, it invalidates its client.
/// The coordinator detaches the invalidated client, and its children,
/// from any open stores. This prevents the object from accessing any stores
/// again, even if the [`xpcom::RefPtr`] holding the object isn't released
/// right away.
///
/// ## Object hierarchies
///
/// Clients can have multiple children, grandchildren, great-grandchildren,
/// and so on; recursively. This is useful for managing hierarchies of
/// interface objects, where the child's lifecycle is the same or shorter
/// than its parent's.
///
/// Check out [`crate::skv::interface::KeyValueService`] and
/// [`crate::skv::interface::KeyValueDatabase`] for an example of
/// hierarchies in action.
#[derive(Debug)]
pub struct Coordinator {
    state: Mutex<CoordinatorState>,
}

impl Coordinator {
    fn new() -> Self {
        Coordinator {
            state: Mutex::new(CoordinatorState {
                clients: BTreeMap::new(),
                stores: HashMap::new(),
            }),
        }
    }

    /// Returns the singleton coordinator.
    pub fn get_or_create() -> &'static Self {
        static COORDINATOR: OnceLock<Coordinator> = OnceLock::new();
        COORDINATOR.get_or_init(|| Coordinator::new())
    }

    /// Creates a new coordinator client.
    pub fn client_with_name(&self, name: &'static str) -> CoordinatorClient<'_> {
        let mut state = self.state.lock().unwrap();

        // Make this client the new last sibling by incrementing
        // the first bud of the current last sibling's key.
        let last_sibling_key = state.clients.last_key_value().map(|(key, _)| key);
        let key = ClientKey::new(
            last_sibling_key
                .map(|key| key.first().increment())
                .unwrap_or(ClientKeyBud::MIN),
        );
        let controller = AbortController::new();
        let signal = controller.signal();
        state
            .clients
            .insert(key.clone(), ActiveClient { controller });

        CoordinatorClient {
            coordinator: self,
            name,
            key,
            signal,
        }
    }
}

/// A client owned by an interface object.
#[derive(Clone, Debug)]
pub struct CoordinatorClient<'a> {
    coordinator: &'a Coordinator,
    name: &'static str,
    key: ClientKey,
    signal: AbortSignal,
}

impl<'a> CoordinatorClient<'a> {
    /// Gets or opens a store for this client.
    ///
    /// Stores connect to the underlying physical database lazily,
    /// so this does _not_ block on I/O.
    pub fn store_for_path(&self, path: StorePath) -> Result<Arc<Store>, CoordinatorError> {
        let mut state = self.coordinator.state.lock().unwrap();
        if !state.clients.contains_key(&self.key) {
            return Err(CoordinatorError::Invalidated(self.name));
        }
        match state.stores.entry(path) {
            Entry::Occupied(mut entry) => Ok(entry.get_mut().attach(self.key.clone())),
            Entry::Vacant(entry) => {
                let store = InUseStore::new(Store::new(entry.key().clone()));
                Ok(entry.insert(store).attach(self.key.clone()))
            }
        }
    }

    /// Creates a child of this client.
    pub fn child_with_name(
        &self,
        name: &'static str,
    ) -> Result<CoordinatorClient<'a>, CoordinatorError> {
        let mut state = self.coordinator.state.lock().unwrap();
        if !state.clients.contains_key(&self.key) {
            return Err(CoordinatorError::Invalidated(self.name));
        }

        let child_key = {
            // Make this child the new last child by incrementing
            // the last bud of the current last child's key.
            let last_child_key = {
                let max_child_key = self.key.clone().appending(ClientKeyBud::MAX);
                let children = state
                    .clients
                    .range((
                        // Include all our descendants, but not ourselves.
                        Bound::Excluded(&self.key),
                        Bound::Included(&max_child_key),
                    ))
                    .map(|(key, _)| key)
                    .filter(|key| {
                        // Narrow down our descendants to immediate children.
                        key.len() <= max_child_key.len()
                    });
                children.last()
            };
            self.key.clone().appending(
                last_child_key
                    .map(|key| key.last().increment())
                    .unwrap_or(ClientKeyBud::MIN),
            )
        };
        let controller = AbortController::new();
        let signal = controller.signal();
        state
            .clients
            .insert(child_key.clone(), ActiveClient { controller });

        Ok(CoordinatorClient {
            coordinator: self.coordinator,
            name,
            key: child_key,
            signal,
        })
    }

    /// Invalidates this client.
    ///
    /// Invalidation is recursive: if the client has descendants
    /// (children, grandchildren, etc.), they'll be invalidated, too.
    ///
    /// If the client or any of its descendants are the last clients
    /// of a store, invalidating the client will also close those stores.
    /// **This can block on disk I/O**, so clients should not be
    /// invalidated on the main thread.
    pub fn invalidate(&self) {
        let (abortable_clients, closeable_stores) = {
            let mut state = self.coordinator.state.lock().unwrap();
            let keys = {
                let max_child_key = self.key.clone().appending(ClientKeyBud::MAX);
                state
                    .clients
                    .range(
                        // Include ourselves and all our descendants.
                        &self.key..=&max_child_key,
                    )
                    .map(|(key, _)| key)
                    .cloned()
                    .collect::<Vec<_>>()
            };
            if keys.is_empty() {
                // Already invalidated; no need to do anything.
                return;
            }
            let abortable_clients = keys
                .iter()
                .map(|key| {
                    // Invariant: `keys` only contains keys that exist
                    // in `clients`.
                    state.clients.remove(key).expect("invariant violation")
                })
                .collect::<Vec<_>>();
            let closeable_stores = state
                .stores
                .extract_if(|_, store| {
                    // Detach ourselves from every store.
                    match store.detach(&keys) {
                        DetachResult::StillInUse => false,
                        // If we detached all clients from this store, also
                        // remove the store from the map, so that we can
                        // close it.
                        DetachResult::Closeable => true,
                    }
                })
                .map(|(_, store)| store)
                .collect::<Vec<_>>();
            (abortable_clients, closeable_stores)
        };

        for client in abortable_clients {
            client.controller.abort();
        }

        for store in closeable_stores {
            // Invariant: `into_inner` always succeeds for closeable stores.
            let store = store.into_inner().expect("invariant violation");
            store.close();
        }
    }

    /// Returns a shared reference to this client's cancellation signal.
    ///
    /// The signal will fire when the client, or any of its ancestors,
    /// are invalidated.
    pub fn signal(&self) -> AbortSignal {
        self.signal.clone()
    }
}

/// An incrementable and totally ordered scalar value.
///
/// These are called "buds" because they can form leaves and branches.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct ClientKeyBud(NonZeroUsize);

impl ClientKeyBud {
    const MIN: ClientKeyBud = ClientKeyBud(NonZeroUsize::MIN);
    const MAX: ClientKeyBud = ClientKeyBud(NonZeroUsize::MAX);

    fn increment(self) -> ClientKeyBud {
        Self(self.0.checked_add(1).unwrap())
    }
}

impl Debug for ClientKeyBud {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "#{:?}", self.0)
    }
}

/// A hierarchical key uniquely identifying a coordinator client.
///
/// A key comprises one or more "buds". If clients M and N are siblings,
/// then their keys only differ in the last bud. If N is a child of M,
/// then N's key has one more bud than M's key.
#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct ClientKey(ClientKeyBud, Box<[ClientKeyBud]>);

impl ClientKey {
    fn new(id: ClientKeyBud) -> Self {
        Self(id, Box::default())
    }

    /// Returns the first bud of the key.
    fn first(&self) -> ClientKeyBud {
        self.0
    }

    /// Returns the last bud of the key.
    fn last(&self) -> ClientKeyBud {
        self.1.last().copied().unwrap_or(self.0)
    }

    /// Returns the number of buds in the key.
    fn len(&self) -> usize {
        1usize.checked_add(self.1.len()).unwrap()
    }

    /// Consumes this key, and returns a new key with
    /// the given bud added to the end.
    fn appending(self, bud: ClientKeyBud) -> Self {
        let mut buds = self.1.into_vec();
        buds.push(bud);
        ClientKey(self.0, buds.into_boxed_slice())
    }
}

impl Debug for ClientKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_set().entry(&self.0).entries(self.1.iter()).finish()
    }
}

#[derive(Debug)]
struct InUseStore {
    attached_client_keys: BTreeSet<ClientKey>,
    store: Arc<Store>,
}

impl InUseStore {
    fn new(store: Store) -> Self {
        Self {
            attached_client_keys: BTreeSet::new(),
            store: Arc::new(store),
        }
    }

    /// Notes that a client with the given key
    /// is now using this store.
    ///
    /// If the client is already using this store,
    /// attaching it again is a no-op.
    fn attach(&mut self, key: ClientKey) -> Arc<Store> {
        self.attached_client_keys.insert(key);
        Arc::clone(&self.store)
    }

    /// Notes that all the clients with the given keys
    /// are no longer using this store.
    fn detach(&mut self, keys: &[ClientKey]) -> DetachResult {
        for key in keys {
            self.attached_client_keys.remove(key);
        }
        if self.attached_client_keys.is_empty() {
            DetachResult::Closeable
        } else {
            DetachResult::StillInUse
        }
    }

    fn into_inner(self) -> Result<Arc<Store>, Self> {
        if self.attached_client_keys.is_empty() {
            Ok(self.store)
        } else {
            Err(self)
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum DetachResult {
    StillInUse,
    Closeable,
}

#[derive(Debug)]
struct ActiveClient {
    controller: AbortController,
}

#[derive(Debug)]
struct CoordinatorState {
    clients: BTreeMap<ClientKey, ActiveClient>,
    stores: HashMap<StorePath, InUseStore>,
}

#[derive(thiserror::Error, Debug)]
pub enum CoordinatorError {
    #[error("`{0}` invalidated")]
    Invalidated(&'static str),
}

[ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge