Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/toolkit/components/kvstore/src/skv/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 9 kB image not shown  

Quelle  importer.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use rkv::{
    backend::{BackendDatabase, BackendEnvironment, BackendRwCursorTransaction},
    Readable, Rkv, SingleStore, StoreOptions, Value, Writer,
};
use xpcom::interfaces::nsIKeyValueImporter;

use crate::skv::{
    connection::{ConnectionIncident, ToConnectionIncident},
    key::Key as SkvKey,
    store::{Store as SkvStore, StoreError as SkvStoreError},
    value::Value as SkvValue,
};

/// Specifies the databases to import.
#[derive(Clone, Debug)]
pub enum SourceDatabases {
    /// Import all key-value pairs from one or more named databases.
    Named(Vec<NamedSourceDatabase>),

    /// Import all key-value pairs from all databases.
    All {
        conflict_policy: ConflictPolicy,
        cleanup_policy: CleanupPolicy,
    },
}

/// Specifies the name and settings for a single database.
#[derive(Clone, Debug)]
pub struct NamedSourceDatabase {
    pub name: DatabaseName,
    pub conflict_policy: ConflictPolicy,
    pub cleanup_policy: CleanupPolicy,
}

/// Imports Rkv databases into Skv.
pub struct RkvImporter<'a, W, S> {
    writer: W,
    sources: Vec<(NamedSourceDatabase, S)>,
    store: &'a SkvStore,
}

impl<'env, 'a, T, D> RkvImporter<'a, Writer<T>, SingleStore<D>>
where
    T: BackendRwCursorTransaction<'env, Database = D>,
    D: BackendDatabase,
{
    /// Creates an importer for one or more Rkv databases in the same
    /// Rkv environment.
    pub fn new<'rkv: 'env, E>(
        env: &'rkv Rkv<E>,
        store: &'a SkvStore,
        dbs: SourceDatabases,
    ) -> Result<Self, ImporterError>
    where
        E: BackendEnvironment<'env, Database = D, RwTransaction = T>,
    {
        let dbs = match dbs {
            SourceDatabases::Named(dbs) => dbs,
            SourceDatabases::All {
                conflict_policy,
                cleanup_policy,
            } => env
                .get_dbs()?
                .into_iter()
                .map(|name| {
                    Ok(NamedSourceDatabase {
                        name: name.try_into()?,
                        conflict_policy,
                        cleanup_policy,
                    })
                })
                .collect::<Result<_, ImporterError>>()?,
        };
        let sources = dbs
            .into_iter()
            .map(|db| {
                let store = env.open_single(Some(db.name.as_str()), StoreOptions::default())?;
                Ok((db, store))
            })
            .collect::<Result<_, ImporterError>>()?;
        Ok(Self {
            writer: env.write()?,
            sources,
            store,
        })
    }

    /// Copies all key-value pairs from the specified Rkv databases into
    /// the corresponding Skv databases.
    ///
    /// [`RkvImporter::import`] and [`RkvImporter::cleanup`] are exposed as
    /// separate methods because of lifetime requirements on the
    /// Rkv environment.
    pub fn import(&'env self) -> Result<(), ImporterError> {
        let writer = self.store.writer()?;
        writer.write(|tx| {
            for (db, store) in &self.sources {
                let importer = RkvSingleStoreImporter {
                    name: &db.name,
                    reader: &self.writer,
                    store,
                    tx,
                    conflict_policy: db.conflict_policy,
                };
                importer.import()?;
            }
            Ok(())
        })
    }

    /// Cleans up imported key-value pairs from Rkv.
    pub fn cleanup(mut self) {
        for (db, store) in self.sources {
            match db.cleanup_policy {
                CleanupPolicy::Keep => continue,
                CleanupPolicy::Delete => {
                    let _ = store.clear(&mut self.writer);
                }
            }
        }
        let _ = self.writer.commit();
    }
}

/// Copies all key-value pairs from a single Rkv database into
/// the corresponding Skv database.
struct RkvSingleStoreImporter<'a, 'rkv, 'conn, R, D> {
    name: &'a DatabaseName,
    reader: &'rkv R,
    store: &'a SingleStore<D>,
    tx: &'a rusqlite::Transaction<'conn>,
    conflict_policy: ConflictPolicy,
}

impl<'env, 'a, 'rkv: 'env, 'conn, R, D> RkvSingleStoreImporter<'a, 'rkv, 'conn, R, D>
where
    R: Readable<'env, Database = D>,
    D: BackendDatabase,
{
    fn import(&self) -> Result<(), ImporterError> {
        self.ensure_database_exists()?;
        for result in self.store.iter_start(self.reader)? {
            let (key, value) = result?;
            self.import_pair(key, value)?;
        }
        Ok(())
    }

    fn ensure_database_exists(&self) -> Result<(), ImporterError> {
        let mut statement = self
            .tx
            .prepare_cached("INSERT OR IGNORE INTO dbs(name) VALUES(:name)")?;
        statement.execute(rusqlite::named_params! {
            ":name": self.name.as_str(),
        })?;
        Ok(())
    }

    fn import_pair(&self, key: &[u8], value: rkv::Value) -> Result<(), ImporterError> {
        let key = SkvKey::from(
            std::str::from_utf8(key).map_err(|err| ImporterError::RkvKey(err.into()))?,
        );
        let value = <SkvValue as From<serde_json::Value>>::from(match value {
            Value::Bool(b) => b.into(),
            Value::I64(n) => n.into(),
            Value::F64(n) => n.into_inner().into(),
            Value::Str(s) => s.into(),
            _ => Err(ImporterError::UnsupportedRkvValueType)?,
        });

        let mut statement = self.tx.prepare_cached(&format!(
            "INSERT INTO data(
               db_id,
               key,
               value
             )
             VALUES(
               (SELECT id FROM dbs WHERE name = :name),
               :key,
               jsonb(:value)
             )
             ON CONFLICT {}",
            match self.conflict_policy {
                ConflictPolicy::Error => {
                    // Throw an error if the values are different;
                    // do nothing if they're equal.
                    "DO UPDATE SET
                       value = throw(printf('conflict: %Q', key))
                     WHERE value <> excluded.value"
                }
                ConflictPolicy::Ignore => "DO NOTHING",
                ConflictPolicy::Replace => {
                    "DO UPDATE SET
                       value = excluded.value"
                }
            },
        ))?;
        statement.execute(rusqlite::named_params! {
            ":name": self.name.as_str(),
            ":key": key,
            ":value": value,
        })?;

        Ok(())
    }
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DatabaseName(String);

impl From<String> for DatabaseName {
    fn from(value: String) -> Self {
        Self(value)
    }
}

impl TryFrom<Option<String>> for DatabaseName {
    type Error = ImporterError;

    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
        Ok(value.ok_or(ImporterError::UnnamedDatabase)?.into())
    }
}

impl DatabaseName {
    fn as_str(&self) -> &str {
        &self.0
    }
}

/// The action to take if a key in a source database already exists
/// and has a different value in the corresponding destination database.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum ConflictPolicy {
    Error = nsIKeyValueImporter::ERROR_ON_CONFLICT,
    Ignore = nsIKeyValueImporter::IGNORE_ON_CONFLICT,
    Replace = nsIKeyValueImporter::REPLACE_ON_CONFLICT,
}

impl TryFrom<u8> for ConflictPolicy {
    type Error = ImporterError;

    fn try_from(value: u8) -> Result<Self, <Self as TryFrom<u8>>::Error> {
        Ok(match value {
            // Rust doesn't allow `as` expressions directly in
            // match arms.
            _ if value == Self::Error as u8 => Self::Error,
            _ if value == Self::Ignore as u8 => Self::Ignore,
            _ if value == Self::Replace as u8 => Self::Replace,
            _ => Err(ImporterError::UnknownConflictPolicy(value))?,
        })
    }
}

/// The action to take after a successful import.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum CleanupPolicy {
    Keep = nsIKeyValueImporter::KEEP_AFTER_IMPORT,
    Delete = nsIKeyValueImporter::DELETE_AFTER_IMPORT,
}

impl TryFrom<u8> for CleanupPolicy {
    type Error = ImporterError;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        Ok(match value {
            _ if value == Self::Keep as u8 => Self::Keep,
            _ if value == Self::Delete as u8 => Self::Delete,
            _ => Err(ImporterError::UnknownCleanupPolicy(value))?,
        })
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ImporterError {
    #[error("unknown conflict policy: {0}")]
    UnknownConflictPolicy(u8),
    #[error("unknown cleanup policy: {0}")]
    UnknownCleanupPolicy(u8),
    #[error("rkv store: {0}")]
    RkvStore(#[from] rkv::StoreError),
    #[error("skv store: {0}")]
    SkvStore(#[from] SkvStoreError),
    #[error("sqlite: {0}")]
    Sqlite(#[from] rusqlite::Error),
    #[error("rkv key: {0}")]
    RkvKey(Box<dyn std::error::Error + Send + Sync + 'static>),
    #[error("unsupported rkv value type")]
    UnsupportedRkvValueType,
    #[error("unnamed database")]
    UnnamedDatabase,
}

impl ToConnectionIncident for ImporterError {
    fn to_incident(&self) -> Option<ConnectionIncident> {
        match self {
            Self::SkvStore(err) => err.to_incident(),
            Self::Sqlite(err) => err.to_incident(),
            _ => None,
        }
    }
}

[ Dauer der Verarbeitung: 0.25 Sekunden  (vorverarbeitet)  ]