Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  mod.rs   Sprache: unbekannt

 
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Ping collection, assembly & submission.

use std::fs::{self, create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use log::info;
use serde_json::{json, Value as JsonValue};

use crate::common_metric_data::{CommonMetricData, Lifetime};
use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit};
use crate::storage::{StorageManager, INTERNAL_STORAGE};
use crate::upload::{HeaderMap, PingMetadata};
use crate::util::{get_iso_time_string, local_now_with_offset};
use crate::{Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};

/// Holds everything you need to store or send a ping.
pub struct Ping<'a> {
    /// The unique document id.
    pub doc_id: &'a str,
    /// The ping's name.
    pub name: &'a str,
    /// The path on the server to use when uplaoding this ping.
    pub url_path: &'a str,
    /// The payload, including `*_info` fields.
    pub content: JsonValue,
    /// The headers to upload with the payload.
    pub headers: HeaderMap,
    /// Whether the content contains {client|ping}_info sections.
    pub includes_info_sections: bool,
    /// Other pings that should be scheduled when this ping is sent.
    pub schedules_pings: Vec<String>,
}

/// Collect a ping's data, assemble it into its full payload and store it on disk.
pub struct PingMaker;

fn merge(a: &mut JsonValue, b: &JsonValue) {
    match (a, b) {
        (&mut JsonValue::Object(ref mut a), JsonValue::Object(b)) => {
            for (k, v) in b {
                merge(a.entry(k.clone()).or_insert(JsonValue::Null), v);
            }
        }
        (a, b) => {
            *a = b.clone();
        }
    }
}

impl Default for PingMaker {
    fn default() -> Self {
        Self::new()
    }
}

impl PingMaker {
    /// Creates a new [`PingMaker`].
    pub fn new() -> Self {
        Self
    }

    /// Gets, and then increments, the sequence number for a given ping.
    fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize {
        // Don't attempt to increase sequence number for disabled ping
        if !glean.is_ping_enabled(storage_name) {
            return 0;
        }

        // Sequence numbers are stored as a counter under a name that includes the storage name
        let seq = CounterMetric::new(CommonMetricData {
            name: format!("{}#sequence", storage_name),
            // We don't need a category, the name is already unique
            category: "".into(),
            send_in_pings: vec![INTERNAL_STORAGE.into()],
            lifetime: Lifetime::User,
            ..Default::default()
        });

        let current_seq = match StorageManager.snapshot_metric(
            glean.storage(),
            INTERNAL_STORAGE,
            &seq.meta().identifier(glean),
            seq.meta().inner.lifetime,
        ) {
            Some(Metric::Counter(i)) => i,
            _ => 0,
        };

        // Increase to next sequence id
        seq.add_sync(glean, 1);

        current_seq as usize
    }

    /// Gets the formatted start and end times for this ping and update for the next ping.
    fn get_start_end_times(
        &self,
        glean: &Glean,
        storage_name: &str,
        time_unit: TimeUnit,
    ) -> (String, String) {
        let start_time = DatetimeMetric::new(
            CommonMetricData {
                name: format!("{}#start", storage_name),
                category: "".into(),
                send_in_pings: vec![INTERNAL_STORAGE.into()],
                lifetime: Lifetime::User,
                ..Default::default()
            },
            time_unit,
        );

        // "start_time" is the time the ping was generated the last time.
        // If not available, we use the date the Glean object was initialized.
        let start_time_data = start_time
            .get_value(glean, INTERNAL_STORAGE)
            .unwrap_or_else(|| glean.start_time());
        let end_time_data = local_now_with_offset();

        // Update the start time with the current time.
        start_time.set_sync_chrono(glean, end_time_data);

        // Format the times.
        let start_time_data = get_iso_time_string(start_time_data, time_unit);
        let end_time_data = get_iso_time_string(end_time_data, time_unit);
        (start_time_data, end_time_data)
    }

    fn get_ping_info(
        &self,
        glean: &Glean,
        storage_name: &str,
        reason: Option<&str>,
        precision: TimeUnit,
    ) -> JsonValue {
        let (start_time, end_time) = self.get_start_end_times(glean, storage_name, precision);
        let mut map = json!({
            "seq": self.get_ping_seq(glean, storage_name),
            "start_time": start_time,
            "end_time": end_time,
        });

        if let Some(reason) = reason {
            map.as_object_mut()
                .unwrap() // safe unwrap, we created the object above
                .insert("reason".to_string(), JsonValue::String(reason.to_string()));
        };

        // Get the experiment data, if available.
        if let Some(experiment_data) =
            StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE)
        {
            map.as_object_mut()
                .unwrap() // safe unwrap, we created the object above
                .insert("experiments".to_string(), experiment_data);
        };

        map
    }

    fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue {
        // Add the "telemetry_sdk_build", which is the glean-core version.
        let mut map = json!({
            "telemetry_sdk_build": crate::GLEAN_VERSION,
        });

        // Flatten the whole thing.
        if let Some(client_info) =
            StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true)
        {
            let client_info_obj = client_info.as_object().unwrap(); // safe unwrap, snapshot always returns an object.
            for (_key, value) in client_info_obj {
                merge(&mut map, value);
            }
        } else {
            log::warn!("Empty client info data.");
        }

        if !include_client_id {
            // safe unwrap, we created the object above
            map.as_object_mut().unwrap().remove("client_id");
        }

        json!(map)
    }

    /// Build the headers to be persisted and sent with a ping.
    ///
    /// Currently the only headers we persist are `X-Debug-ID` and `X-Source-Tags`.
    ///
    /// # Arguments
    ///
    /// * `glean` - the [`Glean`] instance to collect headers from.
    ///
    /// # Returns
    ///
    /// A map of header names to header values.
    /// Might be empty if there are no extra headers to send.
    /// ```
    fn get_headers(&self, glean: &Glean) -> HeaderMap {
        let mut headers_map = HeaderMap::new();

        if let Some(debug_view_tag) = glean.debug_view_tag() {
            headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string());
        }

        if let Some(source_tags) = glean.source_tags() {
            headers_map.insert("X-Source-Tags".to_string(), source_tags.join(","));
        }

        headers_map
    }

    /// Collects a snapshot for the given ping from storage and attach required meta information.
    ///
    /// # Arguments
    ///
    /// * `glean` - the [`Glean`] instance to collect data from.
    /// * `ping` - the ping to collect for.
    /// * `reason` - an optional reason code to include in the ping.
    /// * `doc_id` - the ping's unique document identifier.
    /// * `url_path` - the path on the server to upload this ping to.
    ///
    /// # Returns
    ///
    /// A fully assembled representation of the ping payload and associated metadata.
    /// If there is no data stored for the ping, `None` is returned.
    pub fn collect<'a>(
        &self,
        glean: &Glean,
        ping: &'a PingType,
        reason: Option<&str>,
        doc_id: &'a str,
        url_path: &'a str,
    ) -> Option<Ping<'a>> {
        info!("Collecting {}", ping.name());
        let database = glean.storage();

        // HACK: Only for metrics pings we add the ping timings.
        // But we want that to persist until the next metrics ping is actually sent.
        let write_samples = database.write_timings.replace(Vec::with_capacity(64));
        if !write_samples.is_empty() {
            glean
                .database_metrics
                .write_time
                .accumulate_samples_sync(glean, &write_samples);
        }

        let mut metrics_data = StorageManager.snapshot_as_json(database, ping.name(), true);

        let events_data = glean
            .event_storage()
            .snapshot_as_json(glean, ping.name(), true);

        // Due to the way the experimentation identifier could link datasets that are intentionally unlinked,
        // it will not be included in pings that specifically exclude the Glean client-id, those pings that
        // should not be sent if empty, or pings that exclude the {client|ping}_info sections wholesale.
        if (!ping.include_client_id() || !ping.send_if_empty() || !ping.include_info_sections())
            && glean.test_get_experimentation_id().is_some()
            && metrics_data.is_some()
        {
            // There is a lot of unwrapping here, but that's fine because the `if` conditions above mean that the
            // experimentation id is present in the metrics.
            let metrics = metrics_data.as_mut().unwrap().as_object_mut().unwrap();
            let metrics_count = metrics.len();
            let strings = metrics.get_mut("string").unwrap().as_object_mut().unwrap();
            let string_count = strings.len();

            // Handle the send_if_empty case by checking if the experimentation id is the only metric in the data.
            let empty_payload = events_data.is_none() && metrics_count == 1 && string_count == 1;
            if !ping.include_client_id() || (!ping.send_if_empty() && empty_payload) {
                strings.remove("glean.client.annotation.experimentation_id");
            }

            if strings.is_empty() {
                metrics.remove("string");
            }

            if metrics.is_empty() {
                metrics_data = None;
            }
        }

        let is_empty = metrics_data.is_none() && events_data.is_none();
        if !ping.send_if_empty() && is_empty {
            info!("Storage for {} empty. Bailing out.", ping.name());
            return None;
        } else if ping.name() == "events" && events_data.is_none() {
            info!("No events for 'events' ping. Bailing out.");
            return None;
        } else if is_empty {
            info!(
                "Storage for {} empty. Ping will still be sent.",
                ping.name()
            );
        }

        let precision = if ping.precise_timestamps() {
            TimeUnit::Millisecond
        } else {
            TimeUnit::Minute
        };

        let mut json = if ping.include_info_sections() {
            let ping_info = self.get_ping_info(glean, ping.name(), reason, precision);
            let client_info = self.get_client_info(glean, ping.include_client_id());

            json!({
                "ping_info": ping_info,
                "client_info": client_info
            })
        } else {
            json!({})
        };

        let json_obj = json.as_object_mut()?;
        if let Some(metrics_data) = metrics_data {
            json_obj.insert("metrics".to_string(), metrics_data);
        }
        if let Some(events_data) = events_data {
            json_obj.insert("events".to_string(), events_data);
        }

        Some(Ping {
            content: json,
            name: ping.name(),
            doc_id,
            url_path,
            headers: self.get_headers(glean),
            includes_info_sections: ping.include_info_sections(),
            schedules_pings: ping.schedules_pings().to_vec(),
        })
    }

    /// Gets the path to a directory for ping storage.
    ///
    /// The directory will be created inside the `data_path`.
    /// The `pings` directory (and its parents) is created if it does not exist.
    fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf> {
        // Use a special directory for deletion-request pings
        let pings_dir = match ping_type {
            Some("deletion-request") => data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
            _ => data_path.join(PENDING_PINGS_DIRECTORY),
        };

        create_dir_all(&pings_dir)?;
        Ok(pings_dir)
    }

    /// Gets path to a directory for temporary storage.
    ///
    /// The directory will be created inside the `data_path`.
    /// The `tmp` directory (and its parents) is created if it does not exist.
    fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf> {
        let pings_dir = data_path.join("tmp");
        create_dir_all(&pings_dir)?;
        Ok(pings_dir)
    }

    /// Stores a ping to disk in the pings directory.
    pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> {
        let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?;
        let temp_dir = self.get_tmp_dir(data_path)?;

        // Write to a temporary location and then move when done,
        // for transactional writes.
        let temp_ping_path = temp_dir.join(ping.doc_id);
        let ping_path = pings_dir.join(ping.doc_id);

        log::debug!(
            "Storing ping '{}' at '{}'",
            ping.doc_id,
            ping_path.display()
        );

        {
            let mut file = File::create(&temp_ping_path)?;
            file.write_all(ping.url_path.as_bytes())?;
            file.write_all(b"\n")?;
            file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?;
            file.write_all(b"\n")?;
            let metadata = PingMetadata {
                // We don't actually need to clone the headers except to match PingMetadata's ownership.
                // But since we're going to write a file to disk in a sec,
                // and HeaderMaps tend to have only like two things in them, tops,
                // the cost is bearable.
                headers: Some(ping.headers.clone()),
                body_has_info_sections: Some(ping.includes_info_sections),
                ping_name: Some(ping.name.to_string()),
            };
            file.write_all(::serde_json::to_string(&metadata)?.as_bytes())?;
        }

        if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) {
            log::warn!(
                "Unable to move '{}' to '{}",
                temp_ping_path.display(),
                ping_path.display()
            );
            return Err(e);
        }

        Ok(())
    }

    /// Clears any pending pings in the queue.
    pub fn clear_pending_pings(&self, data_path: &Path, ping_names: &[&str]) -> Result<()> {
        let pings_dir = self.get_pings_dir(data_path, None)?;

        // TODO(bug 1932909): Refactor this into its own function
        // and share it with `upload::directory`.
        let entries = pings_dir.read_dir()?;
        for entry in entries.filter_map(|entry| entry.ok()) {
            if let Ok(file_type) = entry.file_type() {
                if !file_type.is_file() {
                    continue;
                }
            } else {
                continue;
            }

            let file = match File::open(entry.path()) {
                Ok(file) => file,
                Err(_) => {
                    continue;
                }
            };

            let mut lines = BufReader::new(file).lines();
            if let (Some(Ok(path)), Some(Ok(_body)), Ok(metadata)) =
                (lines.next(), lines.next(), lines.next().transpose())
            {
                let PingMetadata { ping_name, .. } = metadata
                    .and_then(|m| crate::upload::process_metadata(&path, &m))
                    .unwrap_or_default();
                let ping_name =
                    ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());

                if ping_names.contains(&&ping_name[..]) {
                    _ = fs::remove_file(entry.path());
                }
            } else {
                continue;
            }
        }

        log::debug!("All pending pings deleted");

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;

    #[test]
    fn sequence_numbers_should_be_reset_when_toggling_uploading() {
        let (mut glean, _t) = new_glean(None);
        let ping_maker = PingMaker::new();

        assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
        assert_eq!(1, ping_maker.get_ping_seq(&glean, "store1"));

        glean.set_upload_enabled(false);
        assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
        assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));

        glean.set_upload_enabled(true);
        assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
        assert_eq!(1, ping_maker.get_ping_seq(&glean, "store1"));
    }
}

[ Dauer der Verarbeitung: 0.25 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge