Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  scheduler.rs   Sprache: unbekannt

 
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! # Metrics Ping Scheduler
//!
//! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings.
//! It implements the spec described in
//! [the docs](https://mozilla.github.io/glean/book/user/pings/metrics.html#scheduling)

use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
use crate::storage::INTERNAL_STORAGE;
use crate::util::local_now_with_offset;
use crate::{CommonMetricData, Glean, Lifetime};
use chrono::prelude::*;
use chrono::Duration;
use once_cell::sync::Lazy;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;

const SCHEDULED_HOUR: u32 = 4;

// Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex.
// See https://github.com/rust-lang/rust-clippy/issues/1516
#[allow(clippy::mutex_atomic)]
static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
    Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));

/// Describes the interface for a submitter of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingSubmitter {
    /// Submits a metrics ping, updating the last sent time to `now`
    /// (which might not be _right now_ due to processing delays (or in tests))
    fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
}

/// Describes the interface for a scheduler of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingScheduler {
    /// Begins a recurring schedule of "metrics" ping submissions, on another thread.
    /// `now` is used with `when` to determine the first schedule interval and
    /// may not be _right now_ due to processing delays (or in tests).
    fn start_scheduler(
        &self,
        submitter: impl MetricsPingSubmitter + Send + 'static,
        now: DateTime<FixedOffset>,
        when: When,
    );
}

/// Uses Glean to submit "metrics" pings directly.
struct GleanMetricsPingSubmitter {}
impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
    fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
        glean.submit_ping_by_name("metrics", reason);
        // Always update the collection date, irrespective of the ping being sent.
        get_last_sent_time_metric().set_sync_chrono(glean, now);
    }
}

/// Schedule "metrics" pings directly using the default behaviour.
struct GleanMetricsPingScheduler {}
impl MetricsPingScheduler for GleanMetricsPingScheduler {
    fn start_scheduler(
        &self,
        submitter: impl MetricsPingSubmitter + Send + 'static,
        now: DateTime<FixedOffset>,
        when: When,
    ) {
        start_scheduler(submitter, now, when);
    }
}

/// Performs startup checks to decide when to schedule the next "metrics" ping collection.
/// **Must** be called before draining the preinit queue.
/// (We're at the Language Bindings' mercy for that)
pub fn schedule(glean: &Glean) {
    let now = local_now_with_offset();

    let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
    if *cancelled_lock.lock().unwrap() {
        log::debug!("Told to schedule, but already cancelled. Are we in a test?");
    }
    *cancelled_lock.lock().unwrap() = false; // Uncancel the thread.

    let submitter = GleanMetricsPingSubmitter {};
    let scheduler = GleanMetricsPingScheduler {};

    schedule_internal(glean, submitter, scheduler, now)
}

/// Tells the scheduler task to exit quickly and cleanly.
pub fn cancel() {
    let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
    *cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread.
    condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled.
}

fn schedule_internal(
    glean: &Glean,
    submitter: impl MetricsPingSubmitter + Send + 'static,
    scheduler: impl MetricsPingScheduler,
    now: DateTime<FixedOffset>,
) {
    let last_sent_build_metric = get_last_sent_build_metric();
    if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
        // If `app_build` is longer than StringMetric's max length, we will always
        // treat it as a changed build when really it isn't.
        // This will be externally-observable as InvalidOverflow errors on both the core
        // `client_info.app_build` metric and the scheduler's internal metric.
        if last_sent_build != glean.app_build {
            last_sent_build_metric.set_sync(glean, &glean.app_build);
            log::info!("App build changed. Sending 'metrics' ping");
            submitter.submit_metrics_ping(glean, Some("upgrade"), now);
            scheduler.start_scheduler(submitter, now, When::Reschedule);
            return;
        }
    } else {
        // No value in last_sent_build. Better set one.
        last_sent_build_metric.set_sync(glean, &glean.app_build);
    }

    let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
    if let Some(last_sent) = last_sent_time {
        log::info!("The 'metrics' ping was last sent on {}", last_sent);
    }

    // We aim to cover 3 cases here:
    //
    // 1. The ping was already collected on the current calendar day;
    //    only schedule one for collection on the next calendar day at the due time.
    // 2. The ping was NOT collected on the current calendar day AND we're later
    //    than today's due time; collect the ping immediately.
    // 3. The ping was NOT collected on the current calendar day BUT we still have
    //    some time to the due time; schedule for submitting the current calendar day.

    let already_sent_today = last_sent_time.is_some_and(|d| d.date() == now.date());
    if already_sent_today {
        // Case #1
        log::info!("The 'metrics' ping was already sent today, {}", now);
        scheduler.start_scheduler(submitter, now, When::Tomorrow);
    } else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
        // Case #2
        log::info!("Sending the 'metrics' ping immediately, {}", now);
        submitter.submit_metrics_ping(glean, Some("overdue"), now);
        scheduler.start_scheduler(submitter, now, When::Reschedule);
    } else {
        // Case #3
        log::info!("The 'metrics' collection is scheduled for today, {}", now);
        scheduler.start_scheduler(submitter, now, When::Today);
    }
}

/// "metrics" ping scheduling deadlines.
#[derive(Debug, PartialEq)]
enum When {
    Today,
    Tomorrow,
    Reschedule,
}

impl When {
    /// Returns the duration from now until our deadline.
    /// Note that std::time::Duration doesn't do negative time spans, so if
    /// our deadline has passed, this will return zero.
    fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
        let fire_date = match self {
            Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
            // Doesn't actually save us from being an hour off on DST because
            // chrono doesn't know when DST changes. : (
            Self::Tomorrow | Self::Reschedule => {
                (now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
            }
        };
        // After rust-lang/rust#73544 can use std::time::Duration::ZERO
        (fire_date - now)
            .to_std()
            .unwrap_or_else(|_| std::time::Duration::from_millis(0))
    }

    /// The "metrics" ping reason corresponding to our deadline.
    fn reason(&self) -> &'static str {
        match self {
            Self::Today => "today",
            Self::Tomorrow => "tomorrow",
            Self::Reschedule => "reschedule",
        }
    }
}

fn start_scheduler(
    submitter: impl MetricsPingSubmitter + Send + 'static,
    now: DateTime<FixedOffset>,
    when: When,
) -> JoinHandle<()> {
    let pair = Arc::clone(&TASK_CONDVAR);
    std::thread::Builder::new()
        .name("glean.mps".into())
        .spawn(move || {
            let (cancelled_lock, condvar) = &*pair;
            let mut when = when;
            let mut now = now;
            loop {
                let dur = when.until(now);
                log::info!("Scheduling for {} after {:?}, reason {:?}", now, dur, when);
                let mut timed_out = false;
                {
                    match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| !*cancelled) {
                        Err(err) => {
                            log::warn!("Condvar wait failure. MPS exiting. {}", err);
                            break;
                        }
                        Ok((cancelled, wait_result)) => {
                            if *cancelled {
                                log::info!("Metrics Ping Scheduler cancelled. Exiting.");
                                break;
                            } else if wait_result.timed_out() {
                                // Can't get the global glean while holding cancelled's lock.
                                timed_out = true;
                            } else {
                                // This should be impossible. `cancelled_lock` is acquired, and
                                // `!*cancelled` is checked by the condvar before it is allowed
                                // to return from `wait_timeout_while` (I checked).
                                // So `Ok(_)` implies `*cancelled || wait_result.timed_out`.
                                log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
                            }
                        }
                    }
                }
                // Safety:
                // We are okay dropping the condvar's cancelled lock here because it only guards
                // whether we're cancelled, and we've established that we weren't when we timed out.
                // We might _now_ be cancelled at any time, in which case when we loop back over
                // we'll immediately exit. But first we need to submit our "metrics" ping.
                if timed_out {
                    log::info!("Time to submit our metrics ping, {:?}", when);
                    let glean = crate::core::global_glean().expect("Global Glean not present when trying to send scheduled 'metrics' ping?!").lock().unwrap();
                    submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
                    when = When::Reschedule;
                }
                now = local_now_with_offset();
            }
        }).expect("Unable to spawn Metrics Ping Scheduler thread.")
}

fn get_last_sent_time_metric() -> DatetimeMetric {
    DatetimeMetric::new(
        CommonMetricData {
            name: "last_sent_time".into(),
            category: "mps".into(),
            send_in_pings: vec![INTERNAL_STORAGE.into()],
            lifetime: Lifetime::User,
            ..Default::default()
        },
        TimeUnit::Minute,
    )
}

fn get_last_sent_build_metric() -> StringMetric {
    StringMetric::new(CommonMetricData {
        name: "last_sent_build".into(),
        category: "mps".into(),
        send_in_pings: vec![INTERNAL_STORAGE.into()],
        lifetime: Lifetime::User,
        ..Default::default()
    })
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;
    use std::sync::atomic::{AtomicU32, Ordering};

    struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
        submit_validator: F,
        validator_run_count: Arc<AtomicU32>,
    }
    struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
        schedule_validator: F,
        validator_run_count: Arc<AtomicU32>,
    }
    impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
        fn submit_metrics_ping(
            &self,
            _glean: &Glean,
            reason: Option<&str>,
            now: DateTime<FixedOffset>,
        ) {
            (self.submit_validator)(now, reason);
            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
        }
    }
    impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
        fn start_scheduler(
            &self,
            _submitter: impl MetricsPingSubmitter + Send + 'static,
            now: DateTime<FixedOffset>,
            when: When,
        ) {
            (self.schedule_validator)(now, when);
            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn new_proxies<
        F1: Fn(DateTime<FixedOffset>, Option<&str>),
        F2: Fn(DateTime<FixedOffset>, When),
    >(
        submit_validator: F1,
        schedule_validator: F2,
    ) -> (
        ValidatingSubmitter<F1>,
        Arc<AtomicU32>,
        ValidatingScheduler<F2>,
        Arc<AtomicU32>,
    ) {
        let submitter_count = Arc::new(AtomicU32::new(0));
        let submitter = ValidatingSubmitter {
            submit_validator,
            validator_run_count: Arc::clone(&submitter_count),
        };
        let scheduler_count = Arc::new(AtomicU32::new(0));
        let scheduler = ValidatingScheduler {
            schedule_validator,
            validator_run_count: Arc::clone(&scheduler_count),
        };
        (submitter, submitter_count, scheduler, scheduler_count)
    }

    // Ensure on first run that we actually set the last sent build metric.
    // (and that we send an "overdue" ping if it's after the scheduled hour)
    #[test]
    fn first_run_last_sent_build() {
        let (mut glean, _t) = new_glean(None);

        glean.app_build = "a build".into();
        let lsb_metric = get_last_sent_build_metric();
        assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));

        let fake_now = FixedOffset::east(0)
            .ymd(2022, 11, 15)
            .and_hms(SCHEDULED_HOUR, 0, 1);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("overdue")),
            |_, when| assert_eq!(when, When::Reschedule),
        );

        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));

        assert_eq!(
            Some(glean.app_build.to_string()),
            lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
        );
    }

    // Ensure that if we have a different build, we immediately submit an "upgrade" ping
    // and schedule a "reschedule" ping for tomorrow.
    #[test]
    fn different_app_builds_submit_and_reschedule() {
        let (mut glean, _t) = new_glean(None);

        glean.app_build = "a build".into();
        get_last_sent_build_metric().set_sync(&glean, "a different build");

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("upgrade")),
            |_, when| assert_eq!(when, When::Reschedule),
        );

        schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    // If we've already sent a ping today, ensure we don't send a ping but we
    // do schedule a ping for tomorrow. ("Case #1" in schedule_internal)
    #[test]
    fn case_1_no_submit_but_schedule_tomorrow() {
        let (glean, _t) = new_glean(None);

        let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14);
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
            |_, when| assert_eq!(when, When::Tomorrow),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    // If we haven't sent a ping today and we're after the scheduled time,
    // ensure we send a ping and then schedule a "reschedule" ping for tomorrow.
    // ("Case #2" in schedule_internal)
    #[test]
    fn case_2_submit_ping_and_reschedule() {
        let (glean, _t) = new_glean(None);

        let fake_yesterday = FixedOffset::east(0)
            .ymd(2021, 4, 29)
            .and_hms(SCHEDULED_HOUR, 0, 1);
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
        let fake_now = fake_yesterday + Duration::days(1);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("overdue")),
            |_, when| assert_eq!(when, When::Reschedule),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    // If we haven't sent a ping today and we're before the scheduled time,
    // ensure we don't send a ping but schedule a "today" ping for today.
    // ("Case #3" in schedule_internal)
    #[test]
    fn case_3_no_submit_but_schedule_today() {
        let (glean, _t) = new_glean(None);

        let fake_yesterday =
            FixedOffset::east(0)
                .ymd(2021, 4, 29)
                .and_hms(SCHEDULED_HOUR - 1, 0, 1);
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
        let fake_now = fake_yesterday + Duration::days(1);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
            |_, when| assert_eq!(when, When::Today),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    // `When` is responsible for date math. Let's make sure it's correct.
    #[test]
    fn when_gets_at_least_some_date_math_correct() {
        let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
        // `now` is after `SCHEDULED_HOUR` so should be zero:
        assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now));
        // If we bring it back before `SCHEDULED_HOUR` it should give us the duration:
        let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
        assert_eq!(
            std::time::Duration::from_secs(3600),
            When::Today.until(earlier)
        );

        // `Tomorrow` and `Reschedule` should differ only in their `reason()`
        // 46670s is 12h57m10s (aka, the time from 15:02:10 to 04:00:00
        // (when the timezone doesn't change between them)).
        assert_eq!(
            std::time::Duration::from_secs(46670),
            When::Tomorrow.until(now)
        );
        assert_eq!(
            std::time::Duration::from_secs(46670),
            When::Reschedule.until(now)
        );
        assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
        assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
    }

    // Scheduler tests mutate global state and thus must not be run in parallel.
    // Otherwise one test could cancel the other.
    // This Mutex aims to solve that.
    static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));

    // The scheduler has been designed to be cancellable. Can we cancel it?
    #[test]
    fn cancellable_tasks_can_be_cancelled() {
        // First and foremost, all scheduler tests must ensure they start uncancelled.
        // Perils of having shared state.
        let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
        let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
        *cancelled_lock.lock().unwrap() = false;

        // Pick a time at least two hours from the next scheduled submission.
        // (So that this test will time out if cancellation fails).
        let now = FixedOffset::east(0)
            .ymd(2021, 4, 30)
            .and_hms(SCHEDULED_HOUR - 2, 0, 0);

        let proxy_factory = || {
            new_proxies(
                |_, reason| {
                    panic!(
                        "Shouldn't submit when testing scheduler. reason: {:?}",
                        reason
                    )
                },
                |_, _| panic!("Not even using the scheduler this time."),
            )
        };

        // Test Today.
        let (submitter, submitter_count, _, _) = proxy_factory();
        let handle = start_scheduler(submitter, now, When::Today);
        super::cancel();
        handle.join().unwrap(); // Should complete immediately.
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));

        // Test Tomorrow.
        let (submitter, submitter_count, _, _) = proxy_factory();
        *cancelled_lock.lock().unwrap() = false; // Uncancel.
        let handle = start_scheduler(submitter, now, When::Tomorrow);
        super::cancel();
        handle.join().unwrap(); // Should complete immediately.
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));

        // Test Reschedule.
        let (submitter, submitter_count, _, _) = proxy_factory();
        *cancelled_lock.lock().unwrap() = false; // Uncancel.
        let handle = start_scheduler(submitter, now, When::Reschedule);
        super::cancel();
        handle.join().unwrap(); // Should complete immediately.
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
    }

    // We're not keen to wait like the scheduler is, but we can test a quick schedule.
    #[test]
    fn immediate_task_runs_immediately() {
        // First and foremost, all scheduler tests must ensure they start uncancelled.
        // Perils of having shared state.
        let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
        let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
        *cancelled_lock.lock().unwrap() = false;

        // We're actually going to submit a ping from the scheduler, which requires a global glean.
        let (glean, _t) = new_glean(None);
        assert!(
            !glean.schedule_metrics_pings,
            "Real schedulers not allowed in tests!"
        );
        assert!(crate::core::setup_glean(glean).is_ok());

        // We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0.
        let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0);

        let (submitter, submitter_count, _, _) = new_proxies(
            move |_, reason| {
                assert_eq!(reason, Some("today"));
                // After submitting the ping we expect, let's cancel this scheduler so the thread exits.
                // (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex)
                std::thread::spawn(super::cancel);
            },
            |_, _| panic!("Not using the scheduler this time."),
        );

        let handle = start_scheduler(submitter, now, When::Today);
        handle.join().unwrap();
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
    }
}

[ Dauer der Verarbeitung: 0.43 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge