Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/sync15/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 23 kB image not shown  

Quelle  telemetry.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! Manage recording sync telemetry. Assumes some external telemetry
//! library/code which manages submitting.

use crate::error::Error;
#[cfg(feature = "sync-client")]
use crate::error::ErrorResponse;

use std::collections::HashMap;
use std::time;

use serde::{ser, Serialize, Serializer};

// Test-only helper shared by the test modules in this file: converts `v` to
// a `serde_json::Value` and asserts that it equals `expected`.
#[cfg(test)]
fn assert_json<T>(v: &T, expected: serde_json::Value)
where
    T: serde::Serialize + ?Sized,
{
    let actual = serde_json::to_value(v).expect("should get a value");
    assert_eq!(actual, expected);
}

/// What we record for 'when' and 'took' in a telemetry record.
///
/// Both values are computed by `Stopwatch::finished`.
#[derive(Debug, Serialize)]
struct WhenTook {
    // Start time as whole seconds since the Unix epoch, stored as a float.
    when: f64,
    // Elapsed time in milliseconds. Skipped during serialization when
    // `crate::skip_if_default` says so (the tests show a zero value is omitted).
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    took: u64,
}

/// What we track while recording 'when' and 'took'. It serializes as a
/// WhenTook, except when .finished() hasn't been called, in which case
/// serialization fails with an error (it does not panic).
// NOTE(review): the dead_code allowance is presumably needed because test
// builds never read the `Started` payload - confirm before removing.
#[allow(dead_code)]
#[derive(Debug)]
enum Stopwatch {
    // Still running: the wall-clock start time (used for 'when') and the
    // instant used to measure the elapsed time (used for 'took').
    Started(time::SystemTime, time::Instant),
    // Stopped: the final values, ready to be serialized.
    Finished(WhenTook),
}

impl Default for Stopwatch {
    fn default() -> Self {
        Stopwatch::new()
    }
}

impl Stopwatch {
    /// Starts a new stopwatch, capturing the current wall-clock time (which
    /// becomes 'when') and the current instant (used to compute 'took').
    fn new() -> Self {
        Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
    }

    // For tests we don't want real timestamps because we test against literals.
    #[cfg(test)]
    fn finished(&self) -> Self {
        Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
    }

    /// Returns the finished form of this stopwatch.
    ///
    /// For a running stopwatch, 'when' is the start time in whole seconds
    /// since the Unix epoch (0 if the clock reports a time before the epoch)
    /// and 'took' is the elapsed time in milliseconds.
    ///
    /// Finishing is idempotent: an already-finished stopwatch yields a copy of
    /// its recorded values. This matters because the public API can finish
    /// the same stopwatch more than once (e.g. `SyncTelemetry::finished()`
    /// followed by `SyncTelemetryPing::sync()`), and recording telemetry must
    /// not panic in that case.
    #[cfg(not(test))]
    fn finished(&self) -> Self {
        match self {
            Stopwatch::Started(st, si) => {
                let since_epoch = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
                // we don't want sub-sec accuracy. Do we need to write a float?
                let when = since_epoch.as_secs() as f64;

                let elapsed = si.elapsed();
                let took = elapsed.as_secs() * 1000 + u64::from(elapsed.subsec_millis());
                Stopwatch::Finished(WhenTook { when, took })
            }
            // Already finished: keep the values recorded the first time.
            Stopwatch::Finished(WhenTook { when, took }) => Stopwatch::Finished(WhenTook {
                when: *when,
                took: *took,
            }),
        }
    }
}

impl Serialize for Stopwatch {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
            Stopwatch::Finished(c) => c.serialize(serializer),
        }
    }
}

// Tests for how a `Stopwatch` serializes when flattened into another struct.
#[cfg(test)]
mod stopwatch_tests {
    use super::*;

    // A wrapper struct because we flatten - this struct should serialize with
    // 'when' and 'took' keys (but with no 'sw'.)
    #[derive(Debug, Serialize)]
    struct WT {
        #[serde(flatten)]
        sw: Stopwatch,
    }

    // Serializing a stopwatch that is still running must yield an error.
    #[test]
    fn test_not_finished() {
        let wt = WT {
            sw: Stopwatch::new(),
        };
        serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
    }

    // A finished stopwatch flattens to 'when' and 'took'; a zero 'took' is
    // left out of the output entirely.
    #[test]
    fn test() {
        assert_json(
            &WT {
                sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
            },
            serde_json::json!({"when": 1.0, "took": 1}),
        );
        assert_json(
            &WT {
                sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
            },
            serde_json::json!({"when": 1.0}),
        );
    }
}

/// A generic "Event" - suitable for all kinds of pings (although this module
/// only cares about the sync ping)
///
/// Built with `Event::new` and the chaining `value`/`extra` methods, which
/// enforce length limits on every component.
#[derive(Debug, Serialize)]
pub struct Event {
    // We use static str references as we expect values to be literals.
    // Limited to 20 bytes by `Event::new`.
    object: &'static str,

    // Limited to 20 bytes by `Event::new`.
    method: &'static str,

    // Maybe "value" should be a string?
    // Optional; limited to 80 bytes by `Event::value` and omitted from the
    // output when not set.
    #[serde(skip_serializing_if = "Option::is_none")]
    value: Option<&'static str>,

    // we expect the keys to be literals but values are real strings.
    // Optional; the limits on keys, values and entry count are enforced by
    // `Event::extra`. Omitted from the output when not set.
    #[serde(skip_serializing_if = "Option::is_none")]
    extra: Option<HashMap<&'static str, String>>,
}

impl Event {
    pub fn new(object: &'static str, method: &'static str) -> Self {
        assert!(object.len() <= 20);
        assert!(method.len() <= 20);
        Self {
            object,
            method,
            value: None,
            extra: None,
        }
    }

    pub fn value(mut self, v: &'static str) -> Self {
        assert!(v.len() <= 80);
        self.value = Some(v);
        self
    }

    pub fn extra(mut self, key: &'static str, val: String) -> Self {
        assert!(key.len() <= 15);
        assert!(val.len() <= 85);
        match self.extra {
            None => self.extra = Some(HashMap::new()),
            Some(ref e) => assert!(e.len() < 10),
        }
        self.extra.as_mut().unwrap().insert(key, val);
        self
    }
}

// Tests for the length limits enforced by `Event` and for its JSON shape.
#[cfg(test)]
mod test_events {
    use super::*;

    // An object name longer than 20 bytes is rejected by the constructor.
    #[test]
    #[should_panic]
    fn test_invalid_length_ctor() {
        Event::new("A very long object value", "Method");
    }

    // An extra key longer than 15 bytes is rejected.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_key() {
        Event::new("O", "M").extra("A very long key value", "v".to_string());
    }

    // An extra value longer than 85 bytes is rejected. The literal below
    // spans two lines, so the embedded newline and indentation count too.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_val() {
        let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
                abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        Event::new("O", "M").extra("k", l.to_string());
    }

    // Each of the 11 letters is used as a distinct one-character key; adding
    // the 11th must panic because 10 extras are already present.
    #[test]
    #[should_panic]
    fn test_too_many_extras() {
        let l = "abcdefghijk";
        let mut e = Event::new("Object", "Method");
        for i in 0..l.len() {
            e = e.extra(&l[i..=i], "v".to_string());
        }
    }

    // Unset optional fields ('value', 'extra') are absent from the output.
    #[test]
    fn test_json() {
        assert_json(
            &Event::new("Object", "Method").value("Value"),
            serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
        );

        assert_json(
            &Event::new("Object", "Method").extra("one", "one".to_string()),
            serde_json::json!({"object": "Object",
             "method": "Method",
             "extra": {"one": "one"}
            }),
        )
    }
}

/// A Sync failure.
///
/// Serialized as an object whose "name" key identifies the variant (using the
/// renamed values below), followed by the variant's own fields.
#[derive(Debug, Serialize)]
#[serde(tag = "name")]
pub enum SyncFailure {
    // Serialized as just {"name": "shutdownerror"}.
    #[serde(rename = "shutdownerror")]
    Shutdown,

    // Carries a free-form error description.
    #[serde(rename = "othererror")]
    Other { error: String },

    // Carries a free-form error description.
    #[serde(rename = "unexpectederror")]
    Unexpected { error: String },

    // `from` names where the auth failure came from; the conversion from
    // `Error` in this file uses "tokenserver" and "storage".
    #[serde(rename = "autherror")]
    Auth { from: &'static str },

    // `code` is the HTTP status code.
    #[serde(rename = "httperror")]
    Http { code: u16 },
}

// Tests pinning the JSON representation of every `SyncFailure` variant.
#[cfg(test)]
mod test {
    use super::*;

    // Each variant serializes to an object with its renamed "name" tag plus
    // the variant's own fields.
    #[test]
    fn reprs() {
        assert_json(
            &SyncFailure::Shutdown,
            serde_json::json!({"name": "shutdownerror"}),
        );

        assert_json(
            &SyncFailure::Other {
                error: "dunno".to_string(),
            },
            serde_json::json!({"name": "othererror", "error": "dunno"}),
        );

        assert_json(
            &SyncFailure::Unexpected {
                error: "dunno".to_string(),
            },
            serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
        );

        assert_json(
            &SyncFailure::Auth { from: "FxA" },
            serde_json::json!({"name": "autherror", "from": "FxA"}),
        );

        assert_json(
            &SyncFailure::Http { code: 500 },
            serde_json::json!({"name": "httperror", "code": 500}),
        );
    }
}

/// Incoming record for an engine's sync
///
/// A set of counters that all start at zero. Each field is passed through
/// `crate::skip_if_default`; the tests show zero counters being omitted from
/// the serialized output.
#[derive(Debug, Default, Serialize)]
pub struct EngineIncoming {
    // Incremented via `EngineIncoming::applied`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    applied: u32,

    // Incremented via `EngineIncoming::failed`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: u32,

    // Incremented via `EngineIncoming::new_failed`; serialized as "newFailed".
    #[serde(rename = "newFailed")]
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    new_failed: u32,

    // Incremented via `EngineIncoming::reconciled`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    reconciled: u32,
}

impl EngineIncoming {
    /// Creates a record with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    // Used via skip_serializing_if: a missing record, or one whose counters
    // are all zero, counts as empty.
    fn is_empty(inc: &Option<Self>) -> bool {
        inc.as_ref().map_or(true, |counts| {
            [
                counts.applied,
                counts.failed,
                counts.new_failed,
                counts.reconciled,
            ]
            .iter()
            .all(|&n| n == 0)
        })
    }

    /// Adds `n` to the `applied` counter.
    #[inline]
    pub fn applied(&mut self, n: u32) {
        self.applied += n;
    }

    /// Adds `n` to the `failed` counter.
    #[inline]
    pub fn failed(&mut self, n: u32) {
        self.failed += n;
    }

    /// Adds `n` to the `new_failed` counter.
    #[inline]
    pub fn new_failed(&mut self, n: u32) {
        self.new_failed += n;
    }

    /// Adds `n` to the `reconciled` counter.
    #[inline]
    pub fn reconciled(&mut self, n: u32) {
        self.reconciled += n;
    }

    /// Folds the counters of `other` into this record - useful when incoming
    /// records arrive in several batches.
    fn accum(&mut self, other: &EngineIncoming) {
        self.applied(other.applied);
        self.failed(other.failed);
        self.new_failed(other.new_failed);
        self.reconciled(other.reconciled);
    }

    /// Current value of the `applied` counter. Mostly useful for testing.
    #[inline]
    pub fn get_applied(&self) -> u32 {
        self.applied
    }

    /// Current value of the `failed` counter. Mostly useful for testing.
    #[inline]
    pub fn get_failed(&self) -> u32 {
        self.failed
    }

    /// Current value of the `new_failed` counter. Mostly useful for testing.
    #[inline]
    pub fn get_new_failed(&self) -> u32 {
        self.new_failed
    }

    /// Current value of the `reconciled` counter. Mostly useful for testing.
    #[inline]
    pub fn get_reconciled(&self) -> u32 {
        self.reconciled
    }
}

/// Outgoing record for an engine's sync.
///
/// `Engine` keeps one of these for each batch posted.
#[derive(Debug, Default, Serialize)]
pub struct EngineOutgoing {
    // Incremented via `EngineOutgoing::sent`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    sent: usize,

    // Incremented via `EngineOutgoing::failed`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: usize,
}

impl EngineOutgoing {
    pub fn new() -> Self {
        EngineOutgoing {
            ..Default::default()
        }
    }

    #[inline]
    pub fn sent(&mut self, n: usize) {
        self.sent += n;
    }

    #[inline]
    pub fn failed(&mut self, n: usize) {
        self.failed += n;
    }
}

/// One engine's sync.
///
/// Serialization fails until the engine's stopwatch has been finished, which
/// `SyncTelemetry::engine` does when the engine is added to a sync.
#[derive(Debug, Serialize)]
pub struct Engine {
    name: String,

    // Flattened, so 'when' and 'took' appear directly on the engine object.
    #[serde(flatten)]
    when_took: Stopwatch,

    // Omitted when absent or when every counter is zero.
    #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
    incoming: Option<EngineIncoming>,

    #[serde(skip_serializing_if = "Vec::is_empty")]
    outgoing: Vec<EngineOutgoing>, // one for each batch posted.

    // Only the first recorded failure is kept (see `Engine::failure`).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,

    // Can be set at most once (see `Engine::validation`).
    #[serde(skip_serializing_if = "Option::is_none")]
    validation: Option<Validation>,
}

impl Engine {
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            when_took: Stopwatch::new(),
            incoming: None,
            outgoing: Vec::new(),
            failure: None,
            validation: None,
        }
    }

    pub fn incoming(&mut self, inc: EngineIncoming) {
        match &mut self.incoming {
            None => self.incoming = Some(inc),
            Some(ref mut existing) => existing.accum(&inc),
        };
    }

    // A bit hacky as we need this to report telemetry for desktop via the bridged engine.
    pub fn get_incoming(&self) -> &Option<EngineIncoming> {
        &self.incoming
    }

    pub fn outgoing(&mut self, out: EngineOutgoing) {
        self.outgoing.push(out);
    }

    pub fn failure(&mut self, err: impl Into<SyncFailure>) {
        // Currently we take the first error, under the assumption that the
        // first is the most important and all others stem from that.
        let failure = err.into();
        if self.failure.is_none() {
            self.failure = Some(failure);
        } else {
            log::warn!(
                "engine already has recorded a failure of {:?} - ignoring {:?}",
                &self.failure,
                &failure
            );
        }
    }

    pub fn validation(&mut self, v: Validation) {
        assert!(self.validation.is_none());
        self.validation = Some(v);
    }

    fn finished(&mut self) {
        self.when_took = self.when_took.finished();
    }
}

/// Validation results attached to an engine via `Engine::validation`.
#[derive(Debug, Default, Serialize)]
pub struct Validation {
    // Set by `Validation::with_version`; always serialized.
    version: u32,

    // Only problems with a non-zero count are stored (see
    // `Validation::problem`); omitted from the output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    problems: Vec<Problem>,

    // Serialized as "failureReason"; omitted when not set. Nothing in this
    // impl sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}

impl Validation {
    /// Creates an empty validation record carrying the given version.
    pub fn with_version(version: u32) -> Validation {
        Self {
            version,
            ..Default::default()
        }
    }

    /// Records a named problem and how often it occurred, returning `self`
    /// for chaining. A count of zero records nothing.
    pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
        if count == 0 {
            return self;
        }
        self.problems.push(Problem { name, count });
        self
    }
}

/// A single named validation problem and how many times it was seen.
#[derive(Debug, Default, Serialize)]
pub struct Problem {
    name: &'static str,
    // `Validation::problem` only stores problems with a non-zero count.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    count: usize,
}

// Tests for `Engine` serialization. In test builds `Stopwatch::finished`
// always yields when = 0.0 and took = 0, so the expected JSON contains
// "when": 0.0 and no "took" key.
#[cfg(test)]
mod engine_tests {
    use super::*;

    // A finished engine with nothing recorded serializes to name and 'when'.
    #[test]
    fn test_engine() {
        let mut e = Engine::new("test_engine");
        e.finished();
        assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
    }

    // An engine that was never finished cannot be serialized.
    #[test]
    fn test_engine_not_finished() {
        let e = Engine::new("test_engine");
        serde_json::to_value(e).expect_err("unfinished stopwatch should fail");
    }

    // Non-zero incoming counters appear under "incoming"; zero ones do not.
    #[test]
    fn test_incoming() {
        let mut i = EngineIncoming::new();
        i.applied(1);
        i.failed(2);
        let mut e = Engine::new("TestEngine");
        e.incoming(i);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
        );
    }

    // A second incoming record is summed into the first, counter by counter.
    #[test]
    fn test_incoming_accum() {
        let mut e = Engine::new("TestEngine");
        let mut i1 = EngineIncoming::new();
        i1.applied(1);
        i1.failed(2);
        e.incoming(i1);
        let mut i2 = EngineIncoming::new();
        i2.applied(1);
        i2.failed(1);
        i2.reconciled(4);
        e.incoming(i2);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 2, "failed": 3, "reconciled": 4}}),
        );
    }

    // Each outgoing record becomes one element of the "outgoing" array.
    #[test]
    fn test_outgoing() {
        let mut o = EngineOutgoing::new();
        o.sent(2);
        o.failed(1);
        let mut e = Engine::new("TestEngine");
        e.outgoing(o);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
        );
    }

    // A recorded failure is serialized under "failureReason".
    #[test]
    fn test_failure() {
        let mut e = Engine::new("TestEngine");
        e.failure(SyncFailure::Http { code: 500 });
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine",
             "when": 0.0,
             "failureReason": {"name": "httperror", "code": 500}
            }),
        );
    }

    // Checks the recorded fields directly rather than through JSON, then
    // makes sure the fully populated engine still serializes.
    #[test]
    fn test_raw() {
        let mut e = Engine::new("TestEngine");
        let mut inc = EngineIncoming::new();
        inc.applied(10);
        e.incoming(inc);
        let mut out = EngineOutgoing::new();
        out.sent(1);
        e.outgoing(out);
        e.failure(SyncFailure::Http { code: 500 });
        e.finished();

        assert_eq!(e.outgoing.len(), 1);
        assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
        assert_eq!(e.outgoing[0].sent, 1);
        assert!(e.failure.is_some());
        serde_json::to_string(&e).expect("should get json");
    }
}

/// A single sync. May have many engines, may have its own failure.
///
/// The stopwatch starts when the value is created (via `Default`) and must
/// be finished before the value can be serialized.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetry {
    // Flattened, so 'when' and 'took' appear directly on the sync object.
    #[serde(flatten)]
    when_took: Stopwatch,

    // Engines are finished as they are added (see `SyncTelemetry::engine`).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    engines: Vec<Engine>,

    // Serialized as "failureReason"; omitted when no failure was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}

impl SyncTelemetry {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn engine(&mut self, mut e: Engine) {
        e.finished();
        self.engines.push(e);
    }

    pub fn failure(&mut self, failure: SyncFailure) {
        assert!(self.failure.is_none());
        self.failure = Some(failure);
    }

    // Note that unlike other 'finished' methods, this isn't private - someone
    // needs to explicitly call this before handling the json payload to
    // whatever ends up submitting it.
    pub fn finished(&mut self) {
        self.when_took = self.when_took.finished();
    }
}

// Tests for `SyncTelemetry` serialization. In test builds every finished
// stopwatch reports when = 0.0 and took = 0, hence the "when": 0.0 entries.
#[cfg(test)]
mod sync_tests {
    use super::*;

    // One engine with incoming counts and a failure, nested under "engines".
    // Note the engine is finished explicitly and then again by `s.engine(e)`.
    #[test]
    fn test_accum() {
        let mut s = SyncTelemetry::new();
        let mut inc = EngineIncoming::new();
        inc.applied(10);
        let mut e = Engine::new("test_engine");
        e.incoming(inc);
        e.failure(SyncFailure::Http { code: 500 });
        e.finished();
        s.engine(e);
        s.finished();

        assert_json(
            &s,
            serde_json::json!({
                "when": 0.0,
                "engines": [{
                    "name":"test_engine",
                    "when":0.0,
                    "incoming": {
                        "applied": 10
                    },
                    "failureReason": {
                        "name": "httperror",
                        "code": 500
                    }
                }]
            }),
        );
    }

    // Two engines plus a sync-level failure: engines keep insertion order
    // and the failure appears on the sync object, not on an engine.
    #[test]
    fn test_multi_engine() {
        let mut inc_e1 = EngineIncoming::new();
        inc_e1.applied(1);
        let mut e1 = Engine::new("test_engine");
        e1.incoming(inc_e1);

        let mut inc_e2 = EngineIncoming::new();
        inc_e2.failed(1);
        let mut e2 = Engine::new("test_engine_2");
        e2.incoming(inc_e2);
        let mut out_e2 = EngineOutgoing::new();
        out_e2.sent(1);
        e2.outgoing(out_e2);

        let mut s = SyncTelemetry::new();
        s.engine(e1);
        s.engine(e2);
        s.failure(SyncFailure::Http { code: 500 });
        s.finished();
        assert_json(
            &s,
            serde_json::json!({
                "when": 0.0,
                "engines": [{
                    "name": "test_engine",
                    "when": 0.0,
                    "incoming": {
                        "applied": 1
                    }
                },{
                    "name": "test_engine_2",
                    "when": 0.0,
                    "incoming": {
                        "failed": 1
                    },
                    "outgoing": [{
                        "sent": 1
                    }]
                }],
                "failureReason": {
                    "name": "httperror",
                    "code": 500
                }
            }),
        );
    }
}

/// The Sync ping payload, as documented at
/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
/// May have many syncs, may have many events. However, due to the architecture
/// of apps which use these components, this payload is almost certainly not
/// suitable for submitting directly. For example, we will always return a
/// payload with exactly 1 sync, and it will not know certain other fields
/// in the payload, such as the *hashed* FxA device ID (see
/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
/// for an example of how the device ID is constructed). The intention is that
/// consumers of this will use this to create a "real" payload - eg, accumulating
/// until some threshold number of syncs is reached, and contributing
/// additional data which only the consumer knows.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetryPing {
    // Set to 1 by `SyncTelemetryPing::new`.
    version: u32,

    // Unlike the fields below, this has no skip attribute.
    uid: Option<String>,

    #[serde(skip_serializing_if = "Vec::is_empty")]
    events: Vec<Event>,

    // Each sync is finished as it is added (see `SyncTelemetryPing::sync`).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    syncs: Vec<SyncTelemetry>,
}

impl SyncTelemetryPing {
    /// Creates an empty ping with `version` set to 1.
    pub fn new() -> Self {
        Self {
            version: 1,
            ..Default::default()
        }
    }

    /// Sets the user id for this ping. Replacing an existing, different uid
    /// is allowed but logged as a warning.
    pub fn uid(&mut self, uid: String) {
        if let Some(ref existing) = self.uid {
            if *existing != uid {
                // Plain `{}` placeholders: a `$` here would be printed literally.
                log::warn!("existing uid {} being replaced by {}", existing, uid);
            }
        }
        self.uid = Some(uid);
    }

    /// Finishes the sync's stopwatch and adds the sync to this ping.
    pub fn sync(&mut self, mut s: SyncTelemetry) {
        s.finished();
        self.syncs.push(s);
    }

    /// Adds an event to this ping.
    pub fn event(&mut self, e: Event) {
        self.events.push(e);
    }
}

// Test for the overall shape of a serialized `SyncTelemetryPing`.
#[cfg(test)]
mod ping_tests {
    use super::*;
    // A ping with one event and one sync (holding one engine). Neither the
    // engine nor the sync is finished explicitly: `s.engine` and `p.sync` do it.
    #[test]
    fn test_ping() {
        let engine = Engine::new("test");
        let mut s = SyncTelemetry::new();
        s.engine(engine);
        let mut p = SyncTelemetryPing::new();
        p.uid("user-id".into());
        p.sync(s);
        let event = Event::new("foo", "bar");
        p.event(event);
        assert_json(
            &p,
            serde_json::json!({
                "events": [{
                    "method": "bar", "object": "foo"
                }],
                "syncs": [{
                    "engines": [{
                        "name": "test", "when": 0.0
                    }],
                    "when": 0.0
                }],
                "uid": "user-id",
                "version": 1
            }),
        );
    }
}

impl From<&Error> for SyncFailure {
    fn from(e: &Error) -> SyncFailure {
        match e {
            #[cfg(feature = "sync-client")]
            Error::TokenserverHttpError(status) => {
                if *status == 401 {
                    SyncFailure::Auth {
                        from: "tokenserver",
                    }
                } else {
                    SyncFailure::Http { code: *status }
                }
            }
            #[cfg(feature = "sync-client")]
            Error::BackoffError(_) => SyncFailure::Http { code: 503 },
            #[cfg(feature = "sync-client")]
            Error::StorageHttpError(ref e) => match e {
                ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 },
                ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" },
                ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 },
                ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status },
                ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status },
            },
            #[cfg(feature = "crypto")]
            Error::CryptoError(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            #[cfg(feature = "sync-client")]
            Error::RequestError(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            #[cfg(feature = "sync-client")]
            Error::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status },
            Error::Interrupted(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            e => SyncFailure::Other {
                error: e.to_string(),
            },
        }
    }
}

[ Dauer der Verarbeitung: 0.41 Sekunden  ]