Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/sync15/src/client/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 19 kB image not shown  

Quelle  sync_multiple.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

// This helps you perform a sync of multiple engines and helps you manage
// global and local state between syncs.

use super::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
use super::status::{ServiceStatus, SyncResult};
use super::storage_client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
use crate::clients_engine::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
use crate::engine::{EngineSyncAssociation, SyncEngine};
use crate::error::Error;
use crate::telemetry;
use crate::KeyBundle;
use interrupt_support::Interruptee;
use std::collections::HashMap;
use std::result;
use std::time::{Duration, SystemTime};

/// Info about the client to use. We reuse the client unless
/// we discover the client_init has changed, in which case we re-create one.
#[derive(Debug)]
struct ClientInfo {
    // the client_init used to create `client`.
    client_init: Sync15StorageClientInit,
    // the client (our tokenserver state machine state, and our http library's state)
    client: Sync15StorageClient,
}

impl ClientInfo {
    fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
        Ok(Self {
            client_init: ci.clone(),
            client: Sync15StorageClient::new(ci.clone())?,
        })
    }
}

/// Info we want callers to engine *in memory* for us so that subsequent
/// syncs are faster. This should never be persisted to storage as it holds
/// sensitive information, such as the sync decryption keys.
#[derive(Debug, Default)]
pub struct MemoryCachedState {
    last_client_info: Option<ClientInfo>,
    last_global_state: Option<GlobalState>,
    // These are just engined in memory, as persisting an invalid value far in the
    // future has the potential to break sync for good.
    next_sync_after: Option<SystemTime>,
    next_client_refresh_after: Option<SystemTime>,
}

impl MemoryCachedState {
    // Called we notice the cached state is stale.
    pub fn clear_sensitive_info(&mut self) {
        self.last_client_info = None;
        self.last_global_state = None;
        // Leave the backoff time, as there's no reason to think it's not still
        // true.
    }
    pub fn get_next_sync_after(&self) -> Option<SystemTime> {
        self.next_sync_after
    }
    pub fn should_refresh_client(&self) -> bool {
        match self.next_client_refresh_after {
            Some(t) => SystemTime::now() > t,
            None => true,
        }
    }
    pub fn note_client_refresh(&mut self) {
        self.next_client_refresh_after =
            Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH));
    }
}

/// Sync multiple engines
/// * `engines` - The engines to sync
/// * `persisted_global_state` - The global state to use, or None if never
///   before provided. At the end of the sync, and even when the sync fails,
///   the value in this cell should be persisted to permanent storage and
///   provided next time the sync is called.
/// * `last_client_info` - The client state to use, or None if never before
///   provided. At the end of the sync, the value should be persisted
///   *in memory only* - it should not be persisted to disk.
/// * `storage_init` - Information about how the sync http client should be
///   configured.
/// * `root_sync_key` - The KeyBundle used for encryption.
///
/// Returns a map, keyed by name and holding an error value - if any engine
/// fails, the sync will continue on to other engines, but the error will be
/// places in this map. The absence of a name in the map implies the engine
/// succeeded.
pub fn sync_multiple(
    engines: &[&dyn SyncEngine],
    persisted_global_state: &mut Option<String>,
    mem_cached_state: &mut MemoryCachedState,
    storage_init: &Sync15StorageClientInit,
    root_sync_key: &KeyBundle,
    interruptee: &dyn Interruptee,
    req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
    sync_multiple_with_command_processor(
        None,
        engines,
        persisted_global_state,
        mem_cached_state,
        storage_init,
        root_sync_key,
        interruptee,
        req_info,
    )
}

/// Like `sync_multiple`, but specifies an optional command processor to handle
/// commands from the clients collection. This function is called by the sync
/// manager, which provides its own processor.
#[allow(clippy::too_many_arguments)]
pub fn sync_multiple_with_command_processor(
    command_processor: Option<&dyn CommandProcessor>,
    engines: &[&dyn SyncEngine],
    persisted_global_state: &mut Option<String>,
    mem_cached_state: &mut MemoryCachedState,
    storage_init: &Sync15StorageClientInit,
    root_sync_key: &KeyBundle,
    interruptee: &dyn Interruptee,
    req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
    log::info!("Syncing {} engines", engines.len());
    let mut sync_result = SyncResult {
        service_status: ServiceStatus::OtherError,
        result: Ok(()),
        declined: None,
        next_sync_after: None,
        engine_results: HashMap::with_capacity(engines.len()),
        telemetry: telemetry::SyncTelemetryPing::new(),
    };
    let backoff = super::storage_client::new_backoff_listener();
    let req_info = req_info.unwrap_or_default();
    let driver = SyncMultipleDriver {
        command_processor,
        engines,
        storage_init,
        interruptee,
        engines_to_state_change: req_info.engines_to_state_change,
        backoff: backoff.clone(),
        root_sync_key,
        result: &mut sync_result,
        persisted_global_state,
        mem_cached_state,
        saw_auth_error: false,
        ignore_soft_backoff: req_info.is_user_action,
    };
    match driver.sync() {
        Ok(()) => {
            log::debug!(
                "sync was successful, final status={:?}",
                sync_result.service_status
            );
        }
        Err(e) => {
            log::warn!(
                "sync failed: {}, final status={:?}",
                e,
                sync_result.service_status,
            );
            sync_result.result = Err(e);
        }
    }
    // Respect `backoff` value when computing the next sync time even if we were
    // ignoring it during the sync
    sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
    mem_cached_state.next_sync_after = sync_result.next_sync_after;
    log::trace!("Sync result: {:?}", sync_result);
    sync_result
}

/// This is essentially a bag of information that the sync manager knows, but
/// otherwise we won't. It should probably be rethought if it gains many more
/// fields.
#[derive(Debug, Default)]
pub struct SyncRequestInfo<'a> {
    pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
    pub is_user_action: bool,
}

// The sync multiple driver
struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
    command_processor: Option<&'info dyn CommandProcessor>,
    engines: &'info [&'info dyn SyncEngine],
    storage_init: &'info Sync15StorageClientInit,
    root_sync_key: &'info KeyBundle,
    interruptee: &'info dyn Interruptee,
    backoff: BackoffListener,
    engines_to_state_change: Option<&'info HashMap<String, bool>>,
    result: &'res mut SyncResult,
    persisted_global_state: &'pgs mut Option<String>,
    mem_cached_state: &'mcs mut MemoryCachedState,
    ignore_soft_backoff: bool,
    saw_auth_error: bool,
}

impl SyncMultipleDriver<'_, '_, '_, '_> {
    /// The actual worker for sync_multiple.
    fn sync(mut self) -> result::Result<(), Error> {
        log::info!("Loading/initializing persisted state");
        let mut pgs = self.prepare_persisted_state();

        log::info!("Preparing client info");
        let client_info = self.prepare_client_info()?;

        if self.was_interrupted() {
            return Ok(());
        }

        log::info!("Entering sync state machine");
        // Advance the state machine to the point where it can perform a full
        // sync. This may involve uploading meta/global, crypto/keys etc.
        let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;

        if self.was_interrupted() {
            return Ok(());
        }

        // Set the service status to OK here - we may adjust it based on an individual
        // engine failing.
        self.result.service_status = ServiceStatus::Ok;

        let clients_engine = if let Some(command_processor) = self.command_processor {
            log::info!("Synchronizing clients engine");
            let should_refresh = self.mem_cached_state.should_refresh_client();
            let mut engine = clients_engine::Engine::new(command_processor, self.interruptee);
            if let Err(e) = engine.sync(
                &client_info.client,
                &global_state,
                self.root_sync_key,
                should_refresh,
            ) {
                // Record telemetry with the error just in case...
                let mut telem_sync = telemetry::SyncTelemetry::new();
                let mut telem_engine = telemetry::Engine::new("clients");
                telem_engine.failure(&e);
                telem_sync.engine(telem_engine);
                self.result.service_status = ServiceStatus::from_err(&e);

                // ...And bail, because a clients engine sync failure is fatal.
                return Err(e);
            }
            // We don't record telemetry for successful clients engine
            // syncs, since we only keep client records in memory, we
            // expect the counts to be the same most times, and a
            // failure aborts the entire sync.
            if self.was_interrupted() {
                return Ok(());
            }
            self.mem_cached_state.note_client_refresh();
            Some(engine)
        } else {
            None
        };

        log::info!("Synchronizing engines");

        let telem_sync =
            self.sync_engines(&client_info, &mut global_state, clients_engine.as_ref());
        self.result.telemetry.sync(telem_sync);

        log::info!("Finished syncing engines.");

        if !self.saw_auth_error {
            log::trace!("Updating persisted global state");
            self.mem_cached_state.last_client_info = Some(client_info);
            self.mem_cached_state.last_global_state = Some(global_state);
        }

        Ok(())
    }

    fn was_interrupted(&mut self) -> bool {
        if self.interruptee.was_interrupted() {
            log::info!("Interrupted, bailing out");
            self.result.service_status = ServiceStatus::Interrupted;
            true
        } else {
            false
        }
    }

    fn sync_engines(
        &mut self,
        client_info: &ClientInfo,
        global_state: &mut GlobalState,
        clients: Option<&clients_engine::Engine<'_>>,
    ) -> telemetry::SyncTelemetry {
        let mut telem_sync = telemetry::SyncTelemetry::new();
        for engine in self.engines {
            let name = engine.collection_name();
            if self
                .backoff
                .get_required_wait(self.ignore_soft_backoff)
                .is_some()
            {
                log::warn!("Got backoff, bailing out of sync early");
                break;
            }
            if global_state.global.declined.iter().any(|e| e == &*name) {
                log::info!("The {} engine is declined. Skipping", name);
                continue;
            }
            log::info!("Syncing {} engine!", name);

            let mut telem_engine = telemetry::Engine::new(&*name);
            let result = super::sync::synchronize_with_clients_engine(
                &client_info.client,
                global_state,
                self.root_sync_key,
                clients,
                *engine,
                true,
                &mut telem_engine,
                self.interruptee,
            );

            match result {
                Ok(()) => log::info!("Sync of {} was successful!", name),
                Err(ref e) => {
                    log::warn!("Sync of {} failed! {:?}", name, e);
                    let this_status = ServiceStatus::from_err(e);
                    // The only error which forces us to discard our state is an
                    // auth error.
                    self.saw_auth_error =
                        self.saw_auth_error || this_status == ServiceStatus::AuthenticationError;
                    telem_engine.failure(e);
                    // If the failure from the engine looks like anything other than
                    // a "engine error" we don't bother trying the others.
                    if this_status != ServiceStatus::OtherError {
                        telem_sync.engine(telem_engine);
                        self.result.engine_results.insert(name.into(), result);
                        self.result.service_status = this_status;
                        break;
                    }
                }
            }
            telem_sync.engine(telem_engine);
            self.result.engine_results.insert(name.into(), result);
            if self.was_interrupted() {
                break;
            }
        }
        telem_sync
    }

    fn run_state_machine(
        &mut self,
        client_info: &ClientInfo,
        pgs: &mut PersistedGlobalState,
    ) -> result::Result<GlobalState, Error> {
        let last_state = self.mem_cached_state.last_global_state.take();

        let mut state_machine = SetupStateMachine::for_full_sync(
            &client_info.client,
            self.root_sync_key,
            pgs,
            self.engines_to_state_change,
            self.interruptee,
        );

        log::info!("Advancing state machine to ready (full)");
        let res = state_machine.run_to_ready(last_state);
        // Grab this now even though we don't need it until later to avoid a
        // lifetime issue
        let changes = state_machine.changes_needed.take();
        // The state machine might have updated our persisted_global_state, so
        // update the caller's repr of it.
        *self.persisted_global_state = Some(serde_json::to_string(&pgs)?);

        // Now that we've gone through the state machine, engine the declined list in
        // the sync_result
        self.result.declined = Some(pgs.get_declined().to_vec());
        log::debug!(
            "Declined engines list after state machine set to: {:?}",
            self.result.declined,
        );

        if let Some(c) = changes {
            self.wipe_or_reset_engines(c, &client_info.client)?;
        }
        let state = match res {
            Err(e) => {
                self.result.service_status = ServiceStatus::from_err(&e);
                return Err(e);
            }
            Ok(state) => state,
        };
        self.result.telemetry.uid(client_info.client.hashed_uid()?);
        // As for client_info, put None back now so we start from scratch on error.
        self.mem_cached_state.last_global_state = None;
        Ok(state)
    }

    fn wipe_or_reset_engines(
        &mut self,
        changes: EngineChangesNeeded,
        client: &Sync15StorageClient,
    ) -> result::Result<(), Error> {
        if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() {
            return Ok(());
        }
        for e in &changes.remote_wipes {
            log::info!("Engine {:?} just got disabled locally, wiping server", e);
            client.wipe_remote_engine(e)?;
        }

        for s in self.engines {
            let name = s.collection_name();
            if changes.local_resets.contains(&*name) {
                log::info!("Resetting engine {}, as it was declined remotely", name);
                s.reset(&EngineSyncAssociation::Disconnected)?;
            }
        }

        Ok(())
    }

    fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
        let mut client_info = match self.mem_cached_state.last_client_info.take() {
            Some(client_info) => {
                // if our storage_init has changed it probably means the user has
                // changed, courtesy of the 'kid' in the structure. Thus, we can't
                // reuse the client or the memory cached state. We do keep the disk
                // state as currently that's only the declined list.
                if client_info.client_init != *self.storage_init {
                    log::info!("Discarding all state as the account might have changed");
                    *self.mem_cached_state = MemoryCachedState::default();
                    ClientInfo::new(self.storage_init)?
                } else {
                    log::debug!("Reusing memory-cached client_info");
                    // we can reuse it (which should be the common path)
                    client_info
                }
            }
            None => {
                log::debug!("mem_cached_state was stale or missing, need setup");
                // We almost certainly have no other state here, but to be safe, we
                // throw away any memory state we do have.
                self.mem_cached_state.clear_sensitive_info();
                ClientInfo::new(self.storage_init)?
            }
        };
        // Ensure we use the correct listener here rather than on all the branches
        // above, since it seems less error prone.
        client_info.client.backoff = self.backoff.clone();
        Ok(client_info)
    }

    fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
        // Note that any failure to use a persisted state means we also decline
        // to use our memory cached state, so that we fully rebuild that
        // persisted state for next time.
        match self.persisted_global_state {
            Some(persisted_string) if !persisted_string.is_empty() => {
                match serde_json::from_str::<PersistedGlobalState>(persisted_string) {
                    Ok(state) => {
                        log::trace!("Read persisted state: {:?}", state);
                        // Note that we don't set `result.declined` from the
                        // data in state - it remains None, which explicitly
                        // indicates "we don't have updated info".
                        state
                    }
                    _ => {
                        // Don't log the error since it might contain sensitive
                        // info (although currently it only contains the declined engines list)
                        error_support::report_error!(
                            "sync15-prepare-persisted-state",
                            "Failed to parse PersistedGlobalState from JSON! Falling back to default"
                        );
                        *self.mem_cached_state = MemoryCachedState::default();
                        PersistedGlobalState::default()
                    }
                }
            }
            _ => {
                log::info!(
                    "The application didn't give us persisted state - \
                     this is only expected on the very first run for a given user."
                );
                *self.mem_cached_state = MemoryCachedState::default();
                PersistedGlobalState::default()
            }
        }
    }
}

[ Dauer der Verarbeitung: 0.28 Sekunden  (vorverarbeitet)  ]