Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  mod.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

//! Implementation of the _Connection Rendezvous Protocol_.
use std::collections::HashMap;

use duplicate::duplicate_item;
use educe::Educe;
use libthreema_macros::{DebugVariantNames, VariantNames};
use prost::Message as _;
use rand::{self, Rng as _};
use tracing::{debug, trace, warn};
use zeroize::ZeroizeOnDrop;

pub use crate::d2d_rendezvous::frame::{IncomingFrame, OutgoingFrame};
use crate::{
    crypto::x25519,
    d2d_rendezvous::frame::FrameDecoder,
    protobuf::d2d_rendezvous as protobuf,
    utils::time::{Duration, Instant},
};

mod frame;
mod rxdak;
mod rxdtk;
mod rxdxk;

/// An error occurred while running the Connection Rendezvous Protocol.
///
/// Note: Errors can occur when using the API incorrectly or when the remote party behaves
/// incorrectly. Since the Connection Rendezvous Protocol is short-lived and all involved parties
/// are required to behave on all paths, none of these errors are considered recoverable.
///
/// When encountering an error:
///
/// 1. Let `error` be the provided [`RendezvousProtocolError`].
/// 2. Abort the protocol due to `error`.
///
/// TODO(LIB-30): Align errors with other protocols
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Error), uniffi(flat_error))]
pub enum RendezvousProtocolError {
    /// Invalid parameter provided by foreign code.
    #[cfg(feature = "uniffi")]
    #[error("Invalid parameter: {0}")]
    InvalidParameter(&'static str),

    /// Exhausted the available sequence numbers to use for sending/receiving frames.
    #[error("Sequence number would overflow")]
    SequenceNumberOverflow,

    /// Oversized frame received.
    #[error("Oversized frame of {0} bytes")]
    OversizedFrame(usize),

    /// Unable to decrypt a frame's payload.
    #[error("Decryption failed")]
    DecryptionFailed,

    /// Unable to encrypt a frame's payload.
    #[error("Encryption failed")]
    EncryptionFailed,

    /// Unable to decode a protobuf message.
    #[error("Decoding failed: {0}")]
    ProtobufDecodeFailed(#[from] prost::DecodeError),

    /// Incoming RRD's `Hello` message is invalid.
    #[error("Invalid RRD Hello message: {0}")]
    InvalidRrdHelloMessage(String),

    /// Incoming RID's `AuthHello` message is invalid.
    #[error("Invalid RID AuthHello message: {0}")]
    InvalidRidAuthHelloMessage(String),

    /// Incoming RRD's `Auth` message is invalid.
    #[error("Invalid RRD Auth message: {0}")]
    InvalidRrdAuthMessage(String),

    /// Unexpected frame received (e.g. during the nomination phase where only one role is allowed
    /// to send frames).
    #[error("Frame received unexpectedly")]
    UnexpectedFrame,

    /// Unable to find the referenced path. It was either never created or already dropped due to
    /// nomination of another path.
    #[error("Unknown or dropped path with PID {0}")]
    UnknownOrDroppedPath(u32),

    /// The referenced path has been closed (most likely due to a previous error encountered on the
    /// path).
    #[error("Path with PID {0} is closed")]
    PathClosed(u32),

    /// The local role does not allow for nomination.
    #[error("Nomination is not allowed for the local role")]
    NominateNotAllowed,

    /// Nomination already occurred for a path.
    #[error("Nomination already occurred for PID {0}")]
    NominationAlreadyDone(u32),

    /// Nomination is not allowed in the current state.
    #[error("Nomination is not allowed in state '{0}'")]
    InvalidStateForNomination(&'static str),

    /// Nomination is required before sending ULP data.
    #[error("Nomination is required before sending ULP data")]
    NominationRequired,
}

/// Authentication Key (AK).
#[derive(ZeroizeOnDrop)]
pub struct AuthenticationKey(pub [u8; 32]);

/// Rendezvous Path Hash (RPH), derived from the Shared Transport Key (STK).
pub struct RendezvousPathHash(pub [u8; 32]);

/// A path state update.
#[derive(DebugVariantNames, VariantNames)]
pub enum PathStateUpdate {
    /// The handshake on this path was successful and is await nomination (or being dropped).
    AwaitingNominate {
        /// RTT measured during the handshake, to be used by the nominator to select a path.
        measured_rtt: Duration,
    },

    /// The path has been nominated, allowing for ULP frames to be exchanged.
    Nominated {
        /// The Rendezvous Path Hash (RPH) of the nominated path.
        rph: RendezvousPathHash,
    },
}

/// Result returned when interacting with the protocol state machine. The result is associated to
/// the path whose PID was used when calling a function that yielded this result.
///
/// When handling this result, run the following steps:
///
/// 1. Let `path` be the context of the path whose PID was used in the function call that yielded this result.
/// 2. If the current phase is the _handshake and nominaton phase_:
///    1. If `incoming_ulp_data` is present, abort the protocol due to an error and abort these steps.
///    2. If `outgoing_frame` is present, enqueue it to be sent on `path`.
///    3. If `state_update` is [`PathStateUpdate::AwaitingNominate`] and the protocol took the role of the
///       nominator, run the _Path Awaiting Nomination Steps_ with `path` and abort these steps.
///    4. If `state_update` is [`PathStateUpdate::Nominated`]:
///       1. Mark `path` as _nominated_.
///       2. If the protocol took the role of the nominee, mark all other paths except `path` as _disregarded_
///          and close them (for WebSocket, use close code `1000`).
///       3. If the protocol took the rule of the nominator, mark all other paths except `path` as
///          _disregarded_ but keep them open with a timeout of 15s after which they should be closed (for
///          WebSocket, use close code `1000`).[^close-race]
///       4. Enter the _ULP phase_. At this point, the ULP may start creating frames through
///          [`RendezvousProtocol::create_ulp_frame`].
///    5. (Unreachable)
/// 3. If the current phase is the _ULP phase_:
///    1. If `path` is not marked as _nominated_, abort the protocol due to an error and abort these steps.
///    2. If `state_update` is present, abort the protocol due to an error and abort these steps.
///    3. If `outgoing_frame` is present, enqueue it to be sent on `path`.
///    4. If `incoming_ulp_data` is present, hand it off to the ULP.
/// 4. (Unreachable)
///
/// [^close-race]: This prevents a race condition between RID nominating a path and path close
/// detection on RRD's side.
#[derive(Debug)]
pub struct PathProcessResult {
    /// The path's state updated.
    pub state_update: Option<PathStateUpdate>,
    /// An outgoing frame is ready to be sent on the path.
    pub outgoing_frame: Option<OutgoingFrame>,
    /// An incoming frame has been reassembled and is ready to be handed off to the ULP.
    pub incoming_ulp_data: Option<Vec<u8>>,
}

/// 16 byte random authentication challenge
struct Challenge([u8; 16]);
impl Challenge {
    fn random() -> Self {
        let mut challenge = Self([0_u8; 16]);
        rand::thread_rng().fill(&mut challenge.0);
        challenge
    }
}

/// Ephemeral transport key (ETK).
struct EphemeralTransportKey(x25519::SharedSecretHSalsa20);

/// Protocol context passed around to the various roles and states.
struct Context {
    is_nominator: bool,
    ak: AuthenticationKey,
}
impl Context {
    fn new(is_nominator: bool, ak: AuthenticationKey) -> Self {
        Self { is_nominator, ak }
    }
}

/// Path states of RID.
#[derive(VariantNames, DebugVariantNames)]
enum RidPathState {
    /// Briefly used internally when moving from one state to another.
    Invalid,

    /// Awaiting an `RrdToRid.Hello` to start the handshake.
    AwaitingHello { authentication_keys: rxdak::ForRid },

    /// Sent an `RidToRrd.AuthHello`, awaiting an `RrdToRid.Auth`.
    AwaitingAuth {
        authentication_keys: rxdak::ForRid,
        sent_at: Instant,
        local_challenge: Challenge,
        shared_etk: EphemeralTransportKey,
    },

    /// Expecting a `Nominate` to be sent or received (depending on the configuration).
    AwaitingNominate {
        transport_keys: rxdtk::ForRid,
        rph: RendezvousPathHash,
    },

    /// The connection path was `Nominate`d and can now be used by the ULP.
    Nominated { transport_keys: rxdtk::ForRid },

    /// The connection path closed.
    Closed,
}

/// RID path protocol.
struct RidPath {
    pid: u32,
    decoder: FrameDecoder,
    state: RidPathState,
}
impl RidPath {
    fn new(ak: &AuthenticationKey, pid: u32) -> Self {
        Self {
            pid,
            decoder: FrameDecoder::new(vec![]),
            state: RidPathState::AwaitingHello {
                authentication_keys: rxdak::ForRid::new(ak, pid),
            },
        }
    }

    fn process_frame(
        &mut self,
        ctx: &Context,
        mut incoming_frame: IncomingFrame,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        trace!(state = ?self.state, ?incoming_frame, "Processing frame");
        if let RidPathState::Nominated { transport_keys } = &mut self.state {
            // Handle `Nominated` state where the transport can be used by the ULP.
            Self::handle_ulp_data(transport_keys, incoming_frame).map(|incoming_ulp_data| PathProcessResult {
                state_update: None,
                outgoing_frame: None,
                incoming_ulp_data: Some(incoming_ulp_data),
            })
        } else {
            // Handle `Closed` state
            if let RidPathState::Closed = &self.state {
                return Err(RendezvousProtocolError::PathClosed(self.pid));
            }

            // Handle states that should immediately transition into another state
            //
            // IMPORTANT: All match arms must be infallible!
            match core::mem::replace(&mut self.state, RidPathState::Invalid) {
                RidPathState::AwaitingHello {
                    mut authentication_keys,
                } => {
                    // Handle `RrdToRid.Hello`, create `RidToRrd.AuthHello` and update state.
                    Self::handle_hello(&mut authentication_keys, &mut incoming_frame).map(
                        |(local_challenge, shared_etk, outgoing_frame)| {
                            (
                                RidPathState::AwaitingAuth {
                                    authentication_keys,
                                    sent_at: Instant::now(),
                                    local_challenge,
                                    shared_etk,
                                },
                                PathProcessResult {
                                    state_update: None,
                                    outgoing_frame: Some(outgoing_frame),
                                    incoming_ulp_data: None,
                                },
                            )
                        },
                    )
                },

                RidPathState::AwaitingAuth {
                    mut authentication_keys,
                    sent_at,
                    local_challenge,
                    shared_etk,
                } => {
                    // Calculate RTT
                    let measured_rtt = Instant::now().duration_since(sent_at);

                    // Handle `RrdToRid.Auth` and update state.
                    Self::handle_auth(&mut authentication_keys, &local_challenge, &mut incoming_frame).map(
                        |()| {
                            let (transport_keys, rph) =
                                rxdtk::ForRid::new(&ctx.ak, authentication_keys, shared_etk);
                            (
                                RidPathState::AwaitingNominate { transport_keys, rph },
                                PathProcessResult {
                                    state_update: Some(PathStateUpdate::AwaitingNominate { measured_rtt }),
                                    outgoing_frame: None,
                                    incoming_ulp_data: None,
                                },
                            )
                        },
                    )
                },

                RidPathState::AwaitingNominate {
                    mut transport_keys,
                    rph,
                } => {
                    // Check if the remote side is allowed to `Nominate`.
                    if ctx.is_nominator {
                        return Err(RendezvousProtocolError::UnexpectedFrame);
                    }

                    // Handle `Nominate` and update state.
                    Self::handle_nominate(&mut transport_keys, &mut incoming_frame).map(|()| {
                        (
                            RidPathState::Nominated { transport_keys },
                            PathProcessResult {
                                state_update: Some(PathStateUpdate::Nominated { rph }),
                                outgoing_frame: None,
                                incoming_ulp_data: None,
                            },
                        )
                    })
                },

                // States that must have been covered by code above
                RidPathState::Invalid | RidPathState::Nominated { .. } | RidPathState::Closed => {
                    unreachable!("State should have been handled")
                },
            }
            .map(|(state, result)| {
                self.state = state;
                debug!(state = ?self.state, "Changed state");
                result
            })
        }
        .map_err(|error| {
            self.state = RidPathState::Closed;
            warn!(?error, state = ?self.state, "Closed due to error");
            error
        })
    }

    fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
        // Ensure we are in the correct state to nominate
        if !matches!(&self.state, RidPathState::AwaitingNominate { .. }) {
            return Err(RendezvousProtocolError::InvalidStateForNomination(
                self.state.variant_name(),
            ));
        }

        // Nominate
        if let RidPathState::AwaitingNominate {
            mut transport_keys,
            rph,
        } = core::mem::replace(&mut self.state, RidPathState::Invalid)
        {
            Self::create_nominate(&mut transport_keys).map(|outgoing_frame| {
                (
                    RidPathState::Nominated { transport_keys },
                    PathProcessResult {
                        state_update: Some(PathStateUpdate::Nominated { rph }),
                        outgoing_frame: Some(outgoing_frame),
                        incoming_ulp_data: None,
                    },
                )
            })
        } else {
            unreachable!("Expected AwaitingNominate state")
        }
        .map(|(state, result)| {
            self.state = state;
            debug!(state = ?self.state, "Changed state");
            result
        })
        .map_err(|error| {
            self.state = RidPathState::Closed;
            warn!(?error, state = ?self.state, "Closed due to error");
            error
        })
    }

    fn create_ulp_frame(
        &mut self,
        outgoing_data: Vec<u8>,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        match &mut self.state {
            RidPathState::Nominated { transport_keys } => {
                Self::create_ulp_data(transport_keys, outgoing_data).map(|outgoing_frame| PathProcessResult {
                    state_update: None,
                    outgoing_frame: Some(outgoing_frame),
                    incoming_ulp_data: None,
                })
            },
            _ => Err(RendezvousProtocolError::NominationRequired),
        }
    }

    fn handle_hello(
        keys: &mut rxdak::ForRid,
        incoming_frame: &mut IncomingFrame,
    ) -> Result<(Challenge, EphemeralTransportKey, OutgoingFrame), RendezvousProtocolError> {
        // Decrypt and decode into a `RrdToRid.Hello`
        let (remote_challenge, remote_etk) = {
            keys.rrdak.decrypt(&mut incoming_frame.0)?;
            let hello = protobuf::handshake::rrd_to_rid::Hello::decode(incoming_frame.0.as_ref())?;

            // Validate `RrdToRid.Hello`
            let remote_challenge = Challenge(hello.challenge.as_slice().try_into().map_err(|_| {
                RendezvousProtocolError::InvalidRrdHelloMessage(format!(
                    "Expected 16 challenge bytes, got {}",
                    hello.challenge.len()
                ))
            })?);
            let remote_etk =
                x25519::PublicKey::from(<[u8; 32]>::try_from(hello.etk.as_ref()).map_err(|_| {
                    RendezvousProtocolError::InvalidRrdHelloMessage(format!(
                        "Invalid remote ETK, got {} bytes",
                        hello.etk.len()
                    ))
                })?);
            (remote_challenge, remote_etk)
        };

        // Encode and encrypt `RidToRrd.AuthHello`
        let (local_challenge, shared_etk, outgoing_frame) = {
            // Generate a challenge
            let local_challenge = Challenge::random();

            // Generate local part of ETK
            let local_etk = x25519::EphemeralSecret::random_from_rng(rand::thread_rng());

            // Encode and encrypt `RidToRrd.AuthHello`
            let local_auth_hello = protobuf::handshake::rid_to_rrd::AuthHello {
                response: remote_challenge.0.to_vec(),
                challenge: local_challenge.0.to_vec(),
                etk: x25519::PublicKey::from(&local_etk).as_bytes().to_vec(),
            };
            let mut outgoing_data = local_auth_hello.encode_to_vec();
            keys.ridak.encrypt(&mut outgoing_data)?;

            // Derive ETK
            let shared_etk = EphemeralTransportKey(local_etk.diffie_hellman(&remote_etk).into());

            (local_challenge, shared_etk, OutgoingFrame(outgoing_data))
        };

        // Done
        Ok((local_challenge, shared_etk, outgoing_frame))
    }

    fn handle_auth(
        keys: &mut rxdak::ForRid,
        local_challenge: &Challenge,
        incoming_frame: &mut IncomingFrame,
    ) -> Result<(), RendezvousProtocolError> {
        // Decrypt and decode into a `RrdToRid.Auth`
        keys.rrdak.decrypt(&mut incoming_frame.0)?;
        let remote_auth = protobuf::handshake::rrd_to_rid::Auth::decode(incoming_frame.0.as_ref())?;

        // Validate `RrdToRid.Auth`
        if remote_auth.response.as_ref() != local_challenge.0 {
            return Err(RendezvousProtocolError::InvalidRrdAuthMessage(format!(
                "Challenge response of {} bytes does not match",
                remote_auth.response.len()
            )));
        }

        // Done
        Ok(())
    }

    fn create_nominate(keys: &mut rxdtk::ForRid) -> Result<OutgoingFrame, RendezvousProtocolError> {
        // Encode and encrypt a `Nominate`
        let local_nominate = protobuf::Nominate {};
        let mut outgoing_data = local_nominate.encode_to_vec();
        keys.ridtk.encrypt(&mut outgoing_data)?;
        Ok(OutgoingFrame(outgoing_data))
    }

    fn handle_nominate(
        keys: &mut rxdtk::ForRid,
        incoming_frame: &mut IncomingFrame,
    ) -> Result<(), RendezvousProtocolError> {
        // Decrypt and decode into a `Nominate`
        keys.rrdtk.decrypt(&mut incoming_frame.0)?;
        let _ = protobuf::Nominate::decode(incoming_frame.0.as_ref())?;
        Ok(())
    }

    fn create_ulp_data(
        keys: &mut rxdtk::ForRid,
        mut outgoing_data: Vec<u8>,
    ) -> Result<OutgoingFrame, RendezvousProtocolError> {
        // Encode and encrypt ULP data
        keys.ridtk.encrypt(&mut outgoing_data)?;
        Ok(OutgoingFrame(outgoing_data))
    }

    fn handle_ulp_data(
        keys: &mut rxdtk::ForRid,
        mut incoming_frame: IncomingFrame,
    ) -> Result<Vec<u8>, RendezvousProtocolError> {
        // Decrypt and decode ULP data
        keys.rrdtk.decrypt(&mut incoming_frame.0)?;
        Ok(incoming_frame.0)
    }
}

/// Path states of RID.
#[derive(VariantNames, DebugVariantNames)]
enum RrdPathState {
    /// Briefly used internally when moving from one state to another.
    Invalid,

    /// Sent an `RrdtoRid.Hello`, awaiting an `RidToRrd.AuthHello`.
    AwaitingAuthHello {
        authentication_keys: rxdak::ForRrd,
        sent_at: Instant,
        local_challenge: Challenge,
        local_etk: x25519::EphemeralSecret,
    },

    /// Expecting a `Nominate` to be sent or received (depending on the configuration).
    AwaitingNominate {
        transport_keys: rxdtk::ForRrd,
        rph: RendezvousPathHash,
    },

    /// The connection path was `Nominate`d and can now be used by the ULP.
    Nominated { transport_keys: rxdtk::ForRrd },

    /// The connection path closed.
    Closed,
}

/// RRD path protocol.
struct RrdPath {
    pid: u32,
    decoder: FrameDecoder,
    state: RrdPathState,
}
impl RrdPath {
    fn new(ak: &AuthenticationKey, pid: u32) -> (Self, OutgoingFrame) {
        // Create initial state
        let mut authentication_keys = rxdak::ForRrd::new(ak, pid);
        let (local_challenge, local_etk, outgoing_frame) = Self::create_hello(&mut authentication_keys);

        // Create path
        let path = Self {
            pid,
            decoder: FrameDecoder::new(vec![]),
            state: RrdPathState::AwaitingAuthHello {
                authentication_keys,
                sent_at: Instant::now(),
                local_challenge,
                local_etk,
            },
        };
        (path, outgoing_frame)
    }

    fn process_frame(
        &mut self,
        ctx: &Context,
        mut incoming_frame: IncomingFrame,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        trace!(state = ?self.state, ?incoming_frame, "Processing frame");
        if let RrdPathState::Nominated { transport_keys } = &mut self.state {
            // Handle `Nominated` state where the transport can be used by the ULP.
            Self::handle_ulp_data(transport_keys, incoming_frame).map(|incoming_ulp_data| PathProcessResult {
                state_update: None,
                outgoing_frame: None,
                incoming_ulp_data: Some(incoming_ulp_data),
            })
        } else {
            // Handle `Closed` state
            if let RrdPathState::Closed = &self.state {
                return Err(RendezvousProtocolError::PathClosed(self.pid));
            }

            // Handle states that should immediately transition into another state
            //
            // IMPORTANT: All match arms must be infallible!
            match core::mem::replace(&mut self.state, RrdPathState::Invalid) {
                RrdPathState::AwaitingAuthHello {
                    mut authentication_keys,
                    sent_at,
                    local_challenge,
                    local_etk,
                } => {
                    // Calculate RTT
                    let measured_rtt = Instant::now().duration_since(sent_at);

                    // Handle `RidToRrd.AuthHello` and update state.
                    Self::handle_auth_hello(
                        &mut authentication_keys,
                        &local_challenge,
                        local_etk,
                        &mut incoming_frame,
                    )
                    .map(|(shared_etk, outgoing_frame)| {
                        let (transport_keys, rph) =
                            rxdtk::ForRrd::new(&ctx.ak, authentication_keys, shared_etk);
                        (
                            RrdPathState::AwaitingNominate { transport_keys, rph },
                            PathProcessResult {
                                state_update: Some(PathStateUpdate::AwaitingNominate { measured_rtt }),
                                outgoing_frame: Some(outgoing_frame),
                                incoming_ulp_data: None,
                            },
                        )
                    })
                },

                RrdPathState::AwaitingNominate {
                    mut transport_keys,
                    rph,
                } => {
                    // Check if the remote side is allowed to `Nominate`.
                    if ctx.is_nominator {
                        return Err(RendezvousProtocolError::UnexpectedFrame);
                    }

                    // Handle `Nominate` and update state.
                    Self::handle_nominate(&mut transport_keys, &mut incoming_frame).map(|()| {
                        (
                            RrdPathState::Nominated { transport_keys },
                            PathProcessResult {
                                state_update: Some(PathStateUpdate::Nominated { rph }),
                                outgoing_frame: None,
                                incoming_ulp_data: None,
                            },
                        )
                    })
                },

                // States that must have been covered by code above
                RrdPathState::Invalid | RrdPathState::Nominated { .. } | RrdPathState::Closed => {
                    unreachable!("State should have been handled")
                },
            }
            .map(|(state, result)| {
                self.state = state;
                debug!(state = ?self.state, "Changed state");
                result
            })
        }
        .map_err(|error| {
            self.state = RrdPathState::Closed;
            warn!(?error, state = ?self.state, "Closed due to error");
            error
        })
    }

    fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
        // Ensure we are in the correct state to nominate
        if !matches!(&self.state, RrdPathState::AwaitingNominate { .. }) {
            return Err(RendezvousProtocolError::InvalidStateForNomination(
                self.state.variant_name(),
            ));
        }

        // Nominate
        if let RrdPathState::AwaitingNominate {
            mut transport_keys,
            rph,
        } = core::mem::replace(&mut self.state, RrdPathState::Invalid)
        {
            Self::create_nominate(&mut transport_keys).map(|outgoing_frame| {
                (
                    RrdPathState::Nominated { transport_keys },
                    PathProcessResult {
                        state_update: Some(PathStateUpdate::Nominated { rph }),
                        outgoing_frame: Some(outgoing_frame),
                        incoming_ulp_data: None,
                    },
                )
            })
        } else {
            unreachable!("Expected AwaitingNominate state")
        }
        .map(|(state, result)| {
            self.state = state;
            debug!(state = ?self.state, "Changed state");
            result
        })
        .map_err(|error| {
            self.state = RrdPathState::Closed;
            warn!(?error, state = ?self.state, "Closed due to error");
            error
        })
    }

    fn create_ulp_frame(
        &mut self,
        outgoing_data: Vec<u8>,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        match &mut self.state {
            RrdPathState::Nominated { transport_keys } => {
                Self::create_ulp_data(transport_keys, outgoing_data).map(|outgoing_frame| PathProcessResult {
                    state_update: None,
                    outgoing_frame: Some(outgoing_frame),
                    incoming_ulp_data: None,
                })
            },
            _ => Err(RendezvousProtocolError::NominationRequired),
        }
    }

    fn create_hello(keys: &mut rxdak::ForRrd) -> (Challenge, x25519::EphemeralSecret, OutgoingFrame) {
        // Generate a challenge
        let local_challenge = Challenge::random();

        // Generate local part of ETK
        let local_etk = x25519::EphemeralSecret::random_from_rng(rand::thread_rng());

        // Encode and encrypt `RrdToRid.Hello`
        let local_hello = protobuf::handshake::rrd_to_rid::Hello {
            challenge: local_challenge.0.to_vec(),
            etk: x25519::PublicKey::from(&local_etk).as_bytes().to_vec(),
        };
        let mut outgoing_data = local_hello.encode_to_vec();
        keys.rrdak
            .encrypt(&mut outgoing_data)
            .expect("Encrypting initial RrdToRid.Hello should work");

        (local_challenge, local_etk, OutgoingFrame(outgoing_data))
    }

    fn handle_auth_hello(
        keys: &mut rxdak::ForRrd,
        local_challenge: &Challenge,
        local_etk: x25519::EphemeralSecret,
        incoming_frame: &mut IncomingFrame,
    ) -> Result<(EphemeralTransportKey, OutgoingFrame), RendezvousProtocolError> {
        // Decrypt and decode into a `RidToRrd.AuthHello`
        let (remote_challenge, remote_etk) = {
            keys.ridak.decrypt(&mut incoming_frame.0)?;
            let remote_auth_hello =
                protobuf::handshake::rid_to_rrd::AuthHello::decode(incoming_frame.0.as_ref())?;

            // Validate `RidToRrd.AuthHello`
            if remote_auth_hello.response != local_challenge.0 {
                return Err(RendezvousProtocolError::InvalidRidAuthHelloMessage(format!(
                    "Challenge response of {} bytes does not match",
                    remote_auth_hello.response.len()
                )));
            }
            let remote_challenge =
                Challenge(remote_auth_hello.challenge.as_slice().try_into().map_err(|_| {
                    RendezvousProtocolError::InvalidRrdHelloMessage(format!(
                        "Expected 16 challenge bytes, got {}",
                        remote_auth_hello.challenge.len()
                    ))
                })?);
            let remote_etk = x25519::PublicKey::from(
                <[u8; 32]>::try_from(remote_auth_hello.etk.as_ref()).map_err(|_| {
                    RendezvousProtocolError::InvalidRidAuthHelloMessage(format!(
                        "Invalid remote ETK, got {} bytes",
                        remote_auth_hello.etk.len()
                    ))
                })?,
            );
            (remote_challenge, remote_etk)
        };

        // Encode and encrypt `RrdToRid.Auth`
        let (shared_etk, outgoing_frame) = {
            // Encode and encrypt `RrdToRid.Auth`
            let local_auth = protobuf::handshake::rrd_to_rid::Auth {
                response: remote_challenge.0.to_vec(),
            };
            let mut outgoing_data = local_auth.encode_to_vec();
            keys.rrdak.encrypt(&mut outgoing_data)?;

            // Derive ETK
            let shared_etk = EphemeralTransportKey(local_etk.diffie_hellman(&remote_etk).into());

            (shared_etk, OutgoingFrame(outgoing_data))
        };

        // Done
        Ok((shared_etk, outgoing_frame))
    }

    fn create_nominate(keys: &mut rxdtk::ForRrd) -> Result<OutgoingFrame, RendezvousProtocolError> {
        // Encode and encrypt a `Nominate`
        let local_nominate = protobuf::Nominate {};
        let mut outgoing_data = local_nominate.encode_to_vec();
        keys.rrdtk.encrypt(&mut outgoing_data)?;
        Ok(OutgoingFrame(outgoing_data))
    }

    fn handle_nominate(
        keys: &mut rxdtk::ForRrd,
        incoming_frame: &mut IncomingFrame,
    ) -> Result<(), RendezvousProtocolError> {
        // Decrypt and decode into a `Nominate`
        keys.ridtk.decrypt(&mut incoming_frame.0)?;
        let _ = protobuf::Nominate::decode(incoming_frame.0.as_ref())?;
        Ok(())
    }

    fn create_ulp_data(
        keys: &mut rxdtk::ForRrd,
        mut outgoing_data: Vec<u8>,
    ) -> Result<OutgoingFrame, RendezvousProtocolError> {
        // Encode and encrypt ULP data
        keys.rrdtk.encrypt(&mut outgoing_data)?;
        Ok(OutgoingFrame(outgoing_data))
    }

    fn handle_ulp_data(
        keys: &mut rxdtk::ForRrd,
        mut incoming_frame: IncomingFrame,
    ) -> Result<Vec<u8>, RendezvousProtocolError> {
        // Decrypt and decode ULP data
        keys.ridtk.decrypt(&mut incoming_frame.0)?;
        Ok(incoming_frame.0)
    }
}

trait Path: Send {
    fn add_chunks(&mut self, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError>;

    fn process_frame(&mut self, ctx: &Context) -> Result<Option<PathProcessResult>, RendezvousProtocolError>;

    fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError>;

    fn create_ulp_frame(
        &mut self,
        outgoing_data: Vec<u8>,
    ) -> Result<PathProcessResult, RendezvousProtocolError>;
}

#[duplicate_item(
    path_type   path_state_type;
    [ RidPath ] [ RidPathState ];
    [ RrdPath ] [ RrdPathState ];
)]
impl Path for path_type {
    #[inline]
    fn add_chunks(&mut self, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError> {
        let length = self.decoder.add_chunks(chunks);

        // Check if the frame exceeds the maximum supported length of this state
        let max_length = if matches!(&self.state, path_state_type::Nominated { .. }) {
            FrameDecoder::MAX_LENGTH_AFTER_NOMINATION
        } else {
            FrameDecoder::MAX_LENGTH_BEFORE_NOMINATION
        };
        if length > max_length {
            return Err(RendezvousProtocolError::OversizedFrame(length));
        }
        Ok(())
    }

    fn process_frame(&mut self, ctx: &Context) -> Result<Option<PathProcessResult>, RendezvousProtocolError> {
        self.decoder
            .next_frame_and_then(|incoming_frame| IncomingFrame(incoming_frame.to_vec()))
            .map(|incoming_frame| self.process_frame(ctx, incoming_frame))
            .transpose()
    }

    #[inline]
    fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
        self.nominate()
    }

    #[inline]
    fn create_ulp_frame(
        &mut self,
        outgoing_data: Vec<u8>,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        self.create_ulp_frame(outgoing_data)
    }
}

/// Internal protocol state.
#[derive(DebugVariantNames, VariantNames)]
enum ProtocolState {
    /// The paths are currently racing, meaning we are in the _handshake and nomination phase_.
    RacingPaths(HashMap<u32, Box<dyn Path>>),

    /// One path has been nominated, all other paths have been discarded, meaning we are in the
    /// _ULP phase_.
    Nominated { pid: u32, path: Box<dyn Path> },
}

/// Connection Rendezvous Protocol state machine.
///
/// The protocol state machine can be constructed from a formerly exchanged a `RendezvousInit` and
/// the associated roles by using [`RendezvousProtocol::new_as_rid`] and
/// [`RendezvousProtocol::new_as_rrd`].
///
/// Any interaction with the protocol state machine that changes the internal state will yield a
/// [`PathProcessResult`] that must be handled according to its documentation.
///
/// The protocol goes through exactly two phases:
///
/// - The _handshake and nomination phase_ where all paths are racing the handshake simultaneously until one
///   has been nominated by the nominator.
/// - The _ULP phase_ where ULP frames can be exchanged on the nominated path.
///
/// The following steps are defined as the _Path Awaiting Nomination Steps_:
///
/// 1. Let `path` be the associated path.
/// 2. If the protocol did not take the role of the nominator, abort the protocol due to an error and abort
///    these steps.
/// 3. If `path` is the only path, run [`RendezvousProtocol::nominate_path`] for `path` and abort these steps.
/// 4. (Unreachable / TODO(LIB-10): As of today, only one path is expected to be used.)
///
/// When a path closed, run the following steps:
///
/// 1. Let `path` be the path that closed (initiated locally or remotely).
/// 2. If `path` is marked as _nominated_, abort the protocol normally with any available close information
///    and abort these steps.
/// 3. If `path` is marked as _disregarded_, log a notice and abort these steps.
/// 4. If `path` is the last path that closed (i.e. all other paths already closed or there are no other
///    paths), log a warning that all paths closed before nomination, abort the protocol normally with any
///    available close information and abort these steps.
/// 5. Log a warning that `path` closed before nomination.
///
/// When receiving data on a path:
///
/// 1. Run [`RendezvousProtocol::add_chunks`] with the respective path's PID.
/// 2. In a loop, run [`RendezvousProtocol::process_frame`] with the respective path's PID and handle the
///    result until it no longer produces a [`PathProcessResult`].
///
/// When the protocol is being aborted:
///
/// 1. Let `cause` be an error or any information associated to normal closure.
/// 2. Log the protocol abort due to `cause` as a notice or an error respectively.
/// 3. Tear down the protocol state machine.
/// 4. Close all remaining paths exceptionally.
/// 5. Hand off `cause` to the ULP.
#[derive(Educe)]
#[educe(Debug)]
pub struct RendezvousProtocol {
    #[educe(Debug(ignore))]
    ctx: Context,
    state: ProtocolState,
}

// TODO(LIB-11): Add construction of the `RendezvousInit` from the paths here.
impl RendezvousProtocol {
    /// Create a new Connection Rendezvous Protocol as the Rendezvous Initiator Device (RID) from a
    /// formerly exchanged `RendezvousInit`.
    ///
    /// `pids` must contain the set of available pre-initiated paths with their associated Path IDs
    /// (PID).
    ///
    /// Returns the protocol state machine instance.
    #[tracing::instrument(skip_all, fields(?is_nominator, ?pids))]
    pub fn new_as_rid(is_nominator: bool, ak: AuthenticationKey, pids: &[u32]) -> Self {
        debug!("Creating D2D rendezvous protocol");
        let ctx = Context::new(is_nominator, ak);

        // Create paths
        let racing_paths = pids
            .iter()
            .map(|pid| {
                let path = RidPath::new(&ctx.ak, *pid);
                (*pid, Box::new(path) as Box<dyn Path>)
            })
            .collect();

        // Create protocol
        Self {
            ctx,
            state: ProtocolState::RacingPaths(racing_paths),
        }
    }

    /// Create a new Connection Rendezvous Protocol as the Rendezvous Responder Device (RRD) from a
    /// formerly exchanged `RendezvousInit`.
    ///
    /// `pids` must contain the set of available pre-initiated paths with their associated Path IDs
    /// (PID).
    ///
    /// Returns a tuple of the protocol state machine instance and a list of PIDs and outgoing
    /// frames to be enqueued on the respective paths immediately.
    #[tracing::instrument(skip_all, fields(?is_nominator, ?pids))]
    pub fn new_as_rrd(
        is_nominator: bool,
        ak: AuthenticationKey,
        pids: &[u32],
    ) -> (Self, Vec<(u32, OutgoingFrame)>) {
        debug!("Creating protocol");
        let ctx = Context::new(is_nominator, ak);
        let mut outgoing_frames = vec![];

        // Create paths
        let racing_paths = pids
            .iter()
            .map(|pid| {
                let (path, outgoing_frame) = RrdPath::new(&ctx.ak, *pid);
                outgoing_frames.push((*pid, outgoing_frame));
                (*pid, Box::new(path) as Box<dyn Path>)
            })
            .collect();

        // Create protocol
        let protocol = Self {
            ctx,
            state: ProtocolState::RacingPaths(racing_paths),
        };
        (protocol, outgoing_frames)
    }

    /// Return whether the protocol took the role of the nominator.
    #[must_use]
    pub const fn is_nominator(&self) -> bool {
        self.ctx.is_nominator
    }

    /// Return the nominated path's PID, if available.
    #[must_use]
    pub const fn nominated_path(&self) -> Option<u32> {
        if let ProtocolState::Nominated { pid, .. } = &self.state {
            Some(*pid)
        } else {
            None
        }
    }

    /// Add chunks received on the specified path. The chunks may or may not contain complete frames
    /// or even contain multiple complete frames.
    ///
    /// # Errors
    ///
    /// Returns [`RendezvousProtocolError::UnknownOrDroppedPath`] if the path associated to `pid`
    /// could not be found.
    #[tracing::instrument(skip_all, fields(
        ?self, ?pid,
        chunks_byte_length = chunks.iter().map(|chunk| chunk.len()).sum::<usize>(),
    ))]
    pub fn add_chunks(&mut self, pid: u32, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError> {
        let path = Self::lookup_path(&mut self.state, pid)?;
        path.add_chunks(chunks)
    }

    /// Process any available buffered complete frame for the specified path.
    ///
    /// # Errors
    ///
    /// Returns [`RendezvousProtocolError`] for a plethora of reasons, e.g. if the path associated
    /// to `pid` could not be found, an incoming frame could not be decoded or decrypted, or an
    /// unexpected message was received, or, as a response to it, another outgoing frame could not
    /// be encrypted.
    #[tracing::instrument(skip_all, fields(?self, ?pid))]
    pub fn process_frame(&mut self, pid: u32) -> Result<Option<PathProcessResult>, RendezvousProtocolError> {
        let path = Self::lookup_path(&mut self.state, pid)?;

        // Decode and process the next frame, if any can be decoded
        let result = path.process_frame(&self.ctx)?;
        trace!(?result, "Processed frame");
        let Some(result) = result else {
            return Ok(result);
        };

        // Update state if the path was nominated and we are still racing paths
        if let (ProtocolState::RacingPaths(racing_paths), Some(PathStateUpdate::Nominated { .. })) =
            (&mut self.state, &result.state_update)
        {
            // Nominate the path
            let path = racing_paths
                .remove(&pid)
                .ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?;
            debug!(
                dropped_pids = ?racing_paths.keys(),
                "Remote nominated, dropping all other paths"
            );
            self.state = ProtocolState::Nominated { pid, path };
        }

        // Return the result
        Ok(Some(result))
    }

    /// Nominate a path.
    ///
    /// # Errors
    ///
    /// Returns [`RendezvousProtocolError`] if the protocol did not take the role of the nominator,
    /// the path associated to `pid` could not be found, the path is not ready to be nominated or
    /// nomination already happened.
    #[tracing::instrument(skip_all, fields(?self, ?pid))]
    pub fn nominate_path(&mut self, pid: u32) -> Result<PathProcessResult, RendezvousProtocolError> {
        // Ensure we're allowed to nominate
        if !self.ctx.is_nominator {
            return Err(RendezvousProtocolError::NominateNotAllowed);
        }

        // Nominate if the paths are still racing
        match &mut self.state {
            ProtocolState::RacingPaths(racing_paths) => {
                // Attempt to nominate the path
                let mut path = racing_paths
                    .remove(&pid)
                    .ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?;
                let result = path.nominate()?;
                debug!(
                    dropped_pids = ?racing_paths.keys(),
                    "Local nominated, dropping all other paths"
                );
                self.state = ProtocolState::Nominated { pid, path };
                Ok(result)
            },

            ProtocolState::Nominated { pid, .. } => {
                // Nomination already happened
                Err(RendezvousProtocolError::NominationAlreadyDone(*pid))
            },
        }
    }

    /// Create a ULP frame to be encrypted and sent as an outgoing frame on the nominated path.
    ///
    /// # Errors
    ///
    /// Returns [`RendezvousProtocolError`] if nomination of a path is still pending or the ULP
    /// frame could not be encrypted or encoded.
    #[tracing::instrument(skip_all, fields(?self, outgoing_data_length = outgoing_data.len()))]
    pub fn create_ulp_frame(
        &mut self,
        outgoing_data: Vec<u8>,
    ) -> Result<PathProcessResult, RendezvousProtocolError> {
        match &mut self.state {
            ProtocolState::RacingPaths(..) => Err(RendezvousProtocolError::NominationRequired),
            ProtocolState::Nominated { path, .. } => path.create_ulp_frame(outgoing_data),
        }
    }

    fn lookup_path(
        state: &mut ProtocolState,
        pid: u32,
    ) -> Result<&mut Box<dyn Path>, RendezvousProtocolError> {
        // Lookup path based on the current state.
        let path = match state {
            // The nomination race is still ongoing. Lookup the path by its PID.
            ProtocolState::RacingPaths(racing_paths) => racing_paths
                .get_mut(&pid)
                .ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?,

            // There's only one nominated path. Ensure it's the correct one.
            ProtocolState::Nominated {
                pid: nominated_pid,
                path,
            } => {
                if pid != *nominated_pid {
                    return Err(RendezvousProtocolError::UnknownOrDroppedPath(pid));
                }
                path
            },
        };
        Ok(path)
    }
}

[Dauer der Verarbeitung: 0.31 Sekunden, vorverarbeitet 2026-04-27]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge