Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/Java/Threema/domain/libthreema/lib/examples/bin/     Datei vom 25.3.2026 mit Größe 14 kB image not shown  

Quelle  d2m_ping_pong.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

//! Example for usage of the Device to Mediator Protocol state machine, doing a real handshake with the
//! mediator server and an exemplary payload flow loop.
#![expect(unused_crate_dependencies, reason = "Example triggered false positive")]
#![expect(
    clippy::integer_division_remainder_used,
    reason = "Some internal of tokio::select triggers this"
)]

use core::time::Duration;

use anyhow::{Result, anyhow, bail};
use clap::Parser;
use futures_util::{SinkExt as _, TryStreamExt as _};
use libthreema::{
    cli::{FullIdentityConfig, FullIdentityConfigOptions},
    d2m::{
        D2mContext, D2mProtocol, D2mStateUpdate,
        payload::{BeginTransaction, IncomingPayload, OutgoingPayload, Reflect, ReflectFlags},
    },
    https::cli::https_client_builder,
    utils::logging::init_stderr_logging,
};
use rand::random;
use reqwest::StatusCode;
use tokio::{
    net::TcpStream,
    signal,
    sync::mpsc,
    time::{self, Instant},
};
use tokio_tungstenite::{
    MaybeTlsStream, WebSocketStream, connect_async,
    tungstenite::protocol::{CloseFrame, Message, frame::coding::CloseCode},
};
use tracing::{Level, debug, error, info, trace, warn};

// Command line interface of the D2M ping-pong example.
//
// NOTE: Plain `//` comments are used here on purpose. `///` doc comments on a clap-derived item are
// picked up by the derive macro and would end up in the generated help output.
#[derive(Parser)]
#[command()]
struct D2mPingPongCommand {
    // Identity configuration options, flattened into the top-level command line arguments. Resolved
    // into a `FullIdentityConfig` in `main`.
    #[command(flatten)]
    config: FullIdentityConfigOptions,
}

/// Payload queue endpoints held by the main process, i.e. the ping-pong flow runner.
///
/// These are the counterparts of the endpoints in [`PayloadQueuesForProtocol`].
struct PayloadQueuesForD2mPingPong {
    /// Receiving end for payloads forwarded by the protocol flow runner
    incoming: mpsc::Receiver<IncomingPayload>,

    /// Sending end for payloads that should be sent out by the protocol flow runner
    outgoing: mpsc::Sender<OutgoingPayload>,
}

/// Payload queue endpoints held by the protocol flow runner.
///
/// These are the counterparts of the endpoints in [`PayloadQueuesForD2mPingPong`].
struct PayloadQueuesForProtocol {
    /// Sending end used to forward payloads received from the server to the main process
    incoming: mpsc::Sender<IncomingPayload>,

    /// Receiving end for payloads the main process wants to have sent to the server
    outgoing: mpsc::Receiver<OutgoingPayload>,
}

/// Drives a [`D2mProtocol`] state machine over a WebSocket connection: datagrams read from the stream
/// are fed into the state machine and datagrams produced by it are written back to the stream.
struct D2mProtocolRunner {
    /// The WebSocket stream
    stream: WebSocketStream<MaybeTlsStream<TcpStream>>,

    /// An instance of the [`D2mProtocol`] state machine
    protocol: D2mProtocol,
}
impl D2mProtocolRunner {
    /// Initiate a D2M protocol connection.
    ///
    /// Creates the [`D2mProtocol`] state machine from `context` and opens a WebSocket connection to the
    /// URL handed out by the state machine.
    ///
    /// # Errors
    ///
    /// Fails if the WebSocket connection cannot be established or if the response status is anything
    /// other than `101 Switching Protocols`.
    #[tracing::instrument(skip_all)]
    async fn new(context: D2mContext) -> Result<Self> {
        // Create the protocol
        let (d2m_protocol, url) = D2mProtocol::new(context);

        // Connect via WebSocket
        debug!(?url, "Establishing WebSocket connection to mediator server");
        let (stream, response) = connect_async(url).await?;
        if response.status() != StatusCode::SWITCHING_PROTOCOLS {
            bail!(
                "Expected response to switch protocols ({expected}), got {actual}",
                expected = StatusCode::SWITCHING_PROTOCOLS,
                actual = response.status(),
            );
        }
        Ok(Self {
            stream,
            protocol: d2m_protocol,
        })
    }

    /// Do the handshake with the mediator server by exchanging the following messages:
    ///
    /// ```txt
    /// C -- client-info --> S (was already sent as part of the URL's path)
    /// C <- server-hello -- S
    /// C -- client-hello -> S
    /// C <- server-info --- S
    /// ```
    ///
    /// Returns once the state machine reports [`D2mStateUpdate::PostHandshake`].
    ///
    /// # Errors
    ///
    /// Fails if receiving or sending a datagram fails, if the state machine rejects its input, or if
    /// the state machine surfaces an incoming payload before the handshake has completed.
    #[tracing::instrument(skip_all)]
    async fn run_handshake_flow(&mut self) -> Result<()> {
        for iteration in 1_usize.. {
            trace!("Iteration #{iteration}");

            // Receive datagram and add it
            let datagram = self.receive().await?;
            self.protocol.add_datagrams(vec![datagram])?;

            // Handle instruction (if the state machine has none, wait for the next datagram)
            let Some(instruction) = self.protocol.poll()? else {
                continue;
            };

            // We do not expect an incoming payload at this stage
            if let Some(incoming_payload) = instruction.incoming_payload {
                let message = "Unexpected incoming payload during handshake";
                error!(?incoming_payload, message);
                bail!(message)
            }

            // Send any outgoing datagram
            if let Some(datagram) = instruction.outgoing_datagram {
                self.send(datagram.0).await?;
            }

            // Check if we've completed the handshake
            if let Some(D2mStateUpdate::PostHandshake(server_info)) = instruction.state_update {
                info!(?server_info, "Handshake completed");
                break;
            }
        }

        Ok(())
    }

    /// Run the payload exchange flow until stopped.
    ///
    /// Each iteration first polls the state machine for a pending instruction. Only when there is none
    /// does it wait for new input, which is either a datagram from the WebSocket stream or an outgoing
    /// payload from `queues.outgoing`. Incoming payloads surfaced by the state machine are forwarded
    /// to `queues.incoming`, outgoing datagrams are written to the WebSocket stream.
    ///
    /// # Errors
    ///
    /// Fails if receiving or sending a datagram fails, if the state machine rejects its input, if the
    /// state machine reports a state update (none is expected after the handshake), or if forwarding
    /// to `queues.incoming` fails.
    #[tracing::instrument(skip_all)]
    async fn run_payload_flow(&mut self, mut queues: PayloadQueuesForProtocol) -> Result<()> {
        for iteration in 1_usize.. {
            trace!("Payload flow iteration #{iteration}");

            // Poll for any pending instruction
            let mut instruction = self.protocol.poll()?;
            if instruction.is_none() {
                // No pending instruction left, wait for more input
                instruction = tokio::select! {
                    // Forward any incoming datagrams from the WebSocket transport
                    datagram = self.receive() => {
                        // Add datagram (poll in the next iteration)
                        self.protocol.add_datagrams(vec![datagram?])?;
                        None
                    },

                    // Forward any outgoing payloads. The `Some(..)` pattern means that a closed
                    // queue (`recv()` yielding `None`) does not select this branch.
                    Some(outgoing_payload) = queues.outgoing.recv() => {
                        debug!(?outgoing_payload, "Sending payload");
                        let instruction = self.protocol.create_payload(outgoing_payload)?;
                        Some(instruction)
                    }
                }
            }

            let Some(instruction) = instruction else {
                continue;
            };

            // We do not expect any state updates at this stage
            if let Some(state_update) = instruction.state_update {
                let message = "Unexpected state update after handshake";
                error!(?state_update, message);
                bail!(message)
            }

            // Log any incoming payload and hand it over to the main process
            if let Some(incoming_payload) = instruction.incoming_payload {
                debug!(?incoming_payload, "Received payload");
                queues.incoming.send(incoming_payload).await?;
            }

            // Send any outgoing datagram
            if let Some(datagram) = instruction.outgoing_datagram {
                self.send(datagram.0).await?;
            }
        }

        Ok(())
    }

    /// Close the WebSocket connection with a normal closure frame, consuming the runner.
    ///
    /// # Errors
    ///
    /// Fails if closing the WebSocket stream fails.
    #[tracing::instrument(skip_all)]
    async fn shutdown(mut self) -> Result<()> {
        info!("Shutting down WebSocket connection");

        // Normal closure, e.g. when the user is explicitly disconnecting
        self.stream
            .close(Some(CloseFrame {
                code: CloseCode::Normal,
                reason: "Bye".into(),
            }))
            .await?;
        Ok(())
    }

    /// Send a single datagram as a binary WebSocket message.
    ///
    /// # Errors
    ///
    /// Fails if writing to the WebSocket stream fails.
    #[tracing::instrument(skip_all)]
    async fn send(&mut self, datagram: Vec<u8>) -> Result<()> {
        trace!(length = datagram.len(), "Sending datagram");
        self.stream.send(Message::Binary(datagram.into())).await?;
        Ok(())
    }

    /// Receive the next datagram, i.e. the content of the next binary WebSocket message.
    ///
    /// Control messages encountered on the way are handled inline: a ping is answered with a pong,
    /// pong and close messages are only logged. The loop then continues reading.
    ///
    /// # Errors
    ///
    /// Fails if reading from or writing to the WebSocket stream fails, if the stream has ended, or if
    /// a text message or raw frame is received.
    #[tracing::instrument(skip_all)]
    async fn receive(&mut self) -> Result<Vec<u8>> {
        let datagram = loop {
            // Build the error lazily so that nothing is constructed for the (common) case where a
            // message is available.
            let message = self
                .stream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("WebSocket reading end closed"))?;
            match message {
                Message::Binary(bytes) => break bytes.to_vec(),

                Message::Text(text) => {
                    bail!("Received unexpected text message: {}", text.as_str())
                },

                Message::Ping(bytes) => {
                    // WARNING: There's a slight chance that the pong is lost when this is cancelled!
                    //
                    // NOTE(review): `feed` hands the pong to the sink without flushing it, so "sent"
                    // below presumably means "queued" - confirm against the `SinkExt` documentation.
                    debug!(ping_length = bytes.len(), "Received ping, responding with a pong");
                    self.stream.feed(Message::Pong(bytes)).await?;
                    debug!("Pong sent");
                },

                Message::Pong(bytes) => {
                    debug!(pong_length = bytes.len(), "Received pong");
                },

                // Only logged. The end of the stream is detected by the next read.
                Message::Close(close_frame) => {
                    info!(?close_frame, "Received close");
                },

                Message::Frame(_) => {
                    bail!("Received unexpected raw frame");
                },
            }
        };
        debug!(datagram_length = datagram.len(), "Received datagram");
        Ok(datagram)
    }
}

/// State of the (single) transaction the ping-pong flow runner juggles with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionState {
    /// No transaction is ongoing from our side
    None,

    /// Our attempt to begin a transaction was rejected; we retry once another transaction has ended
    Blocked,

    /// We asked to begin a transaction and are waiting for the acknowledgement
    AwaitingBeginAck,

    /// Our transaction was acknowledged and is running
    Running,

    /// We asked to commit the transaction and are waiting for the acknowledgement
    AwaitingCommitAck,
}

/// Exemplary application flow: periodically creates outgoing payloads and reacts to incoming payloads
/// while tracking the state of its own transaction.
struct D2mPingPongFlowRunner {
    /// Queue endpoints towards the protocol flow runner
    queues: PayloadQueuesForD2mPingPong,

    /// State of the transaction this runner begins and commits
    transaction_state: TransactionState,

    /// Counter providing the `reflect_id` of outgoing reflect payloads (incremented before each use)
    reflect_id_counter: u32,
}
impl D2mPingPongFlowRunner {
    /// Create a flow runner with no ongoing transaction and a reflect ID counter starting at zero.
    fn new(queues: PayloadQueuesForD2mPingPong) -> Self {
        Self {
            queues,
            transaction_state: TransactionState::None,
            reflect_id_counter: 0,
        }
    }

    /// Run the ping-pong flow until the incoming payload queue is closed.
    ///
    /// Every ten seconds (first time ten seconds after the start) an outgoing payload may be created.
    /// In between, incoming payloads are processed and may trigger an outgoing payload as well.
    ///
    /// # Errors
    ///
    /// Fails if an outgoing payload cannot be handed over to the outgoing payload queue.
    async fn run(mut self) -> Result<()> {
        // Create a timer that will periodically trigger an outgoing payload
        let mut payload_timer = time::interval_at(
            Instant::now()
                .checked_add(Duration::from_secs(10))
                .expect("Oops, apocalypse is near"),
            Duration::from_secs(10),
        );

        // Enter ping-pong flow loop
        loop {
            let outgoing_payload = tokio::select! {
                // Create an outgoing payload when the timer fires
                _ = payload_timer.tick() => {
                    self.create_outgoing_payload()
                },

                // Process incoming payload
                incoming_payload = self.queues.incoming.recv() => {
                    if let Some(incoming_payload) = incoming_payload {
                        info!(?incoming_payload, "Received payload");
                        self.handle_incoming_payload(&incoming_payload)
                    } else {
                        // The sending end is gone, nothing more will arrive
                        break
                    }
                }
            };

            // Send any outgoing payload
            if let Some(outgoing_payload) = outgoing_payload {
                info!(?outgoing_payload, "Sending payload");
                self.queues.outgoing.send(outgoing_payload).await?;
            }
        }

        Ok(())
    }

    /// Update the transaction state according to an incoming payload.
    ///
    /// Returns a payload to be sent in response, which is only the case when retrying to begin a
    /// transaction that was rejected before.
    #[tracing::instrument(skip_all)]
    fn handle_incoming_payload(&mut self, incoming_payload: &IncomingPayload) -> Option<OutgoingPayload> {
        match incoming_payload {
            // Transaction acknowledged: Now running
            IncomingPayload::BeginTransactionAck => {
                self.transaction_state = TransactionState::Running;
                None
            },

            // Transaction committed: Now none ongoing
            IncomingPayload::CommitTransactionAck => {
                self.transaction_state = TransactionState::None;
                None
            },

            // Transaction rejected: Retry beginning once we're unblocked
            IncomingPayload::TransactionRejected(_) => {
                self.transaction_state = TransactionState::Blocked;
                None
            },

            // Another transaction ended: Retry if we were blocked
            IncomingPayload::TransactionEnded(_) => {
                if self.transaction_state == TransactionState::Blocked {
                    // We are sending a begin request, so we are awaiting its acknowledgement (same as
                    // when the timer begins a transaction). Going back to `None` here would allow the
                    // timer to begin a second transaction while this one is still unacknowledged.
                    self.transaction_state = TransactionState::AwaitingBeginAck;
                    Some(Self::begin_transaction_payload())
                } else {
                    None
                }
            },

            _ => None,
        }
    }

    /// Create the outgoing payload for a timer tick, depending on the transaction state.
    ///
    /// Returns `None` only if the reflect ID counter would overflow.
    #[tracing::instrument(skip_all)]
    fn create_outgoing_payload(&mut self) -> Option<OutgoingPayload> {
        trace!(state = ?self.transaction_state);
        match self.transaction_state {
            // No transaction: Occasionally begin a transaction
            TransactionState::None if random::<bool>() => {
                self.transaction_state = TransactionState::AwaitingBeginAck;
                Some(Self::begin_transaction_payload())
            },

            // Transaction running: Commit
            TransactionState::Running => {
                self.transaction_state = TransactionState::AwaitingCommitAck;
                Some(OutgoingPayload::CommitTransaction)
            },

            // No transaction running: Reflect
            TransactionState::None
            | TransactionState::Blocked
            | TransactionState::AwaitingBeginAck
            | TransactionState::AwaitingCommitAck => {
                // Bail out with `None` (no payload) instead of wrapping around on overflow
                self.reflect_id_counter = self.reflect_id_counter.checked_add(1)?;
                Some(OutgoingPayload::Reflect(Reflect {
                    flags: ReflectFlags(ReflectFlags::EPHEMERAL_MARKER),
                    reflect_id: self.reflect_id_counter,
                    envelope: b"envelope".to_vec(),
                }))
            },
        }
    }

    /// Build the exemplary payload requesting to begin a transaction.
    fn begin_transaction_payload() -> OutgoingPayload {
        OutgoingPayload::BeginTransaction(BeginTransaction {
            encrypted_scope: b"encrypted_scope".to_vec(),
            ttl: None,
        })
    }
}

/// Entry point of the example: connects to the mediator server, runs the handshake and then runs the
/// payload flow and the ping-pong flow side by side until one of them stops or Ctrl+C is pressed.
///
/// # Errors
///
/// Fails if the configuration cannot be loaded, the connection or the handshake fails, or the
/// WebSocket connection cannot be shut down. An error that stops one of the flows is logged but does
/// not by itself change the return value.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure logging
    init_stderr_logging(Level::TRACE);

    // Create HTTP client
    let http_client = https_client_builder().build()?;

    // Parse command
    let arguments = D2mPingPongCommand::parse();
    let config = FullIdentityConfig::from_options(&http_client, arguments.config).await?;

    // Create payload queues
    let (app_queues, protocol_queues) = {
        let incoming_payload = mpsc::channel(4);
        let outgoing_payload = mpsc::channel(4);
        (
            PayloadQueuesForD2mPingPong {
                incoming: incoming_payload.1,
                outgoing: outgoing_payload.0,
            },
            PayloadQueuesForProtocol {
                incoming: incoming_payload.0,
                outgoing: outgoing_payload.1,
            },
        )
    };

    // Create D2M protocol and establish a connection
    let mut d2m_connection = D2mProtocolRunner::new(
        config
            .d2m_context()
            .expect("Configuration must include D2X configuration"),
    )
    .await?;

    // Create protocol flow runner
    let ping_pong_flow_runner = D2mPingPongFlowRunner::new(app_queues);

    // Run the handshake flow
    d2m_connection.run_handshake_flow().await?;

    // Run the protocols until the first of them stops (or until interrupted). Whatever made a flow
    // stop is logged instead of being dropped silently, then we proceed with the shutdown.
    tokio::select! {
        result = d2m_connection.run_payload_flow(protocol_queues) => {
            if let Err(cause) = result {
                error!(?cause, "Payload flow stopped with an error");
            }
        }
        result = ping_pong_flow_runner.run() => {
            if let Err(cause) = result {
                error!(?cause, "Ping-pong flow stopped with an error");
            }
        }
        result = signal::ctrl_c() => {
            if let Err(cause) = result {
                error!(?cause, "Failed to listen for the interrupt signal");
            }
        },
    };

    // Shut down
    d2m_connection.shutdown().await?;
    Ok(())
}

/// Let clap run its debug assertions against the command definition of this example.
#[test]
fn verify_cli() {
    use clap::CommandFactory as _;

    let command = D2mPingPongCommand::command();
    command.debug_assert();
}

[Dauer der Verarbeitung: 0.20 Sekunden, vorverarbeitet 2026-04-27]