Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/JAVA/Threema/domain/libthreema/lib/src/csp_e2e/     Datei vom 25.3.2026 mit Größe 6 kB image not shown  

Quelle  reflect.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

//! Task for reflecting messages.
use const_format::formatcp;
use libthreema_macros::Name;
use tracing::{error, warn};

use crate::{
    common::Nonce,
    crypto::aead::AeadRandomNonceAhead as _,
    csp_e2e::{CspE2eProtocolError, D2xContext, InternalErrorCause, ReflectId},
    model::message::{MessageLifetime, MessageProperties},
    protobuf::{self},
    utils::{debug::Name as _, protobuf::PaddedMessage as _},
};

/// 1. Let `reflect-ids` be the list of all reflect IDs of `reflect_messages` that do not have the _ephemeral_
///    flag.
/// 2. Reflect each message of `reflect_messages` with the provided flags and ID as a `reflect` message.
/// 3. Wait until all `reflect-ids` have been acknowledged by a corresponding `reflect-ack` message, provide
///    the associated task with the `reflect-ids` as a [`ReflectResponse`] and poll again.
pub struct ReflectInstruction {
    /// Messages that need to be reflected.
    pub reflect_messages: Vec<ReflectPayload>,
}

/// Possible response for an [`ReflectInstruction`].
pub struct ReflectResponse {
    /// Acknowledgements for reflected messages.
    pub acknowledged_reflect_ids: Vec<ReflectId>,
}

/// Flags applied when reflecting/reflected
#[derive(Default)]
pub struct ReflectFlags {
    /// If `true`, the reflected message is only transmitted to other devices currently online.
    pub ephemeral: bool,
}
impl From<&MessageProperties> for ReflectFlags {
    fn from(properties: &MessageProperties) -> Self {
        Self {
            // Here's the rationale on why `MessageLifetime::Brief` does not also map to
            // _ephemeral_: A _short-lived_ (aka _brief_) message should either be received by no
            // device (if dropped) or all devices. This is because _short-lived_ is applied to
            // messages that affect the conversation history, e.g. a `call-offer` that would display
            // an attempted call. However, _ephemeral_ messages are not allowed to alter the
            // conversation history, at least not permanently (e.g. the _typing_ indicator).
            ephemeral: matches!(properties.lifetime, MessageLifetime::Ephemeral),
        }
    }
}

/// A message that should be reflected or has been reflected.
///
/// TODO(LIB-16): Add a `From` for the D2M layer reflect payload / align with those structs
pub struct ReflectPayload {
    /// Flags used for the reflected message.
    pub flags: ReflectFlags,

    /// ID used for the reflected message (only needed for acknowledgement).
    pub id: ReflectId,

    /// Enclosed encrypted D2D `Envelope`.
    pub envelope: Vec<u8>,
}
impl ReflectPayload {
    pub(super) fn encode_and_encrypt(
        d2x_context: &mut D2xContext,
        flags: ReflectFlags,
        content: protobuf::d2d::envelope::Content,
    ) -> Result<(Self, Nonce), CspE2eProtocolError> {
        let mut envelope = protobuf::d2d::Envelope {
            #[expect(deprecated, reason = "Will be filled by encode_to_vec_padded")]
            padding: vec![],
            device_id: d2x_context.device_id.0,
            protocol_version: protobuf::d2d::ProtocolVersion::V03 as u32,
            content: Some(content),
        }
        .encode_to_vec_padded();
        let nonce = d2x_context
            .device_group_key
            .reflect_key()
            .0
            .encrypt_in_place_random_nonce_ahead(b"", &mut envelope)
            .map_err(|_| InternalErrorCause::EncryptionFailed {
                name: protobuf::d2d::Envelope::NAME,
            })?;
        Ok((
            Self {
                flags,
                id: d2x_context.reflect_id.0.get_and_increment()?.into(),
                envelope,
            },
            nonce.into(),
        ))
    }
}

/// Subtask for reflecting a bundled list of messages and awaiting acknowledgement.
#[derive(Debug, Name)]
pub(crate) struct ReflectSubtask {
    reflect_ids: Vec<ReflectId>,
    acknowledged_reflect_ids: Option<Vec<ReflectId>>,
}
impl ReflectSubtask {
    pub(crate) fn new(reflect_messages: Vec<ReflectPayload>) -> (Self, ReflectInstruction) {
        (
            Self {
                // Only expect acknowledgements from reflection messages that are not ephemeral
                reflect_ids: reflect_messages
                    .iter()
                    .filter_map(|reflect_message| {
                        if reflect_message.flags.ephemeral {
                            None
                        } else {
                            Some(reflect_message.id)
                        }
                    })
                    .collect(),
                acknowledged_reflect_ids: None,
            },
            ReflectInstruction { reflect_messages },
        )
    }

    #[tracing::instrument(skip_all, fields(?self))]
    pub(crate) fn poll(mut self) -> Result<(), CspE2eProtocolError> {
        // Ensure the caller provided the acknowledged reflect IDs
        let Some(mut acknowledged_reflect_ids) = self.acknowledged_reflect_ids else {
            return Err(CspE2eProtocolError::InvalidState(formatcp!(
                "Acknowledged reflect IDs were not provided for '{}' state",
                ReflectSubtask::NAME,
            )));
        };

        // Ensure all reflect IDs have been acknowledged.
        //
        // Note: Input is expected to be small, so a linear search is fine.
        self.reflect_ids.retain(|reflect_id| {
            match acknowledged_reflect_ids
                .iter()
                .position(|other_reflect_id| other_reflect_id == reflect_id)
            {
                Some(index) => {
                    // Note: `swap_remove` is faster but we actually care about the order of the
                    // remaining acknowledged reflect IDs for debugging purposes
                    let _ = acknowledged_reflect_ids.remove(index);
                    false
                },
                None => true,
            }
        });

        // Check if there are reflect IDs that have not been acknowledged
        if !self.reflect_ids.is_empty() {
            let message = "Desync likely! Some provided reflect IDs have no associated acknowledgement";
            error!(
                unacknowledged_reflect_ids = ?self.reflect_ids,
                ?acknowledged_reflect_ids,
                message,
            );
            return Err(CspE2eProtocolError::DesyncError(message.to_owned()));
        }
        if !acknowledged_reflect_ids.is_empty() {
            warn!(
                ?acknowledged_reflect_ids,
                "Some acknowledged reflect IDs have no associated reflect ID",
            );
        }

        // Done
        Ok(())
    }

    // TODO(LIB-16): Add the `reflect-ack` timestamps here and forward them to the result!
    #[tracing::instrument(skip_all, fields(?self))]
    pub(crate) fn response(&mut self, response: ReflectResponse) {
        let _ = self
            .acknowledged_reflect_ids
            .insert(response.acknowledged_reflect_ids);
    }
}

[Dauer der Verarbeitung: 0.24 Sekunden, vorverarbeitet 2026-04-27]