Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


SSL db.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

use std::{cell::OnceCell, path::Path, sync::Arc};

use interrupt_support::{SqlInterruptHandle, SqlInterruptScope};
use parking_lot::{Mutex, MutexGuard};
use remote_settings::RemoteSettingsResponse;
use rusqlite::{
    named_params,
    types::{FromSql, ToSql},
    Connection, OpenFlags, OptionalExtension,
};
use sql_support::{open_database::open_database_with_flags, repeat_sql_vars, ConnExt};

use crate::{
    config::{SuggestGlobalConfig, SuggestProviderConfig},
    error::RusqliteResultExt,
    fakespot,
    geoname::GeonameCache,
    pocket::{split_keyword, KeywordConfidence},
    provider::{AmpMatchingStrategy, SuggestionProvider},
    query::{full_keywords_to_fts_content, FtsQuery},
    rs::{
        DownloadedAmoSuggestion, DownloadedAmpSuggestion, DownloadedAmpWikipediaSuggestion,
        DownloadedExposureSuggestion, DownloadedFakespotSuggestion, DownloadedMdnSuggestion,
        DownloadedPocketSuggestion, DownloadedWikipediaSuggestion, Record, SuggestRecordId,
    },
    schema::{clear_database, SuggestConnectionInitializer},
    suggestion::{cook_raw_suggestion_url, AmpSuggestionType, FtsMatchInfo, Suggestion},
    util::full_keyword,
    weather::WeatherCache,
    Result, SuggestionQuery,
};

/// The metadata key whose value is a JSON string encoding a
/// `SuggestGlobalConfig`, which contains global Suggest configuration data.
pub const GLOBAL_CONFIG_META_KEY: &str = "global_config";
/// Prefix of metadata keys whose values are JSON strings encoding
/// `SuggestProviderConfig`, which contains per-provider configuration data. The
/// full key is this prefix plus the `SuggestionProvider` value as a u8.
pub const PROVIDER_CONFIG_META_KEY_PREFIX: &str = "provider_config_";

// Default value when Suggestion does not have a value for score
pub const DEFAULT_SUGGESTION_SCORE: f64 = 0.2;

/// The database connection type.
#[derive(Clone, Copy)]
pub(crate) enum ConnectionType {
    ReadOnly,
    ReadWrite,
}

impl From<ConnectionType> for OpenFlags {
    fn from(type_: ConnectionType) -> Self {
        match type_ {
            ConnectionType::ReadOnly => {
                OpenFlags::SQLITE_OPEN_URI
                    | OpenFlags::SQLITE_OPEN_NO_MUTEX
                    | OpenFlags::SQLITE_OPEN_READ_ONLY
            }
            ConnectionType::ReadWrite => {
                OpenFlags::SQLITE_OPEN_URI
                    | OpenFlags::SQLITE_OPEN_NO_MUTEX
                    | OpenFlags::SQLITE_OPEN_CREATE
                    | OpenFlags::SQLITE_OPEN_READ_WRITE
            }
        }
    }
}

#[derive(Default, Clone)]
pub struct Sqlite3Extension {
    pub library: String,
    pub entry_point: Option<String>,
}

/// A thread-safe wrapper around an SQLite connection to the Suggest database,
/// and its interrupt handle.
pub(crate) struct SuggestDb {
    pub conn: Mutex<Connection>,

    /// An object that's used to interrupt an ongoing database operation.
    ///
    /// When this handle is interrupted, the thread that's currently accessing
    /// the database will be told to stop and release the `conn` lock as soon
    /// as possible.
    pub interrupt_handle: Arc<SqlInterruptHandle>,
}

impl SuggestDb {
    /// Opens a read-only or read-write connection to a Suggest database at the
    /// given path.
    pub fn open(
        path: impl AsRef<Path>,
        extensions_to_load: &[Sqlite3Extension],
        type_: ConnectionType,
    ) -> Result<Self> {
        let conn = open_database_with_flags(
            path,
            type_.into(),
            &SuggestConnectionInitializer::new(extensions_to_load),
        )?;
        Ok(Self::with_connection(conn))
    }

    fn with_connection(conn: Connection) -> Self {
        let interrupt_handle = Arc::new(SqlInterruptHandle::new(&conn));
        Self {
            conn: Mutex::new(conn),
            interrupt_handle,
        }
    }

    /// Accesses the Suggest database for reading.
    pub fn read<T>(&self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
        let conn = self.conn.lock();
        let scope = self.interrupt_handle.begin_interrupt_scope()?;
        let dao = SuggestDao::new(&conn, &scope);
        op(&dao)
    }

    /// Accesses the Suggest database in a transaction for reading and writing.
    pub fn write<T>(&self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
        let mut conn = self.conn.lock();
        let scope = self.interrupt_handle.begin_interrupt_scope()?;
        let tx = conn.transaction()?;
        let mut dao = SuggestDao::new(&tx, &scope);
        let result = op(&mut dao)?;
        tx.commit()?;
        Ok(result)
    }

    /// Create a new write scope.
    ///
    /// This enables performing multiple `write()` calls with the same shared interrupt scope.
    /// This is important for things like ingestion, where you want the operation to be interrupted
    /// if [Self::interrupt_handle::interrupt] is called after the operation starts.  Calling
    /// [Self::write] multiple times during the operation risks missing a call that happens after
    /// between those calls.
    pub fn write_scope(&self) -> Result<WriteScope> {
        Ok(WriteScope {
            conn: self.conn.lock(),
            scope: self.interrupt_handle.begin_interrupt_scope()?,
        })
    }
}

pub(crate) struct WriteScope<'a> {
    pub conn: MutexGuard<'a, Connection>,
    pub scope: SqlInterruptScope,
}

impl WriteScope<'_> {
    /// Accesses the Suggest database in a transaction for reading and writing.
    pub fn write<T>(&mut self, op: impl FnOnce(&mut SuggestDao) -> Result<T>) -> Result<T> {
        let tx = self.conn.transaction()?;
        let mut dao = SuggestDao::new(&tx, &self.scope);
        let result = op(&mut dao)?;
        tx.commit()?;
        Ok(result)
    }

    /// Accesses the Suggest database in a transaction for reading only
    pub fn read<T>(&mut self, op: impl FnOnce(&SuggestDao) -> Result<T>) -> Result<T> {
        let tx = self.conn.transaction()?;
        let dao = SuggestDao::new(&tx, &self.scope);
        let result = op(&dao)?;
        tx.commit()?;
        Ok(result)
    }

    pub fn err_if_interrupted(&self) -> Result<()> {
        Ok(self.scope.err_if_interrupted()?)
    }
}

/// A data access object (DAO) that wraps a connection to the Suggest database
/// with methods for reading and writing suggestions, icons, and metadata.
///
/// Methods that only read from the database take an immutable reference to
/// `self` (`&self`), and methods that write to the database take a mutable
/// reference (`&mut self`).
pub(crate) struct SuggestDao<'a> {
    pub conn: &'a Connection,
    pub scope: &'a SqlInterruptScope,
    pub weather_cache: OnceCell<WeatherCache>,
    pub geoname_cache: OnceCell<GeonameCache>,
}

impl<'a> SuggestDao<'a> {
    fn new(conn: &'a Connection, scope: &'a SqlInterruptScope) -> Self {
        Self {
            conn,
            scope,
            weather_cache: std::cell::OnceCell::new(),
            geoname_cache: std::cell::OnceCell::new(),
        }
    }

    // =============== High level API ===============
    //
    //  These methods combine several low-level calls into one logical operation.

    pub fn delete_record_data(&mut self, record_id: &SuggestRecordId) -> Result<()> {
        // Drop either the icon or suggestions, records only contain one or the other
        match record_id.as_icon_id() {
            Some(icon_id) => self.drop_icon(icon_id)?,
            None => self.drop_suggestions(record_id)?,
        };
        Ok(())
    }

    // =============== Low level API ===============
    //
    //  These methods implement CRUD operations

    pub fn read_cached_rs_data(&self, collection: &str) -> Option<RemoteSettingsResponse> {
        match self.try_read_cached_rs_data(collection) {
            Ok(result) => result,
            Err(e) => {
                // Return None on failure . If the cached data is corrupted, maybe because the
                // RemoteSettingsResponse schema changed, then we want to just continue on.  This also matches
                // the proposed API from #6328, so it should be easier to adapt this code once
                // that's merged.
                error_support::report_error!("suggest-rs-cache-read", "{e}");
                None
            }
        }
    }

    pub fn write_cached_rs_data(&mut self, collection: &str, data: &RemoteSettingsResponse) {
        if let Err(e) = self.try_write_cached_rs_data(collection, data) {
            // Return None on failure for the same reason as in [Self::read_cached_rs_data]
            error_support::report_error!("suggest-rs-cache-write", "{e}");
        }
    }

    fn try_read_cached_rs_data(&self, collection: &str) -> Result<Option<RemoteSettingsResponse>> {
        let mut stmt = self
            .conn
            .prepare_cached("SELECT data FROM rs_cache WHERE collection = ?")?;
        let data = stmt
            .query_row((collection,), |row| row.get::<_, Vec<u8>>(0))
            .optional()?;
        match data {
            Some(data) => Ok(Some(rmp_serde::decode::from_slice(data.as_slice())?)),
            None => Ok(None),
        }
    }

    fn try_write_cached_rs_data(
        &mut self,
        collection: &str,
        data: &RemoteSettingsResponse,
    ) -> Result<()> {
        let mut stmt = self
            .conn
            .prepare_cached("INSERT OR REPLACE INTO rs_cache(collection, data) VALUES(?, ?)")?;
        stmt.execute((collection, rmp_serde::encode::to_vec(data)?))?;
        Ok(())
    }

    pub fn get_ingested_records(&self) -> Result<Vec<IngestedRecord>> {
        let mut stmt = self
            .conn
            .prepare_cached("SELECT id, collection, type, last_modified FROM ingested_records")?;
        let rows = stmt.query_and_then((), IngestedRecord::from_row)?;
        rows.collect()
    }

    pub fn update_ingested_records(
        &mut self,
        collection: &str,
        new_records: &[&Record],
        updated_records: &[&Record],
        deleted_records: &[&IngestedRecord],
    ) -> Result<()> {
        let mut delete_stmt = self
            .conn
            .prepare_cached("DELETE FROM ingested_records WHERE collection = ? AND id = ?")?;
        for deleted in deleted_records {
            delete_stmt.execute((collection, deleted.id.as_str()))?;
        }

        let mut insert_stmt = self.conn.prepare_cached(
            "INSERT OR REPLACE INTO ingested_records(id, collection, type, last_modified) VALUES(?, ?, ?, ?)",
        )?;
        for record in new_records.iter().chain(updated_records) {
            insert_stmt.execute((
                record.id.as_str(),
                collection,
                record.record_type().as_str(),
                record.last_modified,
            ))?;
        }
        Ok(())
    }

    /// Update the DB so that we re-ingest all records on the next ingestion.
    ///
    /// We hack this by setting the last_modified time to 1 so that the next time around we always
    /// re-ingest the record.
    pub fn force_reingest(&mut self) -> Result<()> {
        self.conn
            .prepare_cached("UPDATE ingested_records SET last_modified=1")?
            .execute(())?;
        Ok(())
    }

    pub fn suggestions_table_empty(&self) -> Result<bool> {
        Ok(self
            .conn
            .query_one::<bool>("SELECT NOT EXISTS (SELECT 1 FROM suggestions)")?)
    }

    /// Fetches Suggestions of type Amp provider that match the given query
    pub fn fetch_amp_suggestions(
        &self,
        query: &SuggestionQuery,
        suggestion_type: AmpSuggestionType,
    ) -> Result<Vec<Suggestion>> {
        let strategy = query
            .provider_constraints
            .as_ref()
            .and_then(|c| c.amp_alternative_matching.as_ref());
        match strategy {
            None => self.fetch_amp_suggestions_using_keywords(query, suggestion_type, true),
            Some(AmpMatchingStrategy::NoKeywordExpansion) => {
                self.fetch_amp_suggestions_using_keywords(query, suggestion_type, false)
            }
            Some(AmpMatchingStrategy::FtsAgainstFullKeywords) => {
                self.fetch_amp_suggestions_using_fts(query, suggestion_type, "full_keywords")
            }
            Some(AmpMatchingStrategy::FtsAgainstTitle) => {
                self.fetch_amp_suggestions_using_fts(query, suggestion_type, "title")
            }
        }
    }

    pub fn fetch_amp_suggestions_using_keywords(
        &self,
        query: &SuggestionQuery,
        suggestion_type: AmpSuggestionType,
        allow_keyword_expansion: bool,
    ) -> Result<Vec<Suggestion>> {
        let keyword_lowercased = &query.keyword.to_lowercase();
        let provider = match suggestion_type {
            AmpSuggestionType::Mobile => SuggestionProvider::AmpMobile,
            AmpSuggestionType::Desktop => SuggestionProvider::Amp,
        };
        let where_extra = if allow_keyword_expansion {
            ""
        } else {
            "AND INSTR(CONCAT(fk.full_keyword, ' '), k.keyword) != 0"
        };
        let suggestions = self.conn.query_rows_and_then_cached(
            &format!(
                r#"
                SELECT
                  s.id,
                  k.rank,
                  s.title,
                  s.url,
                  s.provider,
                  s.score,
                  fk.full_keyword
                FROM
                  suggestions s
                JOIN
                  keywords k
                  ON k.suggestion_id = s.id
                LEFT JOIN
                  full_keywords fk
                  ON k.full_keyword_id = fk.id
                WHERE
                  s.provider = :provider
                  AND k.keyword = :keyword
                  {where_extra}
                AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
                "#
            ),
            named_params! {
                ":keyword": keyword_lowercased,
                ":provider": provider
            },
            |row| -> Result<Suggestion> {
                let suggestion_id: i64 = row.get("id")?;
                let title = row.get("title")?;
                let raw_url: String = row.get("url")?;
                let score: f64 = row.get("score")?;
                let full_keyword_from_db: Option<String> = row.get("full_keyword")?;

                let keywords: Vec<String> = self.conn.query_rows_and_then_cached(
                    r#"
                    SELECT
                        keyword
                    FROM
                        keywords
                    WHERE
                        suggestion_id = :suggestion_id
                        AND rank >= :rank
                    ORDER BY
                        rank ASC
                    "#,
                    named_params! {
                        ":suggestion_id": suggestion_id,
                        ":rank": row.get::<_, i64>("rank")?,
                    },
                    |row| row.get(0),
                )?;
                self.conn.query_row_and_then(
                    r#"
                    SELECT
                      amp.advertiser,
                      amp.block_id,
                      amp.iab_category,
                      amp.impression_url,
                      amp.click_url,
                      i.data AS icon,
                      i.mimetype AS icon_mimetype
                    FROM
                      amp_custom_details amp
                    LEFT JOIN
                      icons i ON amp.icon_id = i.id
                    WHERE
                      amp.suggestion_id = :suggestion_id
                    "#,
                    named_params! {
                        ":suggestion_id": suggestion_id
                    },
                    |row| {
                        let cooked_url = cook_raw_suggestion_url(&raw_url);
                        let raw_click_url = row.get::<_, String>("click_url")?;
                        let cooked_click_url = cook_raw_suggestion_url(&raw_click_url);

                        Ok(Suggestion::Amp {
                            block_id: row.get("block_id")?,
                            advertiser: row.get("advertiser")?,
                            iab_category: row.get("iab_category")?,
                            title,
                            url: cooked_url,
                            raw_url,
                            full_keyword: full_keyword_from_db
                                .unwrap_or_else(|| full_keyword(keyword_lowercased, &keywords)),
                            icon: row.get("icon")?,
                            icon_mimetype: row.get("icon_mimetype")?,
                            impression_url: row.get("impression_url")?,
                            click_url: cooked_click_url,
                            raw_click_url,
                            score,
                            fts_match_info: None,
                        })
                    },
                )
            },
        )?;
        Ok(suggestions)
    }

    pub fn fetch_amp_suggestions_using_fts(
        &self,
        query: &SuggestionQuery,
        suggestion_type: AmpSuggestionType,
        fts_column: &str,
    ) -> Result<Vec<Suggestion>> {
        let fts_query = query.fts_query();
        let match_arg = &fts_query.match_arg;
        let provider = match suggestion_type {
            AmpSuggestionType::Mobile => SuggestionProvider::AmpMobile,
            AmpSuggestionType::Desktop => SuggestionProvider::Amp,
        };
        let suggestions = self.conn.query_rows_and_then_cached(
            &format!(
                r#"
                SELECT
                  s.id,
                  s.title,
                  s.url,
                  s.provider,
                  s.score
                FROM
                  suggestions s
                JOIN
                  amp_fts fts
                  ON fts.rowid = s.id
                WHERE
                  s.provider = :provider
                  AND amp_fts match '{fts_column}: {match_arg}'
                AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
                ORDER BY rank
                LIMIT 1
                "#
            ),
            named_params! {
                ":provider": provider
            },
            |row| -> Result<Suggestion> {
                let suggestion_id: i64 = row.get("id")?;
                let title: String = row.get("title")?;
                let raw_url: String = row.get("url")?;
                let score: f64 = row.get("score")?;

                self.conn.query_row_and_then(
                    r#"
                    SELECT
                      amp.advertiser,
                      amp.block_id,
                      amp.iab_category,
                      amp.impression_url,
                      amp.click_url,
                      i.data AS icon,
                      i.mimetype AS icon_mimetype
                    FROM
                      amp_custom_details amp
                    LEFT JOIN
                      icons i ON amp.icon_id = i.id
                    WHERE
                      amp.suggestion_id = :suggestion_id
                    "#,
                    named_params! {
                        ":suggestion_id": suggestion_id
                    },
                    |row| {
                        let cooked_url = cook_raw_suggestion_url(&raw_url);
                        let raw_click_url = row.get::<_, String>("click_url")?;
                        let cooked_click_url = cook_raw_suggestion_url(&raw_click_url);
                        let match_info = self.fetch_amp_fts_match_info(
                            &fts_query,
                            suggestion_id,
                            fts_column,
                            &title,
                        )?;

                        Ok(Suggestion::Amp {
                            block_id: row.get("block_id")?,
                            advertiser: row.get("advertiser")?,
                            iab_category: row.get("iab_category")?,
                            title,
                            url: cooked_url,
                            raw_url,
                            full_keyword: query.keyword.clone(),
                            icon: row.get("icon")?,
                            icon_mimetype: row.get("icon_mimetype")?,
                            impression_url: row.get("impression_url")?,
                            click_url: cooked_click_url,
                            raw_click_url,
                            score,
                            fts_match_info: Some(match_info),
                        })
                    },
                )
            },
        )?;
        Ok(suggestions)
    }

    fn fetch_amp_fts_match_info(
        &self,
        fts_query: &FtsQuery<'_>,
        suggestion_id: i64,
        fts_column: &str,
        title: &str,
    ) -> Result<FtsMatchInfo> {
        let fts_content = match fts_column {
            "title" => title.to_lowercase(),
            "full_keywords" => {
                let full_keyword_list: Vec<String> = self.conn.query_rows_and_then(
                    "
                    SELECT fk.full_keyword
                    FROM full_keywords fk
                    JOIN keywords k on fk.id == k.full_keyword_id
                    WHERE k.suggestion_id = ?
                    ",
                    (suggestion_id,),
                    |row| row.get(0),
                )?;
                full_keywords_to_fts_content(full_keyword_list.iter().map(String::as_str))
            }
            // fts_column comes from the code above and we know there's only 2 possibilities
            _ => unreachable!(),
        };

        let prefix = if fts_query.is_prefix_query {
            // If the query was a prefix match query then test if the query without the prefix
            // match would have also matched.  If not, then this counts as a prefix match.
            let sql = "SELECT 1 FROM amp_fts WHERE rowid = ? AND amp_fts MATCH ?";
            let params = (&suggestion_id, &fts_query.match_arg_without_prefix_match);
            !self.conn.exists(sql, params)?
        } else {
            // If not, then it definitely wasn't a prefix match
            false
        };

        Ok(FtsMatchInfo {
            prefix,
            stemming: fts_query.match_required_stemming(&fts_content),
        })
    }

    /// Fetches Suggestions of type Wikipedia provider that match the given query
    pub fn fetch_wikipedia_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        let keyword_lowercased = &query.keyword.to_lowercase();
        let suggestions = self.conn.query_rows_and_then_cached(
            r#"
            SELECT
              s.id,
              k.rank,
              s.title,
              s.url
            FROM
              suggestions s
            JOIN
              keywords k
              ON k.suggestion_id = s.id
            WHERE
              s.provider = :provider
              AND k.keyword = :keyword
              AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
            "#,
            named_params! {
                ":keyword": keyword_lowercased,
                ":provider": SuggestionProvider::Wikipedia
            },
            |row| -> Result<Suggestion> {
                let suggestion_id: i64 = row.get("id")?;
                let title = row.get("title")?;
                let raw_url = row.get::<_, String>("url")?;

                let keywords: Vec<String> = self.conn.query_rows_and_then_cached(
                    "SELECT keyword FROM keywords
                     WHERE suggestion_id = :suggestion_id AND rank >= :rank
                     ORDER BY rank ASC",
                    named_params! {
                        ":suggestion_id": suggestion_id,
                        ":rank": row.get::<_, i64>("rank")?,
                    },
                    |row| row.get(0),
                )?;
                let (icon, icon_mimetype) = self
                    .conn
                    .try_query_row(
                        "SELECT i.data, i.mimetype
                     FROM icons i
                     JOIN wikipedia_custom_details s ON s.icon_id = i.id
                     WHERE s.suggestion_id = :suggestion_id
                     LIMIT 1",
                        named_params! {
                            ":suggestion_id": suggestion_id
                        },
                        |row| -> Result<_> {
                            Ok((
                                row.get::<_, Option<Vec<u8>>>(0)?,
                                row.get::<_, Option<String>>(1)?,
                            ))
                        },
                        true,
                    )?
                    .unwrap_or((None, None));

                Ok(Suggestion::Wikipedia {
                    title,
                    url: raw_url,
                    full_keyword: full_keyword(keyword_lowercased, &keywords),
                    icon,
                    icon_mimetype,
                })
            },
        )?;
        Ok(suggestions)
    }

    /// Query for suggestions using the keyword prefix and provider
    fn map_prefix_keywords<T>(
        &self,
        query: &SuggestionQuery,
        provider: &SuggestionProvider,
        mut mapper: impl FnMut(&rusqlite::Row, &str) -> Result<T>,
    ) -> Result<Vec<T>> {
        let keyword_lowercased = &query.keyword.to_lowercase();
        let (keyword_prefix, keyword_suffix) = split_keyword(keyword_lowercased);
        let suggestions_limit = query.limit.unwrap_or(-1);
        self.conn.query_rows_and_then_cached(
            r#"
                SELECT
                  s.id,
                  MAX(k.rank) AS rank,
                  s.title,
                  s.url,
                  s.provider,
                  s.score,
                  k.keyword_suffix
                FROM
                  suggestions s
                JOIN
                  prefix_keywords k
                  ON k.suggestion_id = s.id
                WHERE
                  k.keyword_prefix = :keyword_prefix
                  AND (k.keyword_suffix BETWEEN :keyword_suffix AND :keyword_suffix || x'FFFF')
                  AND s.provider = :provider
                  AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
                GROUP BY
                  s.id
                ORDER BY
                  s.score DESC,
                  rank DESC
                LIMIT
                  :suggestions_limit
                "#,
            &[
                (":keyword_prefix", &keyword_prefix as &dyn ToSql),
                (":keyword_suffix", &keyword_suffix as &dyn ToSql),
                (":provider", provider as &dyn ToSql),
                (":suggestions_limit", &suggestions_limit as &dyn ToSql),
            ],
            |row| mapper(row, keyword_suffix),
        )
    }

    /// Fetches Suggestions of type Amo provider that match the given query
    pub fn fetch_amo_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        let suggestions = self
            .map_prefix_keywords(
                query,
                &SuggestionProvider::Amo,
                |row, keyword_suffix| -> Result<Option<Suggestion>> {
                    let suggestion_id: i64 = row.get("id")?;
                    let title = row.get("title")?;
                    let raw_url = row.get::<_, String>("url")?;
                    let score = row.get::<_, f64>("score")?;

                    let full_suffix = row.get::<_, String>("keyword_suffix")?;
                    full_suffix
                        .starts_with(keyword_suffix)
                        .then(|| {
                            self.conn.query_row_and_then(
                                r#"
                                SELECT
                                  amo.description,
                                  amo.guid,
                                  amo.rating,
                                  amo.icon_url,
                                  amo.number_of_ratings
                                FROM
                                  amo_custom_details amo
                                WHERE
                                  amo.suggestion_id = :suggestion_id
                                "#,
                                named_params! {
                                    ":suggestion_id": suggestion_id
                                },
                                |row| {
                                    Ok(Suggestion::Amo {
                                        title,
                                        url: raw_url,
                                        icon_url: row.get("icon_url")?,
                                        description: row.get("description")?,
                                        rating: row.get("rating")?,
                                        number_of_ratings: row.get("number_of_ratings")?,
                                        guid: row.get("guid")?,
                                        score,
                                    })
                                },
                            )
                        })
                        .transpose()
                },
            )?
            .into_iter()
            .flatten()
            .collect();
        Ok(suggestions)
    }

    /// Fetches Suggestions of type pocket provider that match the given query
    pub fn fetch_pocket_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        let keyword_lowercased = &query.keyword.to_lowercase();
        let (keyword_prefix, keyword_suffix) = split_keyword(keyword_lowercased);
        let suggestions = self
            .conn
            .query_rows_and_then_cached(
                r#"
            SELECT
              s.id,
              MAX(k.rank) AS rank,
              s.title,
              s.url,
              s.provider,
              s.score,
              k.confidence,
              k.keyword_suffix
            FROM
              suggestions s
            JOIN
              prefix_keywords k
              ON k.suggestion_id = s.id
            WHERE
              k.keyword_prefix = :keyword_prefix
              AND (k.keyword_suffix BETWEEN :keyword_suffix AND :keyword_suffix || x'FFFF')
              AND s.provider = :provider
              AND NOT EXISTS (SELECT 1 FROM dismissed_suggestions WHERE url=s.url)
            GROUP BY
              s.id,
              k.confidence
            ORDER BY
              s.score DESC,
              rank DESC
            "#,
                named_params! {
                    ":keyword_prefix": keyword_prefix,
                    ":keyword_suffix": keyword_suffix,
                    ":provider": SuggestionProvider::Pocket,
                },
                |row| -> Result<Option<Suggestion>> {
                    let title = row.get("title")?;
                    let raw_url = row.get::<_, String>("url")?;
                    let score = row.get::<_, f64>("score")?;
                    let confidence = row.get("confidence")?;
                    let full_suffix = row.get::<_, String>("keyword_suffix")?;
                    let suffixes_match = match confidence {
                        KeywordConfidence::Low => full_suffix.starts_with(keyword_suffix),
                        KeywordConfidence::High => full_suffix == keyword_suffix,
                    };
                    if suffixes_match {
                        Ok(Some(Suggestion::Pocket {
                            title,
                            url: raw_url,
                            score,
                            is_top_pick: matches!(confidence, KeywordConfidence::High),
                        }))
                    } else {
                        Ok(None)
                    }
                },
            )?
            .into_iter()
            .flatten()
            .take(
                query
                    .limit
                    .and_then(|limit| usize::try_from(limit).ok())
                    .unwrap_or(usize::MAX),
            )
            .collect();
        Ok(suggestions)
    }

    /// Fetches suggestions for MDN
    pub fn fetch_mdn_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        let suggestions = self
            .map_prefix_keywords(
                query,
                &SuggestionProvider::Mdn,
                |row, keyword_suffix| -> Result<Option<Suggestion>> {
                    let suggestion_id: i64 = row.get("id")?;
                    let title = row.get("title")?;
                    let raw_url = row.get::<_, String>("url")?;
                    let score = row.get::<_, f64>("score")?;

                    let full_suffix = row.get::<_, String>("keyword_suffix")?;
                    full_suffix
                        .starts_with(keyword_suffix)
                        .then(|| {
                            self.conn.query_row_and_then(
                                r#"
                                SELECT
                                    description
                                FROM
                                    mdn_custom_details
                                WHERE
                                    suggestion_id = :suggestion_id
                                "#,
                                named_params! {
                                    ":suggestion_id": suggestion_id
                                },
                                |row| {
                                    Ok(Suggestion::Mdn {
                                        title,
                                        url: raw_url,
                                        description: row.get("description")?,
                                        score,
                                    })
                                },
                            )
                        })
                        .transpose()
                },
            )?
            .into_iter()
            .flatten()
            .collect();

        Ok(suggestions)
    }

    /// Fetches Fakespot suggestions
    pub fn fetch_fakespot_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        let fts_query = query.fts_query();
        let sql = r#"
            SELECT
                s.id,
                s.title,
                s.url,
                s.score,
                f.fakespot_grade,
                f.product_id,
                f.rating,
                f.total_reviews,
                i.data,
                i.mimetype,
                f.keywords,
                f.product_type
            FROM
                suggestions s
            JOIN
                fakespot_fts fts
                ON fts.rowid = s.id
            JOIN
                fakespot_custom_details f
                ON f.suggestion_id = s.id
            LEFT JOIN
                icons i
                ON i.id = f.icon_id
            WHERE
                fakespot_fts MATCH ?
            ORDER BY
                s.score DESC
            "#
        .to_string();

        // Store the list of results plus the suggestion id for calculating the FTS match info
        let mut results =
            self.conn
                .query_rows_and_then_cached(&sql, (&fts_query.match_arg,), |row| {
                    let id: usize = row.get(0)?;
                    let score = fakespot::FakespotScore::new(
                        &query.keyword,
                        row.get(10)?,
                        row.get(11)?,
                        row.get(3)?,
                    )
                    .as_suggest_score();
                    Result::Ok((
                        Suggestion::Fakespot {
                            title: row.get(1)?,
                            url: row.get(2)?,
                            score,
                            fakespot_grade: row.get(4)?,
                            product_id: row.get(5)?,
                            rating: row.get(6)?,
                            total_reviews: row.get(7)?,
                            icon: row.get(8)?,
                            icon_mimetype: row.get(9)?,
                            match_info: None,
                        },
                        id,
                    ))
                })?;
        // Sort the results, then add the FTS match info to the first one
        // For performance reasons, this is only calculated for the result with the highest score.
        // We assume that only one that will be shown to the user and therefore the only one we'll
        // collect metrics for.
        results.sort();
        if let Some((suggestion, id)) = results.first_mut() {
            match suggestion {
                Suggestion::Fakespot {
                    match_info, title, ..
                } => {
                    *match_info = Some(self.fetch_fakespot_fts_match_info(&fts_query, *id, title)?);
                }
                _ => unreachable!(),
            }
        }
        Ok(results
            .into_iter()
            .map(|(suggestion, _)| suggestion)
            .collect())
    }

    fn fetch_fakespot_fts_match_info(
        &self,
        fts_query: &FtsQuery<'_>,
        suggestion_id: usize,
        title: &str,
    ) -> Result<FtsMatchInfo> {
        let prefix = if fts_query.is_prefix_query {
            // If the query was a prefix match query then test if the query without the prefix
            // match would have also matched.  If not, then this counts as a prefix match.
            let sql = "SELECT 1 FROM fakespot_fts WHERE rowid = ? AND fakespot_fts MATCH ?";
            let params = (&suggestion_id, &fts_query.match_arg_without_prefix_match);
            !self.conn.exists(sql, params)?
        } else {
            // If not, then it definitely wasn't a prefix match
            false
        };

        Ok(FtsMatchInfo {
            prefix,
            stemming: fts_query.match_required_stemming(title),
        })
    }

    /// Fetches exposure suggestions
    pub fn fetch_exposure_suggestions(&self, query: &SuggestionQuery) -> Result<Vec<Suggestion>> {
        // A single exposure suggestion can be spread across multiple remote
        // settings records, for example if it has very many keywords. On ingest
        // we will insert one row in `exposure_custom_details` and one row in
        // `suggestions` per record, but that's only an implementation detail.
        // Logically, and for consumers, there's only ever at most one exposure
        // suggestion with a given exposure suggestion type.
        //
        // Why do insertions this way? It's how other suggestions work, and it
        // lets us perform relational operations on suggestions, records, and
        // keywords. For example, when a record is deleted we can look up its ID
        // in `suggestions`, join the keywords table on the suggestion ID, and
        // delete the keywords that were added by that record.

        let Some(suggestion_types) = query
            .provider_constraints
            .as_ref()
            .and_then(|c| c.exposure_suggestion_types.as_ref())
        else {
            return Ok(vec![]);
        };

        let keyword = query.keyword.to_lowercase();
        let params = rusqlite::params_from_iter(
            std::iter::once(&SuggestionProvider::Exposure as &dyn ToSql)
                .chain(std::iter::once(&keyword as &dyn ToSql))
                .chain(suggestion_types.iter().map(|t| t as &dyn ToSql)),
        );
        self.conn.query_rows_and_then_cached(
            &format!(
                r#"
                    SELECT DISTINCT
                      d.type
                    FROM
                      suggestions s
                    JOIN
                      exposure_custom_details d
                      ON d.suggestion_id = s.id
                    JOIN
                      keywords k
                      ON k.suggestion_id = s.id
                    WHERE
                      s.provider = ?
                      AND k.keyword = ?
                      AND d.type IN ({})
                    ORDER BY
                      d.type
                    "#,
                repeat_sql_vars(suggestion_types.len())
            ),
            params,
            |row| -> Result<Suggestion> {
                Ok(Suggestion::Exposure {
                    suggestion_type: row.get("type")?,
                    score: 1.0,
                })
            },
        )
    }

    pub fn is_exposure_suggestion_ingested(&self, record_id: &SuggestRecordId) -> Result<bool> {
        Ok(self.conn.exists(
            r#"
            SELECT
              id
            FROM
              suggestions
            WHERE
              record_id = :record_id
            "#,
            named_params! {
                ":record_id": record_id.as_str(),
            },
        )?)
    }

    pub fn is_amp_fts_data_ingested(&self, record_id: &SuggestRecordId) -> Result<bool> {
        Ok(self.conn.exists(
            r#"
            SELECT 1
            FROM suggestions s
            JOIN amp_fts fts
              ON fts.rowid = s.id
            WHERE s.record_id = :record_id
            "#,
            named_params! {
                ":record_id": record_id.as_str(),
            },
        )?)
    }

    /// Inserts all suggestions from a downloaded AMO attachment into
    /// the database.
    pub fn insert_amo_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedAmoSuggestion],
    ) -> Result<()> {
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut amo_insert = AmoInsertStatement::new(self.conn)?;
        let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let suggestion_id = suggestion_insert.execute(
                record_id,
                &suggestion.title,
                &suggestion.url,
                suggestion.score,
                SuggestionProvider::Amo,
            )?;
            amo_insert.execute(suggestion_id, suggestion)?;
            for (index, keyword) in suggestion.keywords.iter().enumerate() {
                let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
                prefix_keyword_insert.execute(
                    suggestion_id,
                    None,
                    keyword_prefix,
                    keyword_suffix,
                    index,
                )?;
            }
        }
        Ok(())
    }

    /// Inserts all suggestions from a downloaded AMP-Wikipedia attachment into
    /// the database.
    pub fn insert_amp_wikipedia_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedAmpWikipediaSuggestion],
        enable_fts: bool,
    ) -> Result<()> {
        // Prepare statements outside of the loop.  This results in a large performance
        // improvement on a fresh ingest, since there are so many rows.
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut amp_insert = AmpInsertStatement::new(self.conn)?;
        let mut wiki_insert = WikipediaInsertStatement::new(self.conn)?;
        let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
        let mut fts_insert = AmpFtsInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let common_details = suggestion.common_details();
            let provider = suggestion.provider();

            let suggestion_id = suggestion_insert.execute(
                record_id,
                &common_details.title,
                &common_details.url,
                common_details.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
                provider,
            )?;
            match suggestion {
                DownloadedAmpWikipediaSuggestion::Amp(amp) => {
                    amp_insert.execute(suggestion_id, amp)?;
                }
                DownloadedAmpWikipediaSuggestion::Wikipedia(wikipedia) => {
                    wiki_insert.execute(suggestion_id, wikipedia)?;
                }
            }
            if enable_fts {
                fts_insert.execute(
                    suggestion_id,
                    &common_details.full_keywords_fts_column(),
                    &common_details.title,
                )?;
            }
            let mut full_keyword_inserter = FullKeywordInserter::new(self.conn, suggestion_id);
            for keyword in common_details.keywords() {
                let full_keyword_id = match (suggestion, keyword.full_keyword) {
                    // Try to associate full keyword data.  Only do this for AMP, we decided to
                    // skip it for Wikipedia in https://bugzilla.mozilla.org/show_bug.cgi?id=1876217
                    (DownloadedAmpWikipediaSuggestion::Amp(_), Some(full_keyword)) => {
                        Some(full_keyword_inserter.maybe_insert(full_keyword)?)
                    }
                    _ => None,
                };

                keyword_insert.execute(
                    suggestion_id,
                    keyword.keyword,
                    full_keyword_id,
                    keyword.rank,
                )?;
            }
        }
        Ok(())
    }

    /// Inserts all suggestions from a downloaded AMP-Mobile attachment into
    /// the database.
    pub fn insert_amp_mobile_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedAmpSuggestion],
    ) -> Result<()> {
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut amp_insert = AmpInsertStatement::new(self.conn)?;
        let mut keyword_insert = KeywordInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let common_details = &suggestion.common_details;
            let suggestion_id = suggestion_insert.execute(
                record_id,
                &common_details.title,
                &common_details.url,
                common_details.score.unwrap_or(DEFAULT_SUGGESTION_SCORE),
                SuggestionProvider::AmpMobile,
            )?;
            amp_insert.execute(suggestion_id, suggestion)?;

            let mut full_keyword_inserter = FullKeywordInserter::new(self.conn, suggestion_id);
            for keyword in common_details.keywords() {
                let full_keyword_id = keyword
                    .full_keyword
                    .map(|full_keyword| full_keyword_inserter.maybe_insert(full_keyword))
                    .transpose()?;
                keyword_insert.execute(
                    suggestion_id,
                    keyword.keyword,
                    full_keyword_id,
                    keyword.rank,
                )?;
            }
        }
        Ok(())
    }

    /// Inserts all suggestions from a downloaded Pocket attachment into
    /// the database.
    pub fn insert_pocket_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedPocketSuggestion],
    ) -> Result<()> {
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let suggestion_id = suggestion_insert.execute(
                record_id,
                &suggestion.title,
                &suggestion.url,
                suggestion.score,
                SuggestionProvider::Pocket,
            )?;
            for ((rank, keyword), confidence) in suggestion
                .high_confidence_keywords
                .iter()
                .enumerate()
                .zip(std::iter::repeat(KeywordConfidence::High))
                .chain(
                    suggestion
                        .low_confidence_keywords
                        .iter()
                        .enumerate()
                        .zip(std::iter::repeat(KeywordConfidence::Low)),
                )
            {
                let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
                prefix_keyword_insert.execute(
                    suggestion_id,
                    Some(confidence as u8),
                    keyword_prefix,
                    keyword_suffix,
                    rank,
                )?;
            }
        }
        Ok(())
    }

    /// Inserts all suggestions from a downloaded MDN attachment into
    /// the database.
    pub fn insert_mdn_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedMdnSuggestion],
    ) -> Result<()> {
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut mdn_insert = MdnInsertStatement::new(self.conn)?;
        let mut prefix_keyword_insert = PrefixKeywordInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let suggestion_id = suggestion_insert.execute(
                record_id,
                &suggestion.title,
                &suggestion.url,
                suggestion.score,
                SuggestionProvider::Mdn,
            )?;
            mdn_insert.execute(suggestion_id, suggestion)?;
            for (index, keyword) in suggestion.keywords.iter().enumerate() {
                let (keyword_prefix, keyword_suffix) = split_keyword(keyword);
                prefix_keyword_insert.execute(
                    suggestion_id,
                    None,
                    keyword_prefix,
                    keyword_suffix,
                    index,
                )?;
            }
        }
        Ok(())
    }

    /// Inserts all suggestions from a downloaded Fakespot attachment into the database.
    pub fn insert_fakespot_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestions: &[DownloadedFakespotSuggestion],
    ) -> Result<()> {
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut fakespot_insert = FakespotInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            let suggestion_id = suggestion_insert.execute(
                record_id,
                &suggestion.title,
                &suggestion.url,
                suggestion.score,
                SuggestionProvider::Fakespot,
            )?;
            fakespot_insert.execute(suggestion_id, suggestion)?;
        }
        Ok(())
    }

    /// Inserts exposure suggestion records data into the database.
    pub fn insert_exposure_suggestions(
        &mut self,
        record_id: &SuggestRecordId,
        suggestion_type: &str,
        suggestions: &[DownloadedExposureSuggestion],
    ) -> Result<()> {
        // `suggestion.keywords()` can yield duplicates for exposure
        // suggestions, so ignore failures on insert in the uniqueness
        // constraint on `(suggestion_id, keyword)`.
        let mut keyword_insert = KeywordInsertStatement::new_with_or_ignore(self.conn)?;
        let mut suggestion_insert = SuggestionInsertStatement::new(self.conn)?;
        let mut exposure_insert = ExposureInsertStatement::new(self.conn)?;
        for suggestion in suggestions {
            self.scope.err_if_interrupted()?;
            let suggestion_id = suggestion_insert.execute(
                record_id,
                "", // title, not used by exposure suggestions
                "", // url, not used by exposure suggestions
                DEFAULT_SUGGESTION_SCORE,
                SuggestionProvider::Exposure,
            )?;
            exposure_insert.execute(suggestion_id, suggestion_type)?;

            // Exposure suggestions don't use `rank` but `(suggestion_id, rank)`
            // must be unique since there's an index on that tuple.
            for (rank, keyword) in suggestion.keywords().enumerate() {
                keyword_insert.execute(suggestion_id, &keyword, None, rank)?;
            }
        }
        Ok(())
    }

    /// Inserts or replaces an icon for a suggestion into the database.
    pub fn put_icon(&mut self, icon_id: &str, data: &[u8], mimetype: &str) -> Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO icons(
                 id,
                 data,
                 mimetype
             )
             VALUES(
                 :id,
                 :data,
                 :mimetype
             )",
            named_params! {
                ":id": icon_id,
                ":data": data,
                ":mimetype": mimetype,
            },
        )?;
        Ok(())
    }

    pub fn insert_dismissal(&self, url: &str) -> Result<()> {
        self.conn.execute(
            "INSERT OR IGNORE INTO dismissed_suggestions(url)
             VALUES(:url)",
            named_params! {
                ":url": url,
            },
        )?;
        Ok(())
    }

    pub fn clear_dismissals(&self) -> Result<()> {
        self.conn.execute("DELETE FROM dismissed_suggestions", ())?;
        Ok(())
    }

    /// Deletes all suggestions associated with a Remote Settings record from
    /// the database.
    pub fn drop_suggestions(&mut self, record_id: &SuggestRecordId) -> Result<()> {
        // If you update this, you probably need to update
        // `schema::clear_database()` too!
        //
        // Call `err_if_interrupted` before each statement since these have historically taken a
        // long time and caused shutdown hangs.

        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM full_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM prefix_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM keywords_metrics WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "
            DELETE FROM fakespot_fts
            WHERE rowid IN (SELECT id from suggestions WHERE record_id = :record_id)
            ",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM suggestions WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM yelp_subjects WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM yelp_modifiers WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM yelp_location_signs WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM yelp_custom_details WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM geonames WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;
        self.scope.err_if_interrupted()?;
        self.conn.execute_cached(
            "DELETE FROM geonames_metrics WHERE record_id = :record_id",
            named_params! { ":record_id": record_id.as_str() },
        )?;

        // Invalidate these caches since we might have deleted a record their
        // contents are based on.
        self.weather_cache.take();
        self.geoname_cache.take();

        Ok(())
    }

    /// Deletes an icon for a suggestion from the database.
    pub fn drop_icon(&mut self, icon_id: &str) -> Result<()> {
        self.conn.execute_cached(
            "DELETE FROM icons WHERE id = :id",
            named_params! { ":id": icon_id },
        )?;
        Ok(())
    }

    /// Clears the database, removing all suggestions, icons, and metadata.
    pub fn clear(&mut self) -> Result<()> {
        Ok(clear_database(self.conn)?)
    }

    /// Returns the value associated with a metadata key.
    pub fn get_meta<T: FromSql>(&self, key: &str) -> Result<Option<T>> {
        Ok(self.conn.try_query_one(
            "SELECT value FROM meta WHERE key = :key",
            named_params! { ":key": key },
            true,
        )?)
    }

    /// Sets the value for a metadata key.
    pub fn put_meta(&mut self, key: &str, value: impl ToSql) -> Result<()> {
        self.conn.execute_cached(
            "INSERT OR REPLACE INTO meta(key, value) VALUES(:key, :value)",
            named_params! { ":key": key, ":value": value },
        )?;
        Ok(())
    }

    /// Stores global Suggest configuration data.
    pub fn put_global_config(&mut self, config: &SuggestGlobalConfig) -> Result<()> {
        self.put_meta(GLOBAL_CONFIG_META_KEY, serde_json::to_string(config)?)
    }

    /// Gets the stored global Suggest configuration data or a default config if
    /// none is stored.
    pub fn get_global_config(&self) -> Result<SuggestGlobalConfig> {
        self.get_meta::<String>(GLOBAL_CONFIG_META_KEY)?
            .map_or_else(
                || Ok(SuggestGlobalConfig::default()),
                |json| Ok(serde_json::from_str(&json)?),
            )
    }

    /// Stores configuration data for a given provider.
    pub fn put_provider_config(
        &mut self,
        provider: SuggestionProvider,
        config: &SuggestProviderConfig,
    ) -> Result<()> {
        self.put_meta(
            &provider_config_meta_key(provider),
            serde_json::to_string(config)?,
        )
    }

    /// Gets the stored configuration data for a given provider or None if none
    /// is stored.
    pub fn get_provider_config(
        &self,
        provider: SuggestionProvider,
    ) -> Result<Option<SuggestProviderConfig>> {
        self.get_meta::<String>(&provider_config_meta_key(provider))?
            .map_or_else(|| Ok(None), |json| Ok(serde_json::from_str(&json)?))
    }
}

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct IngestedRecord {
    pub id: SuggestRecordId,
    pub collection: String,
    pub record_type: String,
    pub last_modified: u64,
}

impl IngestedRecord {
    fn from_row(row: &rusqlite::Row) -> Result<Self> {
        Ok(Self {
            id: SuggestRecordId::new(row.get("id")?),
            collection: row.get("collection")?,
            record_type: row.get("type")?,
            last_modified: row.get("last_modified")?,
        })
    }
}

/// Helper struct to get full_keyword_ids for a suggestion
///
/// `FullKeywordInserter` handles repeated full keywords efficiently.  The first instance will
/// cause a row to be inserted into the database.  Subsequent instances will return the same
/// full_keyword_id.
struct FullKeywordInserter<'a> {
    conn: &'a Connection,
    suggestion_id: i64,
    last_inserted: Option<(&'a str, i64)>,
}

impl<'a> FullKeywordInserter<'a> {
    fn new(conn: &'a Connection, suggestion_id: i64) -> Self {
        Self {
            conn,
            suggestion_id,
            last_inserted: None,
        }
    }

    fn maybe_insert(&mut self, full_keyword: &'a str) -> rusqlite::Result<i64> {
        match self.last_inserted {
            Some((s, id)) if s == full_keyword => Ok(id),
            _ => {
                let full_keyword_id = self.conn.query_row_and_then(
                    "INSERT INTO full_keywords(
                        suggestion_id,
                        full_keyword
                     )
                     VALUES(
                        :suggestion_id,
                        :keyword
                     )
                     RETURNING id",
                    named_params! {
                        ":keyword": full_keyword,
                        ":suggestion_id": self.suggestion_id,
                    },
                    |row| row.get(0),
                )?;
                self.last_inserted = Some((full_keyword, full_keyword_id));
                Ok(full_keyword_id)
            }
        }
    }
}

// ======================== Statement types ========================
//
// During ingestion we can insert hundreds of thousands of rows.  These types enable speedups by
// allowing us to prepare a statement outside a loop and use it many times inside the loop.
//
// Each type wraps [Connection::prepare] and [Statement] to provide a simplified interface,
// tailored to a specific query.
//
// This pattern is applicable for whenever we execute the same query repeatedly in a loop.
// The impact scales with the number of loop iterations, which is why we currently don't do this
// for providers like Mdn, Pocket, and Weather, which have relatively small number of records
// compared to Amp/Wikipedia.

pub(crate) struct SuggestionInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> SuggestionInsertStatement<'conn> {
    pub(crate) fn new(conn: &'conn Connection) -> Result<Self> {
        Ok(Self(conn.prepare(
            "INSERT INTO suggestions(
                 record_id,
                 title,
                 url,
                 score,
                 provider
             )
             VALUES(?, ?, ?, ?, ?)
             RETURNING id",
        )?))
    }

    /// Execute the insert and return the `suggestion_id` for the new row
    pub(crate) fn execute(
        &mut self,
        record_id: &SuggestRecordId,
        title: &str,
        url: &str,
        score: f64,
        provider: SuggestionProvider,
    ) -> Result<i64> {
        self.0
            .query_row(
                (record_id.as_str(), title, url, score, provider as u8),
                |row| row.get(0),
            )
            .with_context("suggestion insert")
    }
}

struct AmpInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> AmpInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        Ok(Self(conn.prepare(
            "INSERT INTO amp_custom_details(
                 suggestion_id,
                 advertiser,
                 block_id,
                 iab_category,
                 impression_url,
                 click_url,
                 icon_id
             )
             VALUES(?, ?, ?, ?, ?, ?, ?)
             ",
        )?))
    }

    fn execute(&mut self, suggestion_id: i64, amp: &DownloadedAmpSuggestion) -> Result<()> {
        self.0
            .execute((
                suggestion_id,
                &.advertiser,
                amp.block_id,
                &.iab_category,
                &.impression_url,
                &.click_url,
                &.icon_id,
            ))
            .with_context("amp insert")?;
        Ok(())
    }
}

struct WikipediaInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> WikipediaInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        Ok(Self(conn.prepare(
            "INSERT INTO wikipedia_custom_details(
                 suggestion_id,
                 icon_id
             )
             VALUES(?, ?)
             ",
        )?))
    }

    fn execute(
        &mut self,
        suggestion_id: i64,
        wikipedia: &DownloadedWikipediaSuggestion,
    ) -> Result<()> {
        self.0
            .execute((suggestion_id, &wikipedia.icon_id))
            .with_context("wikipedia insert")?;
        Ok(())
    }
}

struct AmoInsertStatement<'conn>(rusqlite::Statement<'conn>);

impl<'conn> AmoInsertStatement<'conn> {
    fn new(conn: &'conn Connection) -> Result<Self> {
        Ok(Self(conn.prepare(
            "INSERT INTO amo_custom_details(
                 suggestion_id,
                 description,
                 guid,
--> --------------------

--> maximum size reached

--> --------------------

[ zur Elbe Produktseite wechseln0.70Quellennavigators  Analyse erneut starten  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge