|
|
|
|
Quelle mod.rs
Sprache: unbekannt
|
|
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]
//! Implementation of the _Connection Rendezvous Protocol_.
use std::collections::HashMap;
use duplicate::duplicate_item;
use educe::Educe;
use libthreema_macros::{DebugVariantNames, VariantNames};
use prost::Message as _;
use rand::{self, Rng as _};
use tracing::{debug, trace, warn};
use zeroize::ZeroizeOnDrop;
pub use crate::d2d_rendezvous::frame::{IncomingFrame, OutgoingFrame};
use crate::{
crypto::x25519,
d2d_rendezvous::frame::FrameDecoder,
protobuf::d2d_rendezvous as protobuf,
utils::time::{Duration, Instant},
};
mod frame;
mod rxdak;
mod rxdtk;
mod rxdxk;
/// An error occurred while running the Connection Rendezvous Protocol.
///
/// Note: Errors can occur when using the API incorrectly or when the remote party behaves
/// incorrectly. Since the Connection Rendezvous Protocol is short-lived and all involved parties
/// are required to behave on all paths, none of these errors are considered recoverable.
///
/// When encountering an error:
///
/// 1. Let `error` be the provided [`RendezvousProtocolError`].
/// 2. Abort the protocol due to `error`.
///
/// TODO(LIB-30): Align errors with other protocols
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Error), uniffi(flat_error))]
pub enum RendezvousProtocolError {
/// Invalid parameter provided by foreign code.
#[cfg(feature = "uniffi")]
#[error("Invalid parameter: {0}")]
InvalidParameter(&'static str),
/// Exhausted the available sequence numbers to use for sending/receiving frames.
#[error("Sequence number would overflow")]
SequenceNumberOverflow,
/// Oversized frame received.
#[error("Oversized frame of {0} bytes")]
OversizedFrame(usize),
/// Unable to decrypt a frame's payload.
#[error("Decryption failed")]
DecryptionFailed,
/// Unable to encrypt a frame's payload.
#[error("Encryption failed")]
EncryptionFailed,
/// Unable to decode a protobuf message.
#[error("Decoding failed: {0}")]
ProtobufDecodeFailed(#[from] prost::DecodeError),
/// Incoming RRD's `Hello` message is invalid.
#[error("Invalid RRD Hello message: {0}")]
InvalidRrdHelloMessage(String),
/// Incoming RID's `AuthHello` message is invalid.
#[error("Invalid RID AuthHello message: {0}")]
InvalidRidAuthHelloMessage(String),
/// Incoming RRD's `Auth` message is invalid.
#[error("Invalid RRD Auth message: {0}")]
InvalidRrdAuthMessage(String),
/// Unexpected frame received (e.g. during the nomination phase where only one role is allowed
/// to send frames).
#[error("Frame received unexpectedly")]
UnexpectedFrame,
/// Unable to find the referenced path. It was either never created or already dropped due to
/// nomination of another path.
#[error("Unknown or dropped path with PID {0}")]
UnknownOrDroppedPath(u32),
/// The referenced path has been closed (most likely due to a previous error encountered on the
/// path).
#[error("Path with PID {0} is closed")]
PathClosed(u32),
/// The local role does not allow for nomination.
#[error("Nomination is not allowed for the local role")]
NominateNotAllowed,
/// Nomination already occurred for a path.
#[error("Nomination already occurred for PID {0}")]
NominationAlreadyDone(u32),
/// Nomination is not allowed in the current state.
#[error("Nomination is not allowed in state '{0}'")]
InvalidStateForNomination(&'static str),
/// Nomination is required before sending ULP data.
#[error("Nomination is required before sending ULP data")]
NominationRequired,
}
/// Authentication Key (AK).
#[derive(ZeroizeOnDrop)]
pub struct AuthenticationKey(pub [u8; 32]);
/// Rendezvous Path Hash (RPH), derived from the Shared Transport Key (STK).
pub struct RendezvousPathHash(pub [u8; 32]);
/// A path state update.
#[derive(DebugVariantNames, VariantNames)]
pub enum PathStateUpdate {
/// The handshake on this path was successful and is await nomination (or being dropped).
AwaitingNominate {
/// RTT measured during the handshake, to be used by the nominator to select a path.
measured_rtt: Duration,
},
/// The path has been nominated, allowing for ULP frames to be exchanged.
Nominated {
/// The Rendezvous Path Hash (RPH) of the nominated path.
rph: RendezvousPathHash,
},
}
/// Result returned when interacting with the protocol state machine. The result is associated to
/// the path whose PID was used when calling a function that yielded this result.
///
/// When handling this result, run the following steps:
///
/// 1. Let `path` be the context of the path whose PID was used in the function call that yielded this result.
/// 2. If the current phase is the _handshake and nominaton phase_:
/// 1. If `incoming_ulp_data` is present, abort the protocol due to an error and abort these steps.
/// 2. If `outgoing_frame` is present, enqueue it to be sent on `path`.
/// 3. If `state_update` is [`PathStateUpdate::AwaitingNominate`] and the protocol took the role of the
/// nominator, run the _Path Awaiting Nomination Steps_ with `path` and abort these steps.
/// 4. If `state_update` is [`PathStateUpdate::Nominated`]:
/// 1. Mark `path` as _nominated_.
/// 2. If the protocol took the role of the nominee, mark all other paths except `path` as _disregarded_
/// and close them (for WebSocket, use close code `1000`).
/// 3. If the protocol took the rule of the nominator, mark all other paths except `path` as
/// _disregarded_ but keep them open with a timeout of 15s after which they should be closed (for
/// WebSocket, use close code `1000`).[^close-race]
/// 4. Enter the _ULP phase_. At this point, the ULP may start creating frames through
/// [`RendezvousProtocol::create_ulp_frame`].
/// 5. (Unreachable)
/// 3. If the current phase is the _ULP phase_:
/// 1. If `path` is not marked as _nominated_, abort the protocol due to an error and abort these steps.
/// 2. If `state_update` is present, abort the protocol due to an error and abort these steps.
/// 3. If `outgoing_frame` is present, enqueue it to be sent on `path`.
/// 4. If `incoming_ulp_data` is present, hand it off to the ULP.
/// 4. (Unreachable)
///
/// [^close-race]: This prevents a race condition between RID nominating a path and path close
/// detection on RRD's side.
#[derive(Debug)]
pub struct PathProcessResult {
/// The path's state updated.
pub state_update: Option<PathStateUpdate>,
/// An outgoing frame is ready to be sent on the path.
pub outgoing_frame: Option<OutgoingFrame>,
/// An incoming frame has been reassembled and is ready to be handed off to the ULP.
pub incoming_ulp_data: Option<Vec<u8>>,
}
/// 16 byte random authentication challenge
struct Challenge([u8; 16]);
impl Challenge {
fn random() -> Self {
let mut challenge = Self([0_u8; 16]);
rand::thread_rng().fill(&mut challenge.0);
challenge
}
}
/// Ephemeral transport key (ETK).
struct EphemeralTransportKey(x25519::SharedSecretHSalsa20);
/// Protocol context passed around to the various roles and states.
struct Context {
is_nominator: bool,
ak: AuthenticationKey,
}
impl Context {
fn new(is_nominator: bool, ak: AuthenticationKey) -> Self {
Self { is_nominator, ak }
}
}
/// Path states of RID.
#[derive(VariantNames, DebugVariantNames)]
enum RidPathState {
/// Briefly used internally when moving from one state to another.
Invalid,
/// Awaiting an `RrdToRid.Hello` to start the handshake.
AwaitingHello { authentication_keys: rxdak::ForRid },
/// Sent an `RidToRrd.AuthHello`, awaiting an `RrdToRid.Auth`.
AwaitingAuth {
authentication_keys: rxdak::ForRid,
sent_at: Instant,
local_challenge: Challenge,
shared_etk: EphemeralTransportKey,
},
/// Expecting a `Nominate` to be sent or received (depending on the configuration).
AwaitingNominate {
transport_keys: rxdtk::ForRid,
rph: RendezvousPathHash,
},
/// The connection path was `Nominate`d and can now be used by the ULP.
Nominated { transport_keys: rxdtk::ForRid },
/// The connection path closed.
Closed,
}
/// RID path protocol.
struct RidPath {
pid: u32,
decoder: FrameDecoder,
state: RidPathState,
}
impl RidPath {
fn new(ak: &AuthenticationKey, pid: u32) -> Self {
Self {
pid,
decoder: FrameDecoder::new(vec![]),
state: RidPathState::AwaitingHello {
authentication_keys: rxdak::ForRid::new(ak, pid),
},
}
}
fn process_frame(
&mut self,
ctx: &Context,
mut incoming_frame: IncomingFrame,
) -> Result<PathProcessResult, RendezvousProtocolError> {
trace!(state = ?self.state, ?incoming_frame, "Processing frame");
if let RidPathState::Nominated { transport_keys } = &mut self.state {
// Handle `Nominated` state where the transport can be used by the ULP.
Self::handle_ulp_data(transport_keys, incoming_frame).map(|incoming_ulp_data| PathProcessResult {
state_update: None,
outgoing_frame: None,
incoming_ulp_data: Some(incoming_ulp_data),
})
} else {
// Handle `Closed` state
if let RidPathState::Closed = &self.state {
return Err(RendezvousProtocolError::PathClosed(self.pid));
}
// Handle states that should immediately transition into another state
//
// IMPORTANT: All match arms must be infallible!
match core::mem::replace(&mut self.state, RidPathState::Invalid) {
RidPathState::AwaitingHello {
mut authentication_keys,
} => {
// Handle `RrdToRid.Hello`, create `RidToRrd.AuthHello` and update state.
Self::handle_hello(&mut authentication_keys, &mut incoming_frame).map(
|(local_challenge, shared_etk, outgoing_frame)| {
(
RidPathState::AwaitingAuth {
authentication_keys,
sent_at: Instant::now(),
local_challenge,
shared_etk,
},
PathProcessResult {
state_update: None,
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
},
)
},
)
},
RidPathState::AwaitingAuth {
mut authentication_keys,
sent_at,
local_challenge,
shared_etk,
} => {
// Calculate RTT
let measured_rtt = Instant::now().duration_since(sent_at);
// Handle `RrdToRid.Auth` and update state.
Self::handle_auth(&mut authentication_keys, &local_challenge, &mut incoming_frame).map(
|()| {
let (transport_keys, rph) =
rxdtk::ForRid::new(&ctx.ak, authentication_keys, shared_etk);
(
RidPathState::AwaitingNominate { transport_keys, rph },
PathProcessResult {
state_update: Some(PathStateUpdate::AwaitingNominate { measured_rtt }),
outgoing_frame: None,
incoming_ulp_data: None,
},
)
},
)
},
RidPathState::AwaitingNominate {
mut transport_keys,
rph,
} => {
// Check if the remote side is allowed to `Nominate`.
if ctx.is_nominator {
return Err(RendezvousProtocolError::UnexpectedFrame);
}
// Handle `Nominate` and update state.
Self::handle_nominate(&mut transport_keys, &mut incoming_frame).map(|()| {
(
RidPathState::Nominated { transport_keys },
PathProcessResult {
state_update: Some(PathStateUpdate::Nominated { rph }),
outgoing_frame: None,
incoming_ulp_data: None,
},
)
})
},
// States that must have been covered by code above
RidPathState::Invalid | RidPathState::Nominated { .. } | RidPathState::Closed => {
unreachable!("State should have been handled")
},
}
.map(|(state, result)| {
self.state = state;
debug!(state = ?self.state, "Changed state");
result
})
}
.map_err(|error| {
self.state = RidPathState::Closed;
warn!(?error, state = ?self.state, "Closed due to error");
error
})
}
fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
// Ensure we are in the correct state to nominate
if !matches!(&self.state, RidPathState::AwaitingNominate { .. }) {
return Err(RendezvousProtocolError::InvalidStateForNomination(
self.state.variant_name(),
));
}
// Nominate
if let RidPathState::AwaitingNominate {
mut transport_keys,
rph,
} = core::mem::replace(&mut self.state, RidPathState::Invalid)
{
Self::create_nominate(&mut transport_keys).map(|outgoing_frame| {
(
RidPathState::Nominated { transport_keys },
PathProcessResult {
state_update: Some(PathStateUpdate::Nominated { rph }),
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
},
)
})
} else {
unreachable!("Expected AwaitingNominate state")
}
.map(|(state, result)| {
self.state = state;
debug!(state = ?self.state, "Changed state");
result
})
.map_err(|error| {
self.state = RidPathState::Closed;
warn!(?error, state = ?self.state, "Closed due to error");
error
})
}
fn create_ulp_frame(
&mut self,
outgoing_data: Vec<u8>,
) -> Result<PathProcessResult, RendezvousProtocolError> {
match &mut self.state {
RidPathState::Nominated { transport_keys } => {
Self::create_ulp_data(transport_keys, outgoing_data).map(|outgoing_frame| PathProcessResult {
state_update: None,
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
})
},
_ => Err(RendezvousProtocolError::NominationRequired),
}
}
fn handle_hello(
keys: &mut rxdak::ForRid,
incoming_frame: &mut IncomingFrame,
) -> Result<(Challenge, EphemeralTransportKey, OutgoingFrame), RendezvousProtocolError> {
// Decrypt and decode into a `RrdToRid.Hello`
let (remote_challenge, remote_etk) = {
keys.rrdak.decrypt(&mut incoming_frame.0)?;
let hello = protobuf::handshake::rrd_to_rid::Hello::decode(incoming_frame.0.as_ref())?;
// Validate `RrdToRid.Hello`
let remote_challenge = Challenge(hello.challenge.as_slice().try_into().map_err(|_| {
RendezvousProtocolError::InvalidRrdHelloMessage(format!(
"Expected 16 challenge bytes, got {}",
hello.challenge.len()
))
})?);
let remote_etk =
x25519::PublicKey::from(<[u8; 32]>::try_from(hello.etk.as_ref()).map_err(|_| {
RendezvousProtocolError::InvalidRrdHelloMessage(format!(
"Invalid remote ETK, got {} bytes",
hello.etk.len()
))
})?);
(remote_challenge, remote_etk)
};
// Encode and encrypt `RidToRrd.AuthHello`
let (local_challenge, shared_etk, outgoing_frame) = {
// Generate a challenge
let local_challenge = Challenge::random();
// Generate local part of ETK
let local_etk = x25519::EphemeralSecret::random_from_rng(rand::thread_rng());
// Encode and encrypt `RidToRrd.AuthHello`
let local_auth_hello = protobuf::handshake::rid_to_rrd::AuthHello {
response: remote_challenge.0.to_vec(),
challenge: local_challenge.0.to_vec(),
etk: x25519::PublicKey::from(&local_etk).as_bytes().to_vec(),
};
let mut outgoing_data = local_auth_hello.encode_to_vec();
keys.ridak.encrypt(&mut outgoing_data)?;
// Derive ETK
let shared_etk = EphemeralTransportKey(local_etk.diffie_hellman(&remote_etk).into());
(local_challenge, shared_etk, OutgoingFrame(outgoing_data))
};
// Done
Ok((local_challenge, shared_etk, outgoing_frame))
}
fn handle_auth(
keys: &mut rxdak::ForRid,
local_challenge: &Challenge,
incoming_frame: &mut IncomingFrame,
) -> Result<(), RendezvousProtocolError> {
// Decrypt and decode into a `RrdToRid.Auth`
keys.rrdak.decrypt(&mut incoming_frame.0)?;
let remote_auth = protobuf::handshake::rrd_to_rid::Auth::decode(incoming_frame.0.as_ref())?;
// Validate `RrdToRid.Auth`
if remote_auth.response.as_ref() != local_challenge.0 {
return Err(RendezvousProtocolError::InvalidRrdAuthMessage(format!(
"Challenge response of {} bytes does not match",
remote_auth.response.len()
)));
}
// Done
Ok(())
}
fn create_nominate(keys: &mut rxdtk::ForRid) -> Result<OutgoingFrame, RendezvousProtocolError> {
// Encode and encrypt a `Nominate`
let local_nominate = protobuf::Nominate {};
let mut outgoing_data = local_nominate.encode_to_vec();
keys.ridtk.encrypt(&mut outgoing_data)?;
Ok(OutgoingFrame(outgoing_data))
}
fn handle_nominate(
keys: &mut rxdtk::ForRid,
incoming_frame: &mut IncomingFrame,
) -> Result<(), RendezvousProtocolError> {
// Decrypt and decode into a `Nominate`
keys.rrdtk.decrypt(&mut incoming_frame.0)?;
let _ = protobuf::Nominate::decode(incoming_frame.0.as_ref())?;
Ok(())
}
fn create_ulp_data(
keys: &mut rxdtk::ForRid,
mut outgoing_data: Vec<u8>,
) -> Result<OutgoingFrame, RendezvousProtocolError> {
// Encode and encrypt ULP data
keys.ridtk.encrypt(&mut outgoing_data)?;
Ok(OutgoingFrame(outgoing_data))
}
fn handle_ulp_data(
keys: &mut rxdtk::ForRid,
mut incoming_frame: IncomingFrame,
) -> Result<Vec<u8>, RendezvousProtocolError> {
// Decrypt and decode ULP data
keys.rrdtk.decrypt(&mut incoming_frame.0)?;
Ok(incoming_frame.0)
}
}
/// Path states of RID.
#[derive(VariantNames, DebugVariantNames)]
enum RrdPathState {
/// Briefly used internally when moving from one state to another.
Invalid,
/// Sent an `RrdtoRid.Hello`, awaiting an `RidToRrd.AuthHello`.
AwaitingAuthHello {
authentication_keys: rxdak::ForRrd,
sent_at: Instant,
local_challenge: Challenge,
local_etk: x25519::EphemeralSecret,
},
/// Expecting a `Nominate` to be sent or received (depending on the configuration).
AwaitingNominate {
transport_keys: rxdtk::ForRrd,
rph: RendezvousPathHash,
},
/// The connection path was `Nominate`d and can now be used by the ULP.
Nominated { transport_keys: rxdtk::ForRrd },
/// The connection path closed.
Closed,
}
/// RRD path protocol.
struct RrdPath {
pid: u32,
decoder: FrameDecoder,
state: RrdPathState,
}
impl RrdPath {
fn new(ak: &AuthenticationKey, pid: u32) -> (Self, OutgoingFrame) {
// Create initial state
let mut authentication_keys = rxdak::ForRrd::new(ak, pid);
let (local_challenge, local_etk, outgoing_frame) = Self::create_hello(&mut authentication_keys);
// Create path
let path = Self {
pid,
decoder: FrameDecoder::new(vec![]),
state: RrdPathState::AwaitingAuthHello {
authentication_keys,
sent_at: Instant::now(),
local_challenge,
local_etk,
},
};
(path, outgoing_frame)
}
fn process_frame(
&mut self,
ctx: &Context,
mut incoming_frame: IncomingFrame,
) -> Result<PathProcessResult, RendezvousProtocolError> {
trace!(state = ?self.state, ?incoming_frame, "Processing frame");
if let RrdPathState::Nominated { transport_keys } = &mut self.state {
// Handle `Nominated` state where the transport can be used by the ULP.
Self::handle_ulp_data(transport_keys, incoming_frame).map(|incoming_ulp_data| PathProcessResult {
state_update: None,
outgoing_frame: None,
incoming_ulp_data: Some(incoming_ulp_data),
})
} else {
// Handle `Closed` state
if let RrdPathState::Closed = &self.state {
return Err(RendezvousProtocolError::PathClosed(self.pid));
}
// Handle states that should immediately transition into another state
//
// IMPORTANT: All match arms must be infallible!
match core::mem::replace(&mut self.state, RrdPathState::Invalid) {
RrdPathState::AwaitingAuthHello {
mut authentication_keys,
sent_at,
local_challenge,
local_etk,
} => {
// Calculate RTT
let measured_rtt = Instant::now().duration_since(sent_at);
// Handle `RidToRrd.AuthHello` and update state.
Self::handle_auth_hello(
&mut authentication_keys,
&local_challenge,
local_etk,
&mut incoming_frame,
)
.map(|(shared_etk, outgoing_frame)| {
let (transport_keys, rph) =
rxdtk::ForRrd::new(&ctx.ak, authentication_keys, shared_etk);
(
RrdPathState::AwaitingNominate { transport_keys, rph },
PathProcessResult {
state_update: Some(PathStateUpdate::AwaitingNominate { measured_rtt }),
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
},
)
})
},
RrdPathState::AwaitingNominate {
mut transport_keys,
rph,
} => {
// Check if the remote side is allowed to `Nominate`.
if ctx.is_nominator {
return Err(RendezvousProtocolError::UnexpectedFrame);
}
// Handle `Nominate` and update state.
Self::handle_nominate(&mut transport_keys, &mut incoming_frame).map(|()| {
(
RrdPathState::Nominated { transport_keys },
PathProcessResult {
state_update: Some(PathStateUpdate::Nominated { rph }),
outgoing_frame: None,
incoming_ulp_data: None,
},
)
})
},
// States that must have been covered by code above
RrdPathState::Invalid | RrdPathState::Nominated { .. } | RrdPathState::Closed => {
unreachable!("State should have been handled")
},
}
.map(|(state, result)| {
self.state = state;
debug!(state = ?self.state, "Changed state");
result
})
}
.map_err(|error| {
self.state = RrdPathState::Closed;
warn!(?error, state = ?self.state, "Closed due to error");
error
})
}
fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
// Ensure we are in the correct state to nominate
if !matches!(&self.state, RrdPathState::AwaitingNominate { .. }) {
return Err(RendezvousProtocolError::InvalidStateForNomination(
self.state.variant_name(),
));
}
// Nominate
if let RrdPathState::AwaitingNominate {
mut transport_keys,
rph,
} = core::mem::replace(&mut self.state, RrdPathState::Invalid)
{
Self::create_nominate(&mut transport_keys).map(|outgoing_frame| {
(
RrdPathState::Nominated { transport_keys },
PathProcessResult {
state_update: Some(PathStateUpdate::Nominated { rph }),
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
},
)
})
} else {
unreachable!("Expected AwaitingNominate state")
}
.map(|(state, result)| {
self.state = state;
debug!(state = ?self.state, "Changed state");
result
})
.map_err(|error| {
self.state = RrdPathState::Closed;
warn!(?error, state = ?self.state, "Closed due to error");
error
})
}
fn create_ulp_frame(
&mut self,
outgoing_data: Vec<u8>,
) -> Result<PathProcessResult, RendezvousProtocolError> {
match &mut self.state {
RrdPathState::Nominated { transport_keys } => {
Self::create_ulp_data(transport_keys, outgoing_data).map(|outgoing_frame| PathProcessResult {
state_update: None,
outgoing_frame: Some(outgoing_frame),
incoming_ulp_data: None,
})
},
_ => Err(RendezvousProtocolError::NominationRequired),
}
}
fn create_hello(keys: &mut rxdak::ForRrd) -> (Challenge, x25519::EphemeralSecret, OutgoingFrame) {
// Generate a challenge
let local_challenge = Challenge::random();
// Generate local part of ETK
let local_etk = x25519::EphemeralSecret::random_from_rng(rand::thread_rng());
// Encode and encrypt `RrdToRid.Hello`
let local_hello = protobuf::handshake::rrd_to_rid::Hello {
challenge: local_challenge.0.to_vec(),
etk: x25519::PublicKey::from(&local_etk).as_bytes().to_vec(),
};
let mut outgoing_data = local_hello.encode_to_vec();
keys.rrdak
.encrypt(&mut outgoing_data)
.expect("Encrypting initial RrdToRid.Hello should work");
(local_challenge, local_etk, OutgoingFrame(outgoing_data))
}
fn handle_auth_hello(
keys: &mut rxdak::ForRrd,
local_challenge: &Challenge,
local_etk: x25519::EphemeralSecret,
incoming_frame: &mut IncomingFrame,
) -> Result<(EphemeralTransportKey, OutgoingFrame), RendezvousProtocolError> {
// Decrypt and decode into a `RidToRrd.AuthHello`
let (remote_challenge, remote_etk) = {
keys.ridak.decrypt(&mut incoming_frame.0)?;
let remote_auth_hello =
protobuf::handshake::rid_to_rrd::AuthHello::decode(incoming_frame.0.as_ref())?;
// Validate `RidToRrd.AuthHello`
if remote_auth_hello.response != local_challenge.0 {
return Err(RendezvousProtocolError::InvalidRidAuthHelloMessage(format!(
"Challenge response of {} bytes does not match",
remote_auth_hello.response.len()
)));
}
let remote_challenge =
Challenge(remote_auth_hello.challenge.as_slice().try_into().map_err(|_| {
RendezvousProtocolError::InvalidRrdHelloMessage(format!(
"Expected 16 challenge bytes, got {}",
remote_auth_hello.challenge.len()
))
})?);
let remote_etk = x25519::PublicKey::from(
<[u8; 32]>::try_from(remote_auth_hello.etk.as_ref()).map_err(|_| {
RendezvousProtocolError::InvalidRidAuthHelloMessage(format!(
"Invalid remote ETK, got {} bytes",
remote_auth_hello.etk.len()
))
})?,
);
(remote_challenge, remote_etk)
};
// Encode and encrypt `RrdToRid.Auth`
let (shared_etk, outgoing_frame) = {
// Encode and encrypt `RrdToRid.Auth`
let local_auth = protobuf::handshake::rrd_to_rid::Auth {
response: remote_challenge.0.to_vec(),
};
let mut outgoing_data = local_auth.encode_to_vec();
keys.rrdak.encrypt(&mut outgoing_data)?;
// Derive ETK
let shared_etk = EphemeralTransportKey(local_etk.diffie_hellman(&remote_etk).into());
(shared_etk, OutgoingFrame(outgoing_data))
};
// Done
Ok((shared_etk, outgoing_frame))
}
fn create_nominate(keys: &mut rxdtk::ForRrd) -> Result<OutgoingFrame, RendezvousProtocolError> {
// Encode and encrypt a `Nominate`
let local_nominate = protobuf::Nominate {};
let mut outgoing_data = local_nominate.encode_to_vec();
keys.rrdtk.encrypt(&mut outgoing_data)?;
Ok(OutgoingFrame(outgoing_data))
}
fn handle_nominate(
keys: &mut rxdtk::ForRrd,
incoming_frame: &mut IncomingFrame,
) -> Result<(), RendezvousProtocolError> {
// Decrypt and decode into a `Nominate`
keys.ridtk.decrypt(&mut incoming_frame.0)?;
let _ = protobuf::Nominate::decode(incoming_frame.0.as_ref())?;
Ok(())
}
fn create_ulp_data(
keys: &mut rxdtk::ForRrd,
mut outgoing_data: Vec<u8>,
) -> Result<OutgoingFrame, RendezvousProtocolError> {
// Encode and encrypt ULP data
keys.rrdtk.encrypt(&mut outgoing_data)?;
Ok(OutgoingFrame(outgoing_data))
}
fn handle_ulp_data(
keys: &mut rxdtk::ForRrd,
mut incoming_frame: IncomingFrame,
) -> Result<Vec<u8>, RendezvousProtocolError> {
// Decrypt and decode ULP data
keys.ridtk.decrypt(&mut incoming_frame.0)?;
Ok(incoming_frame.0)
}
}
trait Path: Send {
fn add_chunks(&mut self, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError>;
fn process_frame(&mut self, ctx: &Context) -> Result<Option<PathProcessResult>, RendezvousProtocolError>;
fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError>;
fn create_ulp_frame(
&mut self,
outgoing_data: Vec<u8>,
) -> Result<PathProcessResult, RendezvousProtocolError>;
}
#[duplicate_item(
path_type path_state_type;
[ RidPath ] [ RidPathState ];
[ RrdPath ] [ RrdPathState ];
)]
impl Path for path_type {
#[inline]
fn add_chunks(&mut self, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError> {
let length = self.decoder.add_chunks(chunks);
// Check if the frame exceeds the maximum supported length of this state
let max_length = if matches!(&self.state, path_state_type::Nominated { .. }) {
FrameDecoder::MAX_LENGTH_AFTER_NOMINATION
} else {
FrameDecoder::MAX_LENGTH_BEFORE_NOMINATION
};
if length > max_length {
return Err(RendezvousProtocolError::OversizedFrame(length));
}
Ok(())
}
fn process_frame(&mut self, ctx: &Context) -> Result<Option<PathProcessResult>, RendezvousProtocolError> {
self.decoder
.next_frame_and_then(|incoming_frame| IncomingFrame(incoming_frame.to_vec()))
.map(|incoming_frame| self.process_frame(ctx, incoming_frame))
.transpose()
}
#[inline]
fn nominate(&mut self) -> Result<PathProcessResult, RendezvousProtocolError> {
self.nominate()
}
#[inline]
fn create_ulp_frame(
&mut self,
outgoing_data: Vec<u8>,
) -> Result<PathProcessResult, RendezvousProtocolError> {
self.create_ulp_frame(outgoing_data)
}
}
/// Internal protocol state.
#[derive(DebugVariantNames, VariantNames)]
enum ProtocolState {
/// The paths are currently racing, meaning we are in the _handshake and nomination phase_.
RacingPaths(HashMap<u32, Box<dyn Path>>),
/// One path has been nominated, all other paths have been discarded, meaning we are in the
/// _ULP phase_.
Nominated { pid: u32, path: Box<dyn Path> },
}
/// Connection Rendezvous Protocol state machine.
///
/// The protocol state machine can be constructed from a formerly exchanged a `RendezvousInit` and
/// the associated roles by using [`RendezvousProtocol::new_as_rid`] and
/// [`RendezvousProtocol::new_as_rrd`].
///
/// Any interaction with the protocol state machine that changes the internal state will yield a
/// [`PathProcessResult`] that must be handled according to its documentation.
///
/// The protocol goes through exactly two phases:
///
/// - The _handshake and nomination phase_ where all paths are racing the handshake simultaneously until one
/// has been nominated by the nominator.
/// - The _ULP phase_ where ULP frames can be exchanged on the nominated path.
///
/// The following steps are defined as the _Path Awaiting Nomination Steps_:
///
/// 1. Let `path` be the associated path.
/// 2. If the protocol did not take the role of the nominator, abort the protocol due to an error and abort
/// these steps.
/// 3. If `path` is the only path, run [`RendezvousProtocol::nominate_path`] for `path` and abort these steps.
/// 4. (Unreachable / TODO(LIB-10): As of today, only one path is expected to be used.)
///
/// When a path closed, run the following steps:
///
/// 1. Let `path` be the path that closed (initiated locally or remotely).
/// 2. If `path` is marked as _nominated_, abort the protocol normally with any available close information
/// and abort these steps.
/// 3. If `path` is marked as _disregarded_, log a notice and abort these steps.
/// 4. If `path` is the last path that closed (i.e. all other paths already closed or there are no other
/// paths), log a warning that all paths closed before nomination, abort the protocol normally with any
/// available close information and abort these steps.
/// 5. Log a warning that `path` closed before nomination.
///
/// When receiving data on a path:
///
/// 1. Run [`RendezvousProtocol::add_chunks`] with the respective path's PID.
/// 2. In a loop, run [`RendezvousProtocol::process_frame`] with the respective path's PID and handle the
/// result until it no longer produces a [`PathProcessResult`].
///
/// When the protocol is being aborted:
///
/// 1. Let `cause` be an error or any information associated to normal closure.
/// 2. Log the protocol abort due to `cause` as a notice or an error respectively.
/// 3. Tear down the protocol state machine.
/// 4. Close all remaining paths exceptionally.
/// 5. Hand off `cause` to the ULP.
#[derive(Educe)]
#[educe(Debug)]
pub struct RendezvousProtocol {
#[educe(Debug(ignore))]
ctx: Context,
state: ProtocolState,
}
// TODO(LIB-11): Add construction of the `RendezvousInit` from the paths here.
impl RendezvousProtocol {
/// Create a new Connection Rendezvous Protocol as the Rendezvous Initiator Device (RID) from a
/// formerly exchanged `RendezvousInit`.
///
/// `pids` must contain the set of available pre-initiated paths with their associated Path IDs
/// (PID).
///
/// Returns the protocol state machine instance.
#[tracing::instrument(skip_all, fields(?is_nominator, ?pids))]
pub fn new_as_rid(is_nominator: bool, ak: AuthenticationKey, pids: &[u32]) -> Self {
debug!("Creating D2D rendezvous protocol");
let ctx = Context::new(is_nominator, ak);
// Create paths
let racing_paths = pids
.iter()
.map(|pid| {
let path = RidPath::new(&ctx.ak, *pid);
(*pid, Box::new(path) as Box<dyn Path>)
})
.collect();
// Create protocol
Self {
ctx,
state: ProtocolState::RacingPaths(racing_paths),
}
}
/// Create a new Connection Rendezvous Protocol as the Rendezvous Responder Device (RRD) from a
/// formerly exchanged `RendezvousInit`.
///
/// `pids` must contain the set of available pre-initiated paths with their associated Path IDs
/// (PID).
///
/// Returns a tuple of the protocol state machine instance and a list of PIDs and outgoing
/// frames to be enqueued on the respective paths immediately.
#[tracing::instrument(skip_all, fields(?is_nominator, ?pids))]
pub fn new_as_rrd(
is_nominator: bool,
ak: AuthenticationKey,
pids: &[u32],
) -> (Self, Vec<(u32, OutgoingFrame)>) {
debug!("Creating protocol");
let ctx = Context::new(is_nominator, ak);
let mut outgoing_frames = vec![];
// Create paths
let racing_paths = pids
.iter()
.map(|pid| {
let (path, outgoing_frame) = RrdPath::new(&ctx.ak, *pid);
outgoing_frames.push((*pid, outgoing_frame));
(*pid, Box::new(path) as Box<dyn Path>)
})
.collect();
// Create protocol
let protocol = Self {
ctx,
state: ProtocolState::RacingPaths(racing_paths),
};
(protocol, outgoing_frames)
}
/// Return whether the protocol took the role of the nominator.
#[must_use]
pub const fn is_nominator(&self) -> bool {
self.ctx.is_nominator
}
/// Return the nominated path's PID, if available.
#[must_use]
pub const fn nominated_path(&self) -> Option<u32> {
if let ProtocolState::Nominated { pid, .. } = &self.state {
Some(*pid)
} else {
None
}
}
/// Add chunks received on the specified path. The chunks may or may not contain complete frames
/// or even contain multiple complete frames.
///
/// # Errors
///
/// Returns [`RendezvousProtocolError::UnknownOrDroppedPath`] if the path associated to `pid`
/// could not be found.
#[tracing::instrument(skip_all, fields(
?self, ?pid,
chunks_byte_length = chunks.iter().map(|chunk| chunk.len()).sum::<usize>(),
))]
pub fn add_chunks(&mut self, pid: u32, chunks: &[&[u8]]) -> Result<(), RendezvousProtocolError> {
let path = Self::lookup_path(&mut self.state, pid)?;
path.add_chunks(chunks)
}
/// Process any available buffered complete frame for the specified path.
///
/// # Errors
///
/// Returns [`RendezvousProtocolError`] for a plethora of reasons, e.g. if the path associated
/// to `pid` could not be found, an incoming frame could not be decoded or decrypted, or an
/// unexpected message was received, or, as a response to it, another outgoing frame could not
/// be encrypted.
#[tracing::instrument(skip_all, fields(?self, ?pid))]
pub fn process_frame(&mut self, pid: u32) -> Result<Option<PathProcessResult>, RendezvousProtocolError> {
let path = Self::lookup_path(&mut self.state, pid)?;
// Decode and process the next frame, if any can be decoded
let result = path.process_frame(&self.ctx)?;
trace!(?result, "Processed frame");
let Some(result) = result else {
return Ok(result);
};
// Update state if the path was nominated and we are still racing paths
if let (ProtocolState::RacingPaths(racing_paths), Some(PathStateUpdate::Nominated { .. })) =
(&mut self.state, &result.state_update)
{
// Nominate the path
let path = racing_paths
.remove(&pid)
.ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?;
debug!(
dropped_pids = ?racing_paths.keys(),
"Remote nominated, dropping all other paths"
);
self.state = ProtocolState::Nominated { pid, path };
}
// Return the result
Ok(Some(result))
}
/// Nominate a path.
///
/// # Errors
///
/// Returns [`RendezvousProtocolError`] if the protocol did not take the role of the nominator,
/// the path associated to `pid` could not be found, the path is not ready to be nominated or
/// nomination already happened.
#[tracing::instrument(skip_all, fields(?self, ?pid))]
pub fn nominate_path(&mut self, pid: u32) -> Result<PathProcessResult, RendezvousProtocolError> {
// Ensure we're allowed to nominate
if !self.ctx.is_nominator {
return Err(RendezvousProtocolError::NominateNotAllowed);
}
// Nominate if the paths are still racing
match &mut self.state {
ProtocolState::RacingPaths(racing_paths) => {
// Attempt to nominate the path
let mut path = racing_paths
.remove(&pid)
.ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?;
let result = path.nominate()?;
debug!(
dropped_pids = ?racing_paths.keys(),
"Local nominated, dropping all other paths"
);
self.state = ProtocolState::Nominated { pid, path };
Ok(result)
},
ProtocolState::Nominated { pid, .. } => {
// Nomination already happened
Err(RendezvousProtocolError::NominationAlreadyDone(*pid))
},
}
}
/// Create a ULP frame to be encrypted and sent as an outgoing frame on the nominated path.
///
/// # Errors
///
/// Returns [`RendezvousProtocolError`] if nomination of a path is still pending or the ULP
/// frame could not be encrypted or encoded.
#[tracing::instrument(skip_all, fields(?self, outgoing_data_length = outgoing_data.len()))]
pub fn create_ulp_frame(
&mut self,
outgoing_data: Vec<u8>,
) -> Result<PathProcessResult, RendezvousProtocolError> {
match &mut self.state {
ProtocolState::RacingPaths(..) => Err(RendezvousProtocolError::NominationRequired),
ProtocolState::Nominated { path, .. } => path.create_ulp_frame(outgoing_data),
}
}
fn lookup_path(
state: &mut ProtocolState,
pid: u32,
) -> Result<&mut Box<dyn Path>, RendezvousProtocolError> {
// Lookup path based on the current state.
let path = match state {
// The nomination race is still ongoing. Lookup the path by its PID.
ProtocolState::RacingPaths(racing_paths) => racing_paths
.get_mut(&pid)
.ok_or(RendezvousProtocolError::UnknownOrDroppedPath(pid))?,
// There's only one nominated path. Ensure it's the correct one.
ProtocolState::Nominated {
pid: nominated_pid,
path,
} => {
if pid != *nominated_pid {
return Err(RendezvousProtocolError::UnknownOrDroppedPath(pid));
}
path
},
};
Ok(path)
}
}
[Dauer der Verarbeitung: 0.31 Sekunden, vorverarbeitet 2026-04-27]
|
2026-05-26
|
|
|
|
|