Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/toolkit/components/kvstore/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 25 kB image not shown  

Quelle  task.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

extern crate xpcom;

use crossbeam_utils::atomic::AtomicCell;
use moz_task::Task;
use nserror::{nsresult, NS_ERROR_FAILURE};
use nsstring::{nsCString, nsString};
use rkv::backend::{
    BackendEnvironmentBuilder, BackendInfo, RecoveryStrategy, SafeMode, SafeModeDatabase,
    SafeModeEnvironment,
};
use rkv::{OwnedValue, StoreError, StoreOptions, Value};
use std::{
    str,
    sync::{Arc, RwLock},
};
use storage_variant::VariantType;
use xpcom::{
    interfaces::{
        nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback,
        nsIKeyValueVoidCallback, nsIVariant,
    },
    RefPtr, ThreadBoundRefPtr,
};

use crate::{
    error::KeyValueError, fs::WidePathBuf, owned_value::owned_to_variant, KeyValueDatabase,
    KeyValueEnumerator, KeyValuePairResult,
};

type Manager = rkv::Manager<SafeModeEnvironment>;
type Rkv = rkv::Rkv<SafeModeEnvironment>;
type SingleStore = rkv::SingleStore<SafeModeDatabase>;

/// A macro to generate a done() implementation for a Task.
/// Takes one argument that specifies the type of the Task's callback function:
///   value: a callback function that takes a value
///   void: the callback function doesn't take a value
///
/// The "value" variant calls self.convert() to convert a successful result
/// into the value to pass to the callback function.  So if you generate done()
/// for a callback that takes a value, ensure you also implement convert()!
macro_rules! task_done {
    (value) => {
        fn done(&self) -> Result<(), nsresult> {
            // If TaskRunnable calls Task.done() to return a result on the
            // main thread before TaskRunnable returns on the database thread,
            // then the Task will get dropped on the database thread.
            //
            // But the callback is an nsXPCWrappedJS that isn't safe to release
            // on the database thread.  So we move it out of the Task here to ensure
            // it gets released on the main thread.
            let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
            let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;

            match self.result.swap(None) {
                Some(Ok(value)) => unsafe { callback.Resolve(self.convert(value)?.coerce()) },
                Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
                None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
            }
            .to_result()
        }
    };

    (void) => {
        fn done(&self) -> Result<(), nsresult> {
            // If TaskRunnable calls Task.done() to return a result on the
            // main thread before TaskRunnable returns on the database thread,
            // then the Task will get dropped on the database thread.
            //
            // But the callback is an nsXPCWrappedJS that isn't safe to release
            // on the database thread.  So we move it out of the Task here to ensure
            // it gets released on the main thread.
            let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
            let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;

            match self.result.swap(None) {
                Some(Ok(())) => unsafe { callback.Resolve() },
                Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
                None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
            }
            .to_result()
        }
    };
}

/// A tuple comprising an Arc<RwLock<Rkv>> and a SingleStore, which is
/// the result of GetOrCreateTask.  We declare this type because otherwise
/// Clippy complains "error: very complex type used. Consider factoring
/// parts into `type` definitions" (i.e. clippy::type-complexity) when we
/// declare the type of `GetOrCreateTask::result`.
type RkvStoreTuple = (Arc<RwLock<Rkv>>, SingleStore);

// The threshold for active resizing.
const RESIZE_RATIO: f32 = 0.85;

/// The threshold (50 MB) to switch the resizing policy from the double size to
/// the constant increment for active resizing.
const INCREMENTAL_RESIZE_THRESHOLD: usize = 52_428_800;

/// The incremental resize step (5 MB)
const INCREMENTAL_RESIZE_STEP: usize = 5_242_880;

/// The RKV disk page size and mask.
const PAGE_SIZE: usize = 4096;
const PAGE_SIZE_MASK: usize = 0b_1111_1111_1111;

/// Round the non-zero size to the multiple of page size greater or equal.
///
/// It does not handle the special cases such as size zero and overflow,
/// because even if that happens (extremely unlikely though), RKV will
/// ignore the new size if it's smaller than the current size.
///
/// E.g:
///     [   1 -  4096] -> 4096,
///     [4097 -  8192] -> 8192,
///     [8193 - 12286] -> 12286,
fn round_to_pagesize(size: usize) -> usize {
    if size & PAGE_SIZE_MASK == 0 {
        size
    } else {
        (size & !PAGE_SIZE_MASK) + PAGE_SIZE
    }
}

/// Kvstore employes two auto resizing strategies: active and passive resizing.
/// They work together to liberate consumers from having to guess the "proper"
/// size of the store upfront. See more detail about this in Bug 1543861.
///
/// Active resizing that is performed at the store startup.
///
/// It either increases the size in double, or by a constant size if its size
/// reaches INCREMENTAL_RESIZE_THRESHOLD.
///
/// Note that on Linux / MAC OSX, the increased size would only take effect if
/// there is a write transaction committed afterwards.
fn active_resize(env: &Rkv) -> Result<(), StoreError> {
    let info = env.info()?;
    let current_size = info.map_size();

    let size = if current_size < INCREMENTAL_RESIZE_THRESHOLD {
        current_size << 1
    } else {
        current_size + INCREMENTAL_RESIZE_STEP
    };

    env.set_map_size(size)?;
    Ok(())
}

/// Passive resizing that is performed when the MAP_FULL error occurs. It
/// increases the store with a `wanted` size.
///
/// Note that the `wanted` size must be rounded to a multiple of page size
/// by using `round_to_pagesize`.
fn passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError> {
    let info = env.info()?;
    let current_size = info.map_size();
    env.set_map_size(current_size + wanted)?;
    Ok(())
}

pub struct GetOrCreateWithOptionsTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueDatabaseCallback>>>,
    path: nsString,
    name: nsCString,
    strategy: RecoveryStrategy,
    result: AtomicCell<Option<Result<RkvStoreTuple, KeyValueError>>>,
}

impl GetOrCreateWithOptionsTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueDatabaseCallback>,
        path: nsString,
        name: nsCString,
        strategy: RecoveryStrategy,
    ) -> GetOrCreateWithOptionsTask {
        GetOrCreateWithOptionsTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            path,
            name,
            strategy,
            result: AtomicCell::default(),
        }
    }

    fn convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> {
        Ok(KeyValueDatabase::new(result.0, result.1)?)
    }
}

impl Task for GetOrCreateWithOptionsTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result
            .store(Some(|| -> Result<RkvStoreTuple, KeyValueError> {
                let store;
                let mut builder = Rkv::environment_builder::<SafeMode>();
                builder.set_corruption_recovery_strategy(self.strategy);
                let mut manager = Manager::singleton().write()?;
                let path = WidePathBuf::new(&self.path).canonicalize()?;
                let rkv = manager.get_or_create_from_builder(
                    path.as_path(),
                    builder,
                    Rkv::from_builder::<SafeMode>,
                )?;
                {
                    let env = rkv.read()?;
                    let load_ratio = env.load_ratio()?.unwrap_or(0.0);
                    if load_ratio > RESIZE_RATIO {
                        active_resize(&env)?;
                    }
                    store = env.open_single(str::from_utf8(&self.name)?, StoreOptions::create())?;
                }
                Ok((rkv, store))
            }()));
    }

    task_done!(value);
}

pub struct PutTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    key: nsCString,
    value: OwnedValue,
    result: AtomicCell<Option<Result<(), KeyValueError>>>,
}

impl PutTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVoidCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        key: nsCString,
        value: OwnedValue,
    ) -> PutTask {
        PutTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            key,
            value,
            result: AtomicCell::default(),
        }
    }
}

impl Task for PutTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<(), KeyValueError> {
            let env = self.rkv.read()?;
            let key = str::from_utf8(&self.key)?;
            let v = Value::from(&self.value);
            let mut resized = false;

            // Use a loop here in case we want to retry from a recoverable
            // error such as `StoreError::MapFull`.
            loop {
                let mut writer = env.write()?;

                match self.store.put(&mut writer, key, &v) {
                    Ok(_) => (),

                    // Only handle the first MapFull error via passive resizing.
                    // Propogate the subsequent MapFull error.
                    Err(StoreError::MapFull) if !resized => {
                        // abort the failed transaction for resizing.
                        writer.abort();

                        // calculate the size of pairs and resize the store accordingly.
                        let pair_size =
                            key.len() + v.serialized_size().map_err(StoreError::from)? as usize;
                        let wanted = round_to_pagesize(pair_size);
                        passive_resize(&env, wanted)?;
                        resized = true;
                        continue;
                    }

                    Err(err) => return Err(KeyValueError::StoreError(err)),
                }

                // Ignore errors caused by simultaneous access.
                // We intend to investigate/revert this in bug 1810212.
                match writer.commit() {
                    Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                        // Explicitly ignore errors from simultaneous access.
                    }
                    Err(e) => return Err(From::from(e)),
                    _ => (),
                };
                break;
            }

            Ok(())
        }()));
    }

    task_done!(void);
}

pub struct WriteManyTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    pairs: Vec<(nsCString, Option<OwnedValue>)>,
    result: AtomicCell<Option<Result<(), KeyValueError>>>,
}

impl WriteManyTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVoidCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        pairs: Vec<(nsCString, Option<OwnedValue>)>,
    ) -> WriteManyTask {
        WriteManyTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            pairs,
            result: AtomicCell::default(),
        }
    }

    fn calc_pair_size(&self) -> Result<usize, StoreError> {
        let mut total = 0;

        for (key, value) in self.pairs.iter() {
            if let Some(val) = value {
                total += key.len();
                total += Value::from(val)
                    .serialized_size()
                    .map_err(StoreError::from)? as usize;
            }
        }

        Ok(total)
    }
}

impl Task for WriteManyTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<(), KeyValueError> {
            let env = self.rkv.read()?;
            let mut resized = false;

            // Use a loop here in case we want to retry from a recoverable
            // error such as `StoreError::MapFull`.
            'outer: loop {
                let mut writer = env.write()?;

                for (key, value) in self.pairs.iter() {
                    let key = str::from_utf8(key)?;
                    match value {
                        // To put.
                        Some(val) => {
                            match self.store.put(&mut writer, key, &Value::from(val)) {
                                Ok(_) => (),

                                // Only handle the first MapFull error via passive resizing.
                                // Propogate the subsequent MapFull error.
                                Err(StoreError::MapFull) if !resized => {
                                    // Abort the failed transaction for resizing.
                                    writer.abort();

                                    // Calculate the size of pairs and resize accordingly.
                                    let pair_size = self.calc_pair_size()?;
                                    let wanted = round_to_pagesize(pair_size);
                                    passive_resize(&env, wanted)?;
                                    resized = true;
                                    continue 'outer;
                                }

                                Err(err) => return Err(KeyValueError::StoreError(err)),
                            }
                        }
                        // To delete.
                        None => {
                            match self.store.delete(&mut writer, key) {
                                Ok(_) => (),

                                // RKV fails with an error if the key to delete wasn't found,
                                // and Rkv returns that error, but we ignore it, as we expect most
                                // of our consumers to want this behavior.
                                Err(StoreError::KeyValuePairNotFound) => (),

                                Err(err) => return Err(KeyValueError::StoreError(err)),
                            };
                        }
                    }
                }

                // Ignore errors caused by simultaneous access.
                // We intend to investigate/revert this in bug 1810212.
                match writer.commit() {
                    Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                        // Explicitly ignore errors from simultaneous access.
                    }
                    Err(e) => return Err(From::from(e)),
                    _ => (),
                };
                break; // 'outer: loop
            }

            Ok(())
        }()));
    }

    task_done!(void);
}

pub struct GetTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    key: nsCString,
    default_value: Option<OwnedValue>,
    result: AtomicCell<Option<Result<Option<OwnedValue>, KeyValueError>>>,
}

impl GetTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVariantCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        key: nsCString,
        default_value: Option<OwnedValue>,
    ) -> GetTask {
        GetTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            key,
            default_value,
            result: AtomicCell::default(),
        }
    }

    fn convert(&self, result: Option<OwnedValue>) -> Result<RefPtr<nsIVariant>, KeyValueError> {
        Ok(match result {
            Some(val) => owned_to_variant(val)?,
            None => ().into_variant(),
        })
    }
}

impl Task for GetTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result
            .store(Some(|| -> Result<Option<OwnedValue>, KeyValueError> {
                let key = str::from_utf8(&self.key)?;
                let env = self.rkv.read()?;
                let reader = env.read()?;
                let value = self.store.get(&reader, key)?;

                Ok(match value {
                    Some(value) => Some(OwnedValue::from(&value)),
                    None => match self.default_value {
                        Some(ref val) => Some(val.clone()),
                        None => None,
                    },
                })
            }()));
    }

    task_done!(value);
}

pub struct HasTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    key: nsCString,
    result: AtomicCell<Option<Result<bool, KeyValueError>>>,
}

impl HasTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVariantCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        key: nsCString,
    ) -> HasTask {
        HasTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            key,
            result: AtomicCell::default(),
        }
    }

    fn convert(&self, result: bool) -> Result<RefPtr<nsIVariant>, KeyValueError> {
        Ok(result.into_variant())
    }
}

impl Task for HasTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<bool, KeyValueError> {
            let key = str::from_utf8(&self.key)?;
            let env = self.rkv.read()?;
            let reader = env.read()?;
            let value = self.store.get(&reader, key)?;
            Ok(value.is_some())
        }()));
    }

    task_done!(value);
}

pub struct DeleteTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    key: nsCString,
    result: AtomicCell<Option<Result<(), KeyValueError>>>,
}

impl DeleteTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVoidCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        key: nsCString,
    ) -> DeleteTask {
        DeleteTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            key,
            result: AtomicCell::default(),
        }
    }
}

impl Task for DeleteTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<(), KeyValueError> {
            let key = str::from_utf8(&self.key)?;
            let env = self.rkv.read()?;
            let mut writer = env.write()?;

            match self.store.delete(&mut writer, key) {
                Ok(_) => (),

                // RKV fails with an error if the key to delete wasn't found,
                // and Rkv returns that error, but we ignore it, as we expect most
                // of our consumers to want this behavior.
                Err(StoreError::KeyValuePairNotFound) => (),

                Err(err) => return Err(KeyValueError::StoreError(err)),
            };

            // Ignore errors caused by simultaneous access.
            // We intend to investigate/revert this in bug 1810212.
            match writer.commit() {
                Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                    // Explicitly ignore errors from simultaneous access.
                }
                Err(e) => return Err(From::from(e)),
                _ => (),
            };

            Ok(())
        }()));
    }

    task_done!(void);
}

pub struct ClearTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    result: AtomicCell<Option<Result<(), KeyValueError>>>,
}

impl ClearTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueVoidCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
    ) -> ClearTask {
        ClearTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            result: AtomicCell::default(),
        }
    }
}

impl Task for ClearTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<(), KeyValueError> {
            let env = self.rkv.read()?;
            let mut writer = env.write()?;
            self.store.clear(&mut writer)?;
            // Ignore errors caused by simultaneous access.
            // We intend to investigate/revert this in bug 1810212.
            match writer.commit() {
                Err(StoreError::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                    // Explicitly ignore errors from simultaneous access.
                }
                Err(e) => return Err(From::from(e)),
                _ => (),
            };

            Ok(())
        }()));
    }

    task_done!(void);
}

pub struct EnumerateTask {
    callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueEnumeratorCallback>>>,
    rkv: Arc<RwLock<Rkv>>,
    store: SingleStore,
    from_key: nsCString,
    to_key: nsCString,
    result: AtomicCell<Option<Result<Vec<KeyValuePairResult>, KeyValueError>>>,
}

impl EnumerateTask {
    pub fn new(
        callback: RefPtr<nsIKeyValueEnumeratorCallback>,
        rkv: Arc<RwLock<Rkv>>,
        store: SingleStore,
        from_key: nsCString,
        to_key: nsCString,
    ) -> EnumerateTask {
        EnumerateTask {
            callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
            rkv,
            store,
            from_key,
            to_key,
            result: AtomicCell::default(),
        }
    }

    fn convert(
        &self,
        result: Vec<KeyValuePairResult>,
    ) -> Result<RefPtr<KeyValueEnumerator>, KeyValueError> {
        Ok(KeyValueEnumerator::new(result))
    }
}

impl Task for EnumerateTask {
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(
            || -> Result<Vec<KeyValuePairResult>, KeyValueError> {
                let env = self.rkv.read()?;
                let reader = env.read()?;
                let from_key = str::from_utf8(&self.from_key)?;
                let to_key = str::from_utf8(&self.to_key)?;

                let iterator = if from_key.is_empty() {
                    self.store.iter_start(&reader)?
                } else {
                    self.store.iter_from(&reader, &from_key)?
                };

                // Ideally, we'd enumerate pairs lazily, as the consumer calls
                // nsIKeyValueEnumerator.getNext(), which calls our
                // KeyValueEnumerator.get_next() implementation.  But KeyValueEnumerator
                // can't reference the Iter because Rust "cannot #[derive(xpcom)]
                // on a generic type," and the Iter requires a lifetime parameter,
                // which would make KeyValueEnumerator generic.
                //
                // Our fallback approach is to eagerly collect the iterator
                // into a collection that KeyValueEnumerator owns.  Fixing this so we
                // enumerate pairs lazily is bug 1499252.
                let pairs: Vec<KeyValuePairResult> = iterator
                    // Convert the key to a string so we can compare it to the "to" key.
                    // For forward compatibility, we don't fail here if we can't convert
                    // a key to UTF-8.  Instead, we store the Err in the collection
                    // and fail lazily in KeyValueEnumerator.get_next().
                    .map(|result| match result {
                        Ok((key, val)) => Ok((str::from_utf8(&key), val)),
                        Err(err) => Err(err),
                    })
                    // Stop iterating once we reach the to_key, if any.
                    .take_while(|result| match result {
                        Ok((key, _val)) => {
                            if to_key.is_empty() {
                                true
                            } else {
                                match *key {
                                    Ok(key) => key < to_key,
                                    Err(_err) => true,
                                }
                            }
                        }
                        Err(_) => true,
                    })
                    // Convert the key/value pair to owned.
                    .map(|result| match result {
                        Ok((key, val)) => match (key, val) {
                            (Ok(key), val) => Ok((key.to_owned(), OwnedValue::from(&val))),
                            (Err(err), _) => Err(err.into()),
                        },
                        Err(err) => Err(KeyValueError::StoreError(err)),
                    })
                    .collect();

                Ok(pairs)
            }(),
        ));
    }

    task_done!(value);
}

[ Dauer der Verarbeitung: 0.55 Sekunden  (vorverarbeitet)  ]