Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/toolkit/components/kvstore/src/skv/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 12 kB image not shown  

Quelle  coordinator.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! Multi-store coordinator.

use std::{
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    num::NonZeroUsize,
    ops::Bound,
    sync::{Arc, Mutex, OnceLock},
};

// `std::collections::HashMap::extract_if` is unstable [1],
// so we use Hashbrown.
//
// [1]: https://github.com/rust-lang/rust/issues/59618
use hashbrown::hash_map::{Entry, HashMap};

use crate::skv::{
    abort::{AbortController, AbortSignal},
    store::{Store, StorePath},
};

/// Manages access to all persistent stores.
///
/// The coordinator is a singleton that lives for the lifetime of
/// the process. It has two main responsibilities:
///
/// 1. Decoupling the lifecycle of the stores from the
///    lifecycle of the [`crate::skv::interface`] objects.
/// 2. Keeping track of which objects are accessing which stores, and
///    closing stores once they're no longer needed.
///
/// ## Why have a coordinator?
///
/// Interface objects are a little cumbersome to work with in pure Rust:
/// they don't have predictable lifetimes; they can only be held by an
/// [`xpcom::RefPtr`]; and they can unintentionally retain the resources
/// they're managing beyond their intended use.
///
/// The coordinator introduces a layer of indirection to help
/// with these challenges.
///
/// ## Using the coordinator
///
/// When an interface object is instantiated, it either requests a new client
/// from the coordinator, or creates a child of an existing client. The object
/// holds on to the client, and uses it to access any stores it needs
/// during its lifecycle.
///
/// When the object is done accessing its stores, it invalidates its client.
/// The coordinator detaches the invalidated client, and its children,
/// from any open stores. This prevents the object from accessing any stores
/// again, even if the [`xpcom::RefPtr`] holding the object isn't released
/// right away.
///
/// ## Object hierarchies
///
/// Clients can have multiple children, grandchildren, great-grandchildren,
/// and so on; recursively. This is useful for managing hierarchies of
/// interface objects, where the child's lifecycle is the same or shorter
/// than its parent's.
///
/// Check out [`crate::skv::interface::KeyValueService`] and
/// [`crate::skv::interface::KeyValueDatabase`] for an example of
/// hierarchies in action.
#[derive(Debug)]
pub struct Coordinator {
    state: Mutex<CoordinatorState>,
}

impl Coordinator {
    fn new() -> Self {
        Coordinator {
            state: Mutex::new(CoordinatorState {
                clients: BTreeMap::new(),
                stores: HashMap::new(),
            }),
        }
    }

    /// Returns the singleton coordinator.
    pub fn get_or_create() -> &'static Self {
        static COORDINATOR: OnceLock<Coordinator> = OnceLock::new();
        COORDINATOR.get_or_init(|| Coordinator::new())
    }

    /// Creates a new coordinator client.
    pub fn client_with_name(&self, name: &'static str) -> CoordinatorClient<'_> {
        let mut state = self.state.lock().unwrap();

        // Make this client the new last sibling by incrementing
        // the first bud of the current last sibling's key.
        let last_sibling_key = state.clients.last_key_value().map(|(key, _)| key);
        let key = ClientKey::new(
            last_sibling_key
                .map(|key| key.first().increment())
                .unwrap_or(ClientKeyBud::MIN),
        );
        let controller = AbortController::new();
        let signal = controller.signal();
        state
            .clients
            .insert(key.clone(), ActiveClient { controller });

        CoordinatorClient {
            coordinator: self,
            name,
            key,
            signal,
        }
    }
}

/// A client owned by an interface object.
#[derive(Clone, Debug)]
pub struct CoordinatorClient<'a> {
    coordinator: &'a Coordinator,
    name: &'static str,
    key: ClientKey,
    signal: AbortSignal,
}

impl<'a> CoordinatorClient<'a> {
    /// Gets or opens a store for this client.
    ///
    /// Stores connect to the underlying physical database lazily,
    /// so this does _not_ block on I/O.
    pub fn store_for_path(&self, path: StorePath) -> Result<Arc<Store>, CoordinatorError> {
        let mut state = self.coordinator.state.lock().unwrap();
        if !state.clients.contains_key(&self.key) {
            return Err(CoordinatorError::Invalidated(self.name));
        }
        match state.stores.entry(path) {
            Entry::Occupied(mut entry) => Ok(entry.get_mut().attach(self.key.clone())),
            Entry::Vacant(entry) => {
                let store = InUseStore::new(Store::new(entry.key().clone()));
                Ok(entry.insert(store).attach(self.key.clone()))
            }
        }
    }

    /// Creates a child of this client.
    pub fn child_with_name(
        &self,
        name: &'static str,
    ) -> Result<CoordinatorClient<'a>, CoordinatorError> {
        let mut state = self.coordinator.state.lock().unwrap();
        if !state.clients.contains_key(&self.key) {
            return Err(CoordinatorError::Invalidated(self.name));
        }

        let child_key = {
            // Make this child the new last child by incrementing
            // the last bud of the current last child's key.
            let last_child_key = {
                let max_child_key = self.key.clone().appending(ClientKeyBud::MAX);
                let children = state
                    .clients
                    .range((
                        // Include all our descendants, but not ourselves.
                        Bound::Excluded(&self.key),
                        Bound::Included(&max_child_key),
                    ))
                    .map(|(key, _)| key)
                    .filter(|key| {
                        // Narrow down our descendants to immediate children.
                        key.len() <= max_child_key.len()
                    });
                children.last()
            };
            self.key.clone().appending(
                last_child_key
                    .map(|key| key.last().increment())
                    .unwrap_or(ClientKeyBud::MIN),
            )
        };
        let controller = AbortController::new();
        let signal = controller.signal();
        state
            .clients
            .insert(child_key.clone(), ActiveClient { controller });

        Ok(CoordinatorClient {
            coordinator: self.coordinator,
            name,
            key: child_key,
            signal,
        })
    }

    /// Invalidates this client.
    ///
    /// Invalidation is recursive: if the client has descendants
    /// (children, grandchildren, etc.), they'll be invalidated, too.
    ///
    /// If the client or any of its descendants are the last clients
    /// of a store, invalidating the client will also close those stores.
    /// **This can block on disk I/O**, so clients should not be
    /// invalidated on the main thread.
    pub fn invalidate(&self) {
        let (abortable_clients, closeable_stores) = {
            let mut state = self.coordinator.state.lock().unwrap();
            let keys = {
                let max_child_key = self.key.clone().appending(ClientKeyBud::MAX);
                state
                    .clients
                    .range(
                        // Include ourselves and all our descendants.
                        &self.key..=&max_child_key,
                    )
                    .map(|(key, _)| key)
                    .cloned()
                    .collect::<Vec<_>>()
            };
            if keys.is_empty() {
                // Already invalidated; no need to do anything.
                return;
            }
            let abortable_clients = keys
                .iter()
                .map(|key| {
                    // Invariant: `keys` only contains keys that exist
                    // in `clients`.
                    state.clients.remove(key).expect("invariant violation")
                })
                .collect::<Vec<_>>();
            let closeable_stores = state
                .stores
                .extract_if(|_, store| {
                    // Detach ourselves from every store.
                    match store.detach(&keys) {
                        DetachResult::StillInUse => false,
                        // If we detached all clients from this store, also
                        // remove the store from the map, so that we can
                        // close it.
                        DetachResult::Closeable => true,
                    }
                })
                .map(|(_, store)| store)
                .collect::<Vec<_>>();
            (abortable_clients, closeable_stores)
        };

        for client in abortable_clients {
            client.controller.abort();
        }

        for store in closeable_stores {
            // Invariant: `into_inner` always succeeds for closeable stores.
            let store = store.into_inner().expect("invariant violation");
            store.close();
        }
    }

    /// Returns a shared reference to this client's cancellation signal.
    ///
    /// The signal will fire when the client, or any of its ancestors,
    /// are invalidated.
    pub fn signal(&self) -> AbortSignal {
        self.signal.clone()
    }
}

/// An incrementable and totally ordered scalar value.
///
/// These are called "buds" because they can form leaves and branches.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct ClientKeyBud(NonZeroUsize);

impl ClientKeyBud {
    const MIN: ClientKeyBud = ClientKeyBud(NonZeroUsize::MIN);
    const MAX: ClientKeyBud = ClientKeyBud(NonZeroUsize::MAX);

    fn increment(self) -> ClientKeyBud {
        Self(self.0.checked_add(1).unwrap())
    }
}

impl Debug for ClientKeyBud {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "#{:?}", self.0)
    }
}

/// A hierarchical key uniquely identifying a coordinator client.
///
/// A key comprises one or more "buds". If clients M and N are siblings,
/// then their keys only differ in the last bud. If N is a child of M,
/// then N's key has one more bud than M's key.
#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct ClientKey(ClientKeyBud, Box<[ClientKeyBud]>);

impl ClientKey {
    fn new(id: ClientKeyBud) -> Self {
        Self(id, Box::default())
    }

    /// Returns the first bud of the key.
    fn first(&self) -> ClientKeyBud {
        self.0
    }

    /// Returns the last bud of the key.
    fn last(&self) -> ClientKeyBud {
        self.1.last().copied().unwrap_or(self.0)
    }

    /// Returns the number of buds in the key.
    fn len(&self) -> usize {
        1usize.checked_add(self.1.len()).unwrap()
    }

    /// Consumes this key, and returns a new key with
    /// the given bud added to the end.
    fn appending(self, bud: ClientKeyBud) -> Self {
        let mut buds = self.1.into_vec();
        buds.push(bud);
        ClientKey(self.0, buds.into_boxed_slice())
    }
}

impl Debug for ClientKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_set().entry(&self.0).entries(self.1.iter()).finish()
    }
}

#[derive(Debug)]
struct InUseStore {
    attached_client_keys: BTreeSet<ClientKey>,
    store: Arc<Store>,
}

impl InUseStore {
    fn new(store: Store) -> Self {
        Self {
            attached_client_keys: BTreeSet::new(),
            store: Arc::new(store),
        }
    }

    /// Notes that a client with the given key
    /// is now using this store.
    ///
    /// If the client is already using this store,
    /// attaching it again is a no-op.
    fn attach(&mut self, key: ClientKey) -> Arc<Store> {
        self.attached_client_keys.insert(key);
        Arc::clone(&self.store)
    }

    /// Notes that all the clients with the given keys
    /// are no longer using this store.
    fn detach(&mut self, keys: &[ClientKey]) -> DetachResult {
        for key in keys {
            self.attached_client_keys.remove(key);
        }
        if self.attached_client_keys.is_empty() {
            DetachResult::Closeable
        } else {
            DetachResult::StillInUse
        }
    }

    fn into_inner(self) -> Result<Arc<Store>, Self> {
        if self.attached_client_keys.is_empty() {
            Ok(self.store)
        } else {
            Err(self)
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum DetachResult {
    StillInUse,
    Closeable,
}

#[derive(Debug)]
struct ActiveClient {
    controller: AbortController,
}

#[derive(Debug)]
struct CoordinatorState {
    clients: BTreeMap<ClientKey, ActiveClient>,
    stores: HashMap<StorePath, InUseStore>,
}

#[derive(thiserror::Error, Debug)]
pub enum CoordinatorError {
    #[error("`{0}` invalidated")]
    Invalidated(&'static str),
}

[ Dauer der Verarbeitung: 0.46 Sekunden  ]