Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  importer.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use rkv::{
    backend::{BackendDatabase, BackendEnvironment, BackendRwCursorTransaction},
    Readable, Rkv, SingleStore, StoreOptions, Value, Writer,
};
use xpcom::interfaces::nsIKeyValueImporter;

use crate::skv::{
    connection::{ConnectionIncident, ToConnectionIncident},
    key::Key as SkvKey,
    store::{Store as SkvStore, StoreError as SkvStoreError},
    value::Value as SkvValue,
};

/// Specifies the databases to import.
#[derive(Clone, Debug)]
pub enum SourceDatabases {
    /// Import all key-value pairs from one or more named databases.
    Named(Vec<NamedSourceDatabase>),

    /// Import all key-value pairs from all databases.
    All {
        conflict_policy: ConflictPolicy,
        cleanup_policy: CleanupPolicy,
    },
}

/// Specifies the name and settings for a single database.
#[derive(Clone, Debug)]
pub struct NamedSourceDatabase {
    pub name: DatabaseName,
    pub conflict_policy: ConflictPolicy,
    pub cleanup_policy: CleanupPolicy,
}

/// Imports Rkv databases into Skv.
pub struct RkvImporter<'a, W, S> {
    writer: W,
    sources: Vec<(NamedSourceDatabase, S)>,
    store: &'a SkvStore,
}

impl<'env, 'a, T, D> RkvImporter<'a, Writer<T>, SingleStore<D>>
where
    T: BackendRwCursorTransaction<'env, Database = D>,
    D: BackendDatabase,
{
    /// Creates an importer for one or more Rkv databases in the same
    /// Rkv environment.
    pub fn new<'rkv: 'env, E>(
        env: &'rkv Rkv<E>,
        store: &'a SkvStore,
        dbs: SourceDatabases,
    ) -> Result<Self, ImporterError>
    where
        E: BackendEnvironment<'env, Database = D, RwTransaction = T>,
    {
        let dbs = match dbs {
            SourceDatabases::Named(dbs) => dbs,
            SourceDatabases::All {
                conflict_policy,
                cleanup_policy,
            } => env
                .get_dbs()?
                .into_iter()
                .map(|name| {
                    Ok(NamedSourceDatabase {
                        name: name.try_into()?,
                        conflict_policy,
                        cleanup_policy,
                    })
                })
                .collect::<Result<_, ImporterError>>()?,
        };
        let sources = dbs
            .into_iter()
            .map(|db| {
                let store = env.open_single(Some(db.name.as_str()), StoreOptions::default())?;
                Ok((db, store))
            })
            .collect::<Result<_, ImporterError>>()?;
        Ok(Self {
            writer: env.write()?,
            sources,
            store,
        })
    }

    /// Copies all key-value pairs from the specified Rkv databases into
    /// the corresponding Skv databases.
    ///
    /// [`RkvImporter::import`] and [`RkvImporter::cleanup`] are exposed as
    /// separate methods because of lifetime requirements on the
    /// Rkv environment.
    pub fn import(&'env self) -> Result<(), ImporterError> {
        let writer = self.store.writer()?;
        writer.write(|tx| {
            for (db, store) in &self.sources {
                let importer = RkvSingleStoreImporter {
                    name: &db.name,
                    reader: &self.writer,
                    store,
                    tx,
                    conflict_policy: db.conflict_policy,
                };
                importer.import()?;
            }
            Ok(())
        })
    }

    /// Cleans up imported key-value pairs from Rkv.
    pub fn cleanup(mut self) {
        for (db, store) in self.sources {
            match db.cleanup_policy {
                CleanupPolicy::Keep => continue,
                CleanupPolicy::Delete => {
                    let _ = store.clear(&mut self.writer);
                }
            }
        }
        let _ = self.writer.commit();
    }
}

/// Copies all key-value pairs from a single Rkv database into
/// the corresponding Skv database.
struct RkvSingleStoreImporter<'a, 'rkv, 'conn, R, D> {
    name: &'a DatabaseName,
    reader: &'rkv R,
    store: &'a SingleStore<D>,
    tx: &'a rusqlite::Transaction<'conn>,
    conflict_policy: ConflictPolicy,
}

impl<'env, 'a, 'rkv: 'env, 'conn, R, D> RkvSingleStoreImporter<'a, 'rkv, 'conn, R, D>
where
    R: Readable<'env, Database = D>,
    D: BackendDatabase,
{
    fn import(&self) -> Result<(), ImporterError> {
        self.ensure_database_exists()?;
        for result in self.store.iter_start(self.reader)? {
            let (key, value) = result?;
            self.import_pair(key, value)?;
        }
        Ok(())
    }

    fn ensure_database_exists(&self) -> Result<(), ImporterError> {
        let mut statement = self
            .tx
            .prepare_cached("INSERT OR IGNORE INTO dbs(name) VALUES(:name)")?;
        statement.execute(rusqlite::named_params! {
            ":name": self.name.as_str(),
        })?;
        Ok(())
    }

    fn import_pair(&self, key: &[u8], value: rkv::Value) -> Result<(), ImporterError> {
        let key = SkvKey::from(
            std::str::from_utf8(key).map_err(|err| ImporterError::RkvKey(err.into()))?,
        );
        let value = <SkvValue as From<serde_json::Value>>::from(match value {
            Value::Bool(b) => b.into(),
            Value::I64(n) => n.into(),
            Value::F64(n) => n.into_inner().into(),
            Value::Str(s) => s.into(),
            _ => Err(ImporterError::UnsupportedRkvValueType)?,
        });

        let mut statement = self.tx.prepare_cached(&format!(
            "INSERT INTO data(
               db_id,
               key,
               value
             )
             VALUES(
               (SELECT id FROM dbs WHERE name = :name),
               :key,
               jsonb(:value)
             )
             ON CONFLICT {}",
            match self.conflict_policy {
                ConflictPolicy::Error => {
                    // Throw an error if the values are different;
                    // do nothing if they're equal.
                    "DO UPDATE SET
                       value = throw(printf('conflict: %Q', key))
                     WHERE value <> excluded.value"
                }
                ConflictPolicy::Ignore => "DO NOTHING",
                ConflictPolicy::Replace => {
                    "DO UPDATE SET
                       value = excluded.value"
                }
            },
        ))?;
        statement.execute(rusqlite::named_params! {
            ":name": self.name.as_str(),
            ":key": key,
            ":value": value,
        })?;

        Ok(())
    }
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DatabaseName(String);

impl From<String> for DatabaseName {
    fn from(value: String) -> Self {
        Self(value)
    }
}

impl TryFrom<Option<String>> for DatabaseName {
    type Error = ImporterError;

    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
        Ok(value.ok_or(ImporterError::UnnamedDatabase)?.into())
    }
}

impl DatabaseName {
    fn as_str(&self) -> &str {
        &self.0
    }
}

/// The action to take if a key in a source database already exists
/// and has a different value in the corresponding destination database.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum ConflictPolicy {
    Error = nsIKeyValueImporter::ERROR_ON_CONFLICT,
    Ignore = nsIKeyValueImporter::IGNORE_ON_CONFLICT,
    Replace = nsIKeyValueImporter::REPLACE_ON_CONFLICT,
}

impl TryFrom<u8> for ConflictPolicy {
    type Error = ImporterError;

    fn try_from(value: u8) -> Result<Self, <Self as TryFrom<u8>>::Error> {
        Ok(match value {
            // Rust doesn't allow `as` expressions directly in
            // match arms.
            _ if value == Self::Error as u8 => Self::Error,
            _ if value == Self::Ignore as u8 => Self::Ignore,
            _ if value == Self::Replace as u8 => Self::Replace,
            _ => Err(ImporterError::UnknownConflictPolicy(value))?,
        })
    }
}

/// The action to take after a successful import.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(u8)]
pub enum CleanupPolicy {
    Keep = nsIKeyValueImporter::KEEP_AFTER_IMPORT,
    Delete = nsIKeyValueImporter::DELETE_AFTER_IMPORT,
}

impl TryFrom<u8> for CleanupPolicy {
    type Error = ImporterError;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        Ok(match value {
            _ if value == Self::Keep as u8 => Self::Keep,
            _ if value == Self::Delete as u8 => Self::Delete,
            _ => Err(ImporterError::UnknownCleanupPolicy(value))?,
        })
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ImporterError {
    #[error("unknown conflict policy: {0}")]
    UnknownConflictPolicy(u8),
    #[error("unknown cleanup policy: {0}")]
    UnknownCleanupPolicy(u8),
    #[error("rkv store: {0}")]
    RkvStore(#[from] rkv::StoreError),
    #[error("skv store: {0}")]
    SkvStore(#[from] SkvStoreError),
    #[error("sqlite: {0}")]
    Sqlite(#[from] rusqlite::Error),
    #[error("rkv key: {0}")]
    RkvKey(Box<dyn std::error::Error + Send + Sync + 'static>),
    #[error("unsupported rkv value type")]
    UnsupportedRkvValueType,
    #[error("unnamed database")]
    UnnamedDatabase,
}

impl ToConnectionIncident for ImporterError {
    fn to_incident(&self) -> Option<ConnectionIncident> {
        match self {
            Self::SkvStore(err) => err.to_incident(),
            Self::Sqlite(err) => err.to_incident(),
            _ => None,
        }
    }
}

[ Dauer der Verarbeitung: 0.23 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge