Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  telemetry.rs   Sprache: unbekannt

 
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! Manage recording sync telemetry. Assumes some external telemetry
//! library/code which manages submitting.

use crate::error::Error;
#[cfg(feature = "sync-client")]
use crate::error::ErrorResponse;

use std::collections::HashMap;
use std::time;

use serde::{ser, Serialize, Serializer};

// Test-only helper shared by the test modules in this file: serializes `v`
// into a `serde_json::Value` and asserts that it equals `expected`.
// Panics if serialization fails or if the two values differ.
#[cfg(test)]
fn assert_json<T>(v: &T, expected: serde_json::Value)
where
    T: serde::Serialize + ?Sized,
{
    let actual = serde_json::to_value(v).expect("should get a value");
    assert_eq!(actual, expected);
}

/// What we record for 'when' and 'took' in a telemetry record.
#[derive(Debug, Serialize)]
struct WhenTook {
    /// Start time as whole seconds since the Unix epoch, stored as a float.
    when: f64,
    /// Elapsed time in milliseconds. Skipped via `crate::skip_if_default`;
    /// the tests in `stopwatch_tests` show a value of 0 being left out.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    took: u64,
}

/// What we track while recording 'when' and 'took'. It serializes as a WhenTook,
/// except when .finished() hasn't been called, in which case serialization
/// fails with an error (it does not panic).
// NOTE(review): the `dead_code` allowance is presumably needed because the
// test-only `finished()` never reads the `Started` payload - confirm.
#[allow(dead_code)]
#[derive(Debug)]
enum Stopwatch {
    /// Still running: the wall-clock time and the `Instant` captured at start.
    Started(time::SystemTime, time::Instant),
    /// Stopped: the final values that will be serialized.
    Finished(WhenTook),
}

impl Default for Stopwatch {
    fn default() -> Self {
        Stopwatch::new()
    }
}

impl Stopwatch {
    /// Starts a stopwatch, capturing the current wall-clock time (for 'when')
    /// and an `Instant` (for measuring 'took').
    fn new() -> Self {
        let wall_clock = time::SystemTime::now();
        let timer = time::Instant::now();
        Stopwatch::Started(wall_clock, timer)
    }

    /// Test-build variant: tests compare serialized output against literals,
    /// so this always yields a zeroed `WhenTook`, whatever the current state.
    #[cfg(test)]
    fn finished(&self) -> Self {
        let zeroed = WhenTook { when: 0.0, took: 0 };
        Stopwatch::Finished(zeroed)
    }

    /// Stops a running stopwatch and returns the finished value.
    ///
    /// 'when' is the start time in whole seconds since the Unix epoch (0 if
    /// the start time is before the epoch); 'took' is the elapsed time in
    /// whole milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if the stopwatch has already been finished.
    #[cfg(not(test))]
    fn finished(&self) -> Self {
        if let Stopwatch::Started(started_at, timer) = self {
            let since_epoch = started_at
                .duration_since(time::UNIX_EPOCH)
                .unwrap_or_default();
            // We don't want sub-second accuracy here. (Open question from the
            // original author: does this need to be written as a float?)
            let when = since_epoch.as_secs() as f64;

            let elapsed = timer.elapsed();
            let took = elapsed.as_secs() * 1000 + u64::from(elapsed.subsec_millis());
            Stopwatch::Finished(WhenTook { when, took })
        } else {
            unreachable!("can't finish twice")
        }
    }
}

impl Serialize for Stopwatch {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
            Stopwatch::Finished(c) => c.serialize(serializer),
        }
    }
}

// Tests for how `Stopwatch` serializes when flattened into a parent struct.
#[cfg(test)]
mod stopwatch_tests {
    use super::*;

    // A wrapper struct because we flatten - this struct should serialize with
    // 'when' and 'took' keys (but with no 'sw'.)
    #[derive(Debug, Serialize)]
    struct WT {
        #[serde(flatten)]
        sw: Stopwatch,
    }

    // Serializing a stopwatch that is still running must produce an error.
    #[test]
    fn test_not_finished() {
        let wt = WT {
            sw: Stopwatch::new(),
        };
        serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
    }

    // A finished stopwatch flattens to 'when' and 'took'; a 'took' of 0 is
    // left out of the output entirely.
    #[test]
    fn test() {
        assert_json(
            &WT {
                sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
            },
            serde_json::json!({"when": 1.0, "took": 1}),
        );
        assert_json(
            &WT {
                sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
            },
            serde_json::json!({"when": 1.0}),
        );
    }
}

/// A generic "Event" - suitable for all kinds of pings (although this module
/// only cares about the sync ping)
///
/// Built with `Event::new` and the chaining `value`/`extra` methods, which
/// enforce length limits on every part via assertions.
#[derive(Debug, Serialize)]
pub struct Event {
    // We use static str references as we expect values to be literals.
    /// At most 20 bytes (asserted in `Event::new`).
    object: &'static str,

    /// At most 20 bytes (asserted in `Event::new`).
    method: &'static str,

    // Maybe "value" should be a string?
    /// Optional, at most 80 bytes; left out of the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    value: Option<&'static str>,

    // we expect the keys to be literals but values are real strings.
    /// Optional key/value pairs; left out of the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    extra: Option<HashMap<&'static str, String>>,
}

impl Event {
    pub fn new(object: &'static str, method: &'static str) -> Self {
        assert!(object.len() <= 20);
        assert!(method.len() <= 20);
        Self {
            object,
            method,
            value: None,
            extra: None,
        }
    }

    pub fn value(mut self, v: &'static str) -> Self {
        assert!(v.len() <= 80);
        self.value = Some(v);
        self
    }

    pub fn extra(mut self, key: &'static str, val: String) -> Self {
        assert!(key.len() <= 15);
        assert!(val.len() <= 85);
        match self.extra {
            None => self.extra = Some(HashMap::new()),
            Some(ref e) => assert!(e.len() < 10),
        }
        self.extra.as_mut().unwrap().insert(key, val);
        self
    }
}

// Tests for the length/size limits enforced by `Event` and for its JSON shape.
#[cfg(test)]
mod test_events {
    use super::*;

    // An object name longer than 20 bytes must be rejected.
    #[test]
    #[should_panic]
    fn test_invalid_length_ctor() {
        Event::new("A very long object value", "Method");
    }

    // An extra key longer than 15 bytes must be rejected.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_key() {
        Event::new("O", "M").extra("A very long key value", "v".to_string());
    }

    // An extra value longer than 85 bytes must be rejected. The literal spans
    // two source lines, so it also contains the newline and the indentation.
    #[test]
    #[should_panic]
    fn test_invalid_length_extra_val() {
        let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
                abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        Event::new("O", "M").extra("k", l.to_string());
    }

    // `l` has 11 characters, each used as a distinct key; adding the 11th
    // extra must panic.
    #[test]
    #[should_panic]
    fn test_too_many_extras() {
        let l = "abcdefghijk";
        let mut e = Event::new("Object", "Method");
        for i in 0..l.len() {
            e = e.extra(&l[i..=i], "v".to_string());
        }
    }

    // Optional parts only show up in the JSON when they have been set.
    #[test]
    fn test_json() {
        assert_json(
            &Event::new("Object", "Method").value("Value"),
            serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
        );

        assert_json(
            &Event::new("Object", "Method").extra("one", "one".to_string()),
            serde_json::json!({"object": "Object",
             "method": "Method",
             "extra": {"one": "one"}
            }),
        )
    }
}

/// A Sync failure.
///
/// Serialized as an object whose "name" key holds the variant's renamed tag,
/// next to the variant's own fields (see the `reprs` test for examples).
#[derive(Debug, Serialize)]
#[serde(tag = "name")]
pub enum SyncFailure {
    /// Serialized as `{"name": "shutdownerror"}`.
    #[serde(rename = "shutdownerror")]
    Shutdown,

    /// An error with a free-form description; the `From<&Error>` conversion
    /// in this file uses it for errors it has no more specific mapping for.
    #[serde(rename = "othererror")]
    Other { error: String },

    /// An unexpected error with a free-form description.
    #[serde(rename = "unexpectederror")]
    Unexpected { error: String },

    /// An authentication failure; `from` names where it came from (this file
    /// uses "tokenserver" and "storage").
    #[serde(rename = "autherror")]
    Auth { from: &'static str },

    /// An HTTP failure carrying the status code.
    #[serde(rename = "httperror")]
    Http { code: u16 },
}

// Tests pinning the JSON representation of every `SyncFailure` variant.
#[cfg(test)]
mod test {
    use super::*;

    // Each variant serializes to an object tagged by "name" plus its fields.
    #[test]
    fn reprs() {
        assert_json(
            &SyncFailure::Shutdown,
            serde_json::json!({"name": "shutdownerror"}),
        );

        assert_json(
            &SyncFailure::Other {
                error: "dunno".to_string(),
            },
            serde_json::json!({"name": "othererror", "error": "dunno"}),
        );

        assert_json(
            &SyncFailure::Unexpected {
                error: "dunno".to_string(),
            },
            serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
        );

        assert_json(
            &SyncFailure::Auth { from: "FxA" },
            serde_json::json!({"name": "autherror", "from": "FxA"}),
        );

        assert_json(
            &SyncFailure::Http { code: 500 },
            serde_json::json!({"name": "httperror", "code": 500}),
        );
    }
}

/// Incoming record for an engine's sync
///
/// Four counters, each skipped via `crate::skip_if_default`; the tests in
/// `engine_tests` show counters that are still zero being left out.
#[derive(Debug, Default, Serialize)]
pub struct EngineIncoming {
    /// Running total maintained by `applied()`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    applied: u32,

    /// Running total maintained by `failed()`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: u32,

    /// Running total maintained by `new_failed()`; serialized as "newFailed".
    #[serde(rename = "newFailed")]
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    new_failed: u32,

    /// Running total maintained by `reconciled()`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    reconciled: u32,
}

impl EngineIncoming {
    pub fn new() -> Self {
        Self {
            ..Default::default()
        }
    }

    // A helper used via skip_serializing_if
    fn is_empty(inc: &Option<Self>) -> bool {
        match inc {
            Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
            None => true,
        }
    }

    /// Increment the value of `applied` by `n`.
    #[inline]
    pub fn applied(&mut self, n: u32) {
        self.applied += n;
    }

    /// Increment the value of `failed` by `n`.
    #[inline]
    pub fn failed(&mut self, n: u32) {
        self.failed += n;
    }

    /// Increment the value of `new_failed` by `n`.
    #[inline]
    pub fn new_failed(&mut self, n: u32) {
        self.new_failed += n;
    }

    /// Increment the value of `reconciled` by `n`.
    #[inline]
    pub fn reconciled(&mut self, n: u32) {
        self.reconciled += n;
    }

    /// Accumulate values from another EngineIncoming - useful when dealing with
    /// incoming batches.
    fn accum(&mut self, other: &EngineIncoming) {
        self.applied += other.applied;
        self.failed += other.failed;
        self.new_failed += other.new_failed;
        self.reconciled += other.reconciled;
    }

    /// Get the value of `applied`. Mostly useful for testing.
    #[inline]
    pub fn get_applied(&self) -> u32 {
        self.applied
    }

    /// Get the value of `failed`. Mostly useful for testing.
    #[inline]
    pub fn get_failed(&self) -> u32 {
        self.failed
    }

    /// Get the value of `new_failed`. Mostly useful for testing.
    #[inline]
    pub fn get_new_failed(&self) -> u32 {
        self.new_failed
    }

    /// Get the value of `reconciled`. Mostly useful for testing.
    #[inline]
    pub fn get_reconciled(&self) -> u32 {
        self.reconciled
    }
}

/// Outgoing record for an engine's sync.
///
/// Both counters are skipped via `crate::skip_if_default`; the tests in
/// `sync_tests` show a counter that is still zero being left out.
#[derive(Debug, Default, Serialize)]
pub struct EngineOutgoing {
    /// Running total maintained by `sent()`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    sent: usize,

    /// Running total maintained by `failed()`.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    failed: usize,
}

impl EngineOutgoing {
    pub fn new() -> Self {
        EngineOutgoing {
            ..Default::default()
        }
    }

    #[inline]
    pub fn sent(&mut self, n: usize) {
        self.sent += n;
    }

    #[inline]
    pub fn failed(&mut self, n: usize) {
        self.failed += n;
    }
}

/// One engine's sync.
#[derive(Debug, Serialize)]
pub struct Engine {
    /// The engine's name, as given to `Engine::new`.
    name: String,

    /// Flattened, so 'when' (and 'took') appear directly on the engine
    /// object. Must be finished before the engine can be serialized.
    #[serde(flatten)]
    when_took: Stopwatch,

    /// Accumulated incoming counts; left out when absent or all zero.
    #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
    incoming: Option<EngineIncoming>,

    #[serde(skip_serializing_if = "Vec::is_empty")]
    outgoing: Vec<EngineOutgoing>, // one for each batch posted.

    /// The first failure recorded for this engine, if any; serialized as
    /// "failureReason".
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,

    /// Validation results, if any were recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    validation: Option<Validation>,
}

impl Engine {
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            when_took: Stopwatch::new(),
            incoming: None,
            outgoing: Vec::new(),
            failure: None,
            validation: None,
        }
    }

    pub fn incoming(&mut self, inc: EngineIncoming) {
        match &mut self.incoming {
            None => self.incoming = Some(inc),
            Some(ref mut existing) => existing.accum(&inc),
        };
    }

    // A bit hacky as we need this to report telemetry for desktop via the bridged engine.
    pub fn get_incoming(&self) -> &Option<EngineIncoming> {
        &self.incoming
    }

    pub fn outgoing(&mut self, out: EngineOutgoing) {
        self.outgoing.push(out);
    }

    pub fn failure(&mut self, err: impl Into<SyncFailure>) {
        // Currently we take the first error, under the assumption that the
        // first is the most important and all others stem from that.
        let failure = err.into();
        if self.failure.is_none() {
            self.failure = Some(failure);
        } else {
            log::warn!(
                "engine already has recorded a failure of {:?} - ignoring {:?}",
                &self.failure,
                &failure
            );
        }
    }

    pub fn validation(&mut self, v: Validation) {
        assert!(self.validation.is_none());
        self.validation = Some(v);
    }

    fn finished(&mut self) {
        self.when_took = self.when_took.finished();
    }
}

/// Validation results attached to an engine's sync record.
#[derive(Debug, Default, Serialize)]
pub struct Validation {
    /// Version number supplied through `Validation::with_version`.
    version: u32,

    /// Problems recorded through `problem()`; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    problems: Vec<Problem>,

    /// Serialized as "failureReason"; left out when `None`.
    // NOTE(review): nothing in the visible code ever sets this field - confirm
    // whether it is assigned elsewhere.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}

impl Validation {
    /// Creates an empty validation record carrying the given `version`.
    pub fn with_version(version: u32) -> Validation {
        Self {
            version,
            ..Default::default()
        }
    }

    /// Records that the problem `name` was seen `count` times and returns
    /// `self` for chaining. A `count` of zero records nothing.
    pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
        if count == 0 {
            return self;
        }
        self.problems.push(Problem { name, count });
        self
    }
}

/// One kind of problem found during validation, with how often it occurred.
#[derive(Debug, Default, Serialize)]
pub struct Problem {
    /// The problem's name.
    name: &'static str,
    /// Number of occurrences. `Validation::problem` never stores a zero
    /// count, so the skip below only matters for values built another way.
    #[serde(skip_serializing_if = "crate::skip_if_default")]
    count: usize,
}

// Tests for `Engine`: its JSON shape and how incoming/outgoing/failure data
// is recorded. In test builds a finished stopwatch always reports
// `when: 0.0` with `took` left out.
#[cfg(test)]
mod engine_tests {
    use super::*;

    // A finished engine with no data serializes to just its name and 'when'.
    #[test]
    fn test_engine() {
        let mut e = Engine::new("test_engine");
        e.finished();
        assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
    }

    // An engine whose stopwatch is still running cannot be serialized.
    #[test]
    fn test_engine_not_finished() {
        let e = Engine::new("test_engine");
        serde_json::to_value(e).expect_err("unfinished stopwatch should fail");
    }

    // Only the incoming counters that are non-zero appear in the output.
    #[test]
    fn test_incoming() {
        let mut i = EngineIncoming::new();
        i.applied(1);
        i.failed(2);
        let mut e = Engine::new("TestEngine");
        e.incoming(i);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
        );
    }

    // A second incoming record is added to the first rather than replacing it.
    #[test]
    fn test_incoming_accum() {
        let mut e = Engine::new("TestEngine");
        let mut i1 = EngineIncoming::new();
        i1.applied(1);
        i1.failed(2);
        e.incoming(i1);
        let mut i2 = EngineIncoming::new();
        i2.applied(1);
        i2.failed(1);
        i2.reconciled(4);
        e.incoming(i2);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 2, "failed": 3, "reconciled": 4}}),
        );
    }

    // Outgoing records are serialized as an array, one entry per batch.
    #[test]
    fn test_outgoing() {
        let mut o = EngineOutgoing::new();
        o.sent(2);
        o.failed(1);
        let mut e = Engine::new("TestEngine");
        e.outgoing(o);
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
        );
    }

    // A recorded failure is serialized under "failureReason".
    #[test]
    fn test_failure() {
        let mut e = Engine::new("TestEngine");
        e.failure(SyncFailure::Http { code: 500 });
        e.finished();
        assert_json(
            &e,
            serde_json::json!({"name": "TestEngine",
             "when": 0.0,
             "failureReason": {"name": "httperror", "code": 500}
            }),
        );
    }

    // Checks the stored fields directly rather than through the JSON output,
    // then makes sure the fully populated engine still serializes.
    #[test]
    fn test_raw() {
        let mut e = Engine::new("TestEngine");
        let mut inc = EngineIncoming::new();
        inc.applied(10);
        e.incoming(inc);
        let mut out = EngineOutgoing::new();
        out.sent(1);
        e.outgoing(out);
        e.failure(SyncFailure::Http { code: 500 });
        e.finished();

        assert_eq!(e.outgoing.len(), 1);
        assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
        assert_eq!(e.outgoing[0].sent, 1);
        assert!(e.failure.is_some());
        serde_json::to_string(&e).expect("should get json");
    }
}

/// A single sync. May have many engines, may have its own failure.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetry {
    /// Flattened, so 'when' (and 'took') appear directly on the sync object.
    /// Must be finished before the sync can be serialized.
    #[serde(flatten)]
    when_took: Stopwatch,

    /// The engines added via `engine()`, in insertion order; left out when
    /// empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    engines: Vec<Engine>,

    /// The sync-level failure, if any; serialized as "failureReason".
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "failureReason")]
    failure: Option<SyncFailure>,
}

impl SyncTelemetry {
    /// Creates the record for a new sync, with no engines and no failure.
    pub fn new() -> Self {
        Self::default()
    }

    /// Finishes the engine `e` and appends it to this sync.
    pub fn engine(&mut self, mut e: Engine) {
        e.finished();
        self.engines.push(e);
    }

    /// Records the failure of the sync as a whole.
    ///
    /// # Panics
    ///
    /// Panics if a failure has already been recorded.
    pub fn failure(&mut self, failure: SyncFailure) {
        assert!(self.failure.is_none());
        self.failure = Some(failure);
    }

    // Note that unlike other 'finished' methods, this isn't private - someone
    // needs to explicitly call this before handling the json payload to
    // whatever ends up submitting it.
    /// Stops this sync's stopwatch so that the record can be serialized.
    pub fn finished(&mut self) {
        let stopped = self.when_took.finished();
        self.when_took = stopped;
    }
}

// Tests for the JSON shape of a whole `SyncTelemetry` record. In test builds
// every finished stopwatch reports `when: 0.0` with `took` left out.
#[cfg(test)]
mod sync_tests {
    use super::*;

    // A sync with a single engine carrying incoming counts and a failure.
    #[test]
    fn test_accum() {
        let mut s = SyncTelemetry::new();
        let mut inc = EngineIncoming::new();
        inc.applied(10);
        let mut e = Engine::new("test_engine");
        e.incoming(inc);
        e.failure(SyncFailure::Http { code: 500 });
        // `s.engine()` below finishes the engine a second time.
        e.finished();
        s.engine(e);
        s.finished();

        assert_json(
            &s,
            serde_json::json!({
                "when": 0.0,
                "engines": [{
                    "name":"test_engine",
                    "when":0.0,
                    "incoming": {
                        "applied": 10
                    },
                    "failureReason": {
                        "name": "httperror",
                        "code": 500
                    }
                }]
            }),
        );
    }

    // Two engines plus a sync-level failure; engines keep insertion order.
    #[test]
    fn test_multi_engine() {
        let mut inc_e1 = EngineIncoming::new();
        inc_e1.applied(1);
        let mut e1 = Engine::new("test_engine");
        e1.incoming(inc_e1);

        let mut inc_e2 = EngineIncoming::new();
        inc_e2.failed(1);
        let mut e2 = Engine::new("test_engine_2");
        e2.incoming(inc_e2);
        let mut out_e2 = EngineOutgoing::new();
        out_e2.sent(1);
        e2.outgoing(out_e2);

        let mut s = SyncTelemetry::new();
        s.engine(e1);
        s.engine(e2);
        s.failure(SyncFailure::Http { code: 500 });
        s.finished();
        assert_json(
            &s,
            serde_json::json!({
                "when": 0.0,
                "engines": [{
                    "name": "test_engine",
                    "when": 0.0,
                    "incoming": {
                        "applied": 1
                    }
                },{
                    "name": "test_engine_2",
                    "when": 0.0,
                    "incoming": {
                        "failed": 1
                    },
                    "outgoing": [{
                        "sent": 1
                    }]
                }],
                "failureReason": {
                    "name": "httperror",
                    "code": 500
                }
            }),
        );
    }
}

/// The Sync ping payload, as documented at
/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
/// May have many syncs, may have many events. However, due to the architecture
/// of apps which use these components, this payload is almost certainly not
/// suitable for submitting directly. For example, we will always return a
/// payload with exactly 1 sync, and it will not know certain other fields
/// in the payload, such as the *hashed* FxA device ID (see
/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
/// for an example of how the device ID is constructed). The intention is that
/// consumers of this will use this to create a "real" payload - eg, accumulating
/// until some threshold number of syncs is reached, and contributing
/// additional data which only the consumer knows.
#[derive(Debug, Serialize, Default)]
pub struct SyncTelemetryPing {
    /// Payload version; `SyncTelemetryPing::new` sets it to 1.
    version: u32,

    /// The user id set via `uid()`.
    // NOTE(review): there is no `skip_serializing_if` here, so a missing uid
    // is presumably serialized as null rather than left out - confirm that
    // this is intended.
    uid: Option<String>,

    /// Events added via `event()`; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    events: Vec<Event>,

    /// Syncs added via `sync()`; left out when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    syncs: Vec<SyncTelemetry>,
}

impl SyncTelemetryPing {
    /// Creates an empty ping with `version` set to 1, no uid, no events and
    /// no syncs.
    pub fn new() -> Self {
        Self {
            version: 1,
            ..Default::default()
        }
    }

    /// Sets the user id for this ping. Replacing an existing, different uid
    /// is allowed but logged as a warning.
    pub fn uid(&mut self, uid: String) {
        if let Some(ref existing) = self.uid {
            if *existing != uid {
                // The message used to read "${}", which printed a stray '$'
                // in front of the old uid.
                log::warn!("existing uid {} being replaced by {}", existing, uid);
            }
        }
        self.uid = Some(uid);
    }

    /// Finishes the sync `s` and appends it to this ping.
    pub fn sync(&mut self, mut s: SyncTelemetry) {
        s.finished();
        self.syncs.push(s);
    }

    /// Appends the event `e` to this ping.
    pub fn event(&mut self, e: Event) {
        self.events.push(e);
    }
}

// Tests for the JSON shape of a complete `SyncTelemetryPing`.
#[cfg(test)]
mod ping_tests {
    use super::*;
    // A ping with one sync (holding one engine), one event and a uid.
    // Neither the engine nor the sync is finished explicitly here: adding
    // them to their parent does that.
    #[test]
    fn test_ping() {
        let engine = Engine::new("test");
        let mut s = SyncTelemetry::new();
        s.engine(engine);
        let mut p = SyncTelemetryPing::new();
        p.uid("user-id".into());
        p.sync(s);
        let event = Event::new("foo", "bar");
        p.event(event);
        assert_json(
            &p,
            serde_json::json!({
                "events": [{
                    "method": "bar", "object": "foo"
                }],
                "syncs": [{
                    "engines": [{
                        "name": "test", "when": 0.0
                    }],
                    "when": 0.0
                }],
                "uid": "user-id",
                "version": 1
            }),
        );
    }
}

impl From<&Error> for SyncFailure {
    fn from(e: &Error) -> SyncFailure {
        match e {
            #[cfg(feature = "sync-client")]
            Error::TokenserverHttpError(status) => {
                if *status == 401 {
                    SyncFailure::Auth {
                        from: "tokenserver",
                    }
                } else {
                    SyncFailure::Http { code: *status }
                }
            }
            #[cfg(feature = "sync-client")]
            Error::BackoffError(_) => SyncFailure::Http { code: 503 },
            #[cfg(feature = "sync-client")]
            Error::StorageHttpError(ref e) => match e {
                ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 },
                ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" },
                ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 },
                ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status },
                ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status },
            },
            #[cfg(feature = "crypto")]
            Error::CryptoError(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            #[cfg(feature = "sync-client")]
            Error::RequestError(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            #[cfg(feature = "sync-client")]
            Error::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status },
            Error::Interrupted(ref e) => SyncFailure::Unexpected {
                error: e.to_string(),
            },
            e => SyncFailure::Other {
                error: e.to_string(),
            },
        }
    }
}

[ Dauer der Verarbeitung: 0.38 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge
 




SSL  | sichere Verbindung  | rahmenlose Ansicht  | Kontakt  | Seitenstruktur  | © 2026 JDD |