/* * Copyright 2004 The WebRTC Project Authors. All rights reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree.
*/
using ::webrtc::MutexLock; using ::webrtc::TaskQueueBase; using ::webrtc::TimeDelta;
#ifdefined(WEBRTC_WIN) const in_addr kInitialNextIPv4 = {{{0x01, 0, 0, 0}}}; #else // This value is entirely arbitrary, hence the lack of concern about endianness. const in_addr kInitialNextIPv4 = {0x01000000}; #endif // Starts at ::2 so as to not cause confusion with ::1. const in6_addr kInitialNextIPv6 = {
{{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}}};
const uint32_t UDP_HEADER_SIZE = 28; // IP + UDP headers const uint32_t TCP_HEADER_SIZE = 40; // IP + TCP headers const uint32_t TCP_MSS = 1400; // Maximum segment size
// Note: The current algorithm doesn't work for sample sizes smaller than this. constint NUM_SAMPLES = 1000;
// Packets are passed between sockets as messages. We copy the data just like // the kernel does. class VirtualSocketPacket { public:
VirtualSocketPacket(constchar* data, size_t size, const SocketAddress& from)
: size_(size), consumed_(0), from_(from) {
RTC_DCHECK(nullptr != data);
data_ = newchar[size_];
memcpy(data_, data, size_);
}
VirtualSocket::SafetyBlock::~SafetyBlock() { // Ensure `SetNotAlive` was called and there is nothing left to cleanup.
RTC_DCHECK(!alive_);
RTC_DCHECK(posted_connects_.empty());
RTC_DCHECK(recv_buffer_.empty());
RTC_DCHECK(!listen_queue_.has_value());
}
// Cancel potential connects for (const SocketAddress& remote_addr : posted_connects_) { // Lookup remote side.
VirtualSocket* lookup_socket =
server->LookupConnection(local_addr, remote_addr); if (lookup_socket) { // Server socket, remote side is a socket retreived by accept. Accepted // sockets are not bound so we will not find it by looking in the // bindings table.
server->Disconnect(lookup_socket);
server->RemoveConnection(local_addr, remote_addr);
} else {
server->Disconnect(remote_addr);
}
}
posted_connects_.clear();
int VirtualSocket::Close() { if (!local_addr_.IsNil() && bound_) { // Remove from the binding table.
server_->Unbind(local_addr_, this);
bound_ = false;
}
int VirtualSocket::RecvFrom(void* pv,
size_t cb,
SocketAddress* paddr,
int64_t* timestamp) { if (timestamp) {
*timestamp = -1;
}
int data_read = safety_->RecvFrom(pv, cb, *paddr); if (data_read < 0) {
error_ = EAGAIN; return -1;
}
if (type_ == SOCK_STREAM) { bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity());
recv_buffer_size_ -= data_read; if (was_full) {
server_->SendTcp(remote_addr_);
}
}
return data_read;
}
int VirtualSocket::SafetyBlock::RecvFrom(void* buffer,
size_t size,
SocketAddress& addr) {
MutexLock lock(&mutex_); // If we don't have a packet, then either error or wait for one to arrive. if (recv_buffer_.empty()) { return -1;
}
// Return the packet at the front of the queue.
VirtualSocketPacket& packet = *recv_buffer_.front();
size_t data_read = std::min(size, packet.size());
memcpy(buffer, packet.data(), data_read);
addr = packet.from();
VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
SafetyBlock::AcceptResult result = safety_->Accept(); if (result.error != 0) {
error_ = result.error; return nullptr;
} if (paddr) {
*paddr = result.remote_addr;
} return result.socket.release();
}
VirtualSocket::SafetyBlock::AcceptResult VirtualSocket::SafetyBlock::Accept() {
AcceptResult result;
MutexLock lock(&mutex_);
RTC_DCHECK(alive_); if (!listen_queue_.has_value()) {
result.error = EINVAL; return result;
} while (!listen_queue_->empty()) { auto socket = std::make_unique<VirtualSocket>(socket_.server_, AF_INET,
socket_.type_);
// Set the new local address to the same as this server socket.
socket->SetLocalAddress(socket_.local_addr_); // Sockets made from a socket that 'was Any' need to inherit that.
socket->set_was_any(socket_.was_any());
SocketAddress remote_addr = listen_queue_->front();
listen_queue_->pop_front(); if (socket->InitiateConnect(remote_addr, false) != 0) { continue;
}
socket->CompleteConnect(remote_addr);
result.socket = std::move(socket);
result.remote_addr = remote_addr; return result;
}
result.error = EWOULDBLOCK; return result;
}
int VirtualSocket::GetError() const { return error_;
}
MutexLock lock(&mutex_);
RTC_DCHECK(alive_); // Save addresses of the pending connects to allow propertly disconnect them // if socket closes before delayed task below runs. // `posted_connects_` is an std::list, thus its iterators are valid while the // element is in the list. It can be removed either in the `Connect` just // below or by calling SetNotAlive function, thus inside `Connect` `it` should // be valid when alive_ == true. auto it = posted_connects_.insert(posted_connects_.end(), remote_addr); auto task = [safety = std::move(safety), it] { switch (safety->Connect(it)) { case Signal::kNone: break; case Signal::kReadEvent:
safety->socket_.SignalReadEvent(&safety->socket_); break; case Signal::kConnectEvent:
safety->socket_.SignalConnectEvent(&safety->socket_); break;
}
};
socket_.server_->msg_queue_->PostDelayedTask(std::move(task), delay);
}
int VirtualSocket::SendUdp(constvoid* pv,
size_t cb, const SocketAddress& addr) { // If we have not been assigned a local port, then get one. if (local_addr_.IsNil()) {
local_addr_ = server_->AssignBindAddress(
EmptySocketAddressWithFamily(addr.ipaddr().family())); int result = server_->Bind(this, local_addr_); if (result != 0) {
local_addr_.Clear();
error_ = EADDRINUSE; return result;
}
}
// Send the data in a message to the appropriate socket. return server_->SendUdp(this, static_cast<constchar*>(pv), cb, addr);
}
void VirtualSocket::OnSocketServerReadyToSend() { if (ready_to_send_) { // This socket didn't encounter EWOULDBLOCK, so there's nothing to do. return;
} if (type_ == SOCK_DGRAM) {
ready_to_send_ = true;
SignalWriteEvent(this);
} else {
RTC_DCHECK(type_ == SOCK_STREAM); // This will attempt to empty the full send buffer, and will fire // SignalWriteEvent if successful.
server_->SendTcp(this);
}
}
void VirtualSocket::UpdateSend(size_t data_size) {
size_t new_buffer_size = send_buffer_.size() - data_size; // Avoid undefined access beyond the last element of the vector. // This only happens when new_buffer_size is 0. if (data_size < send_buffer_.size()) { // memmove is required for potentially overlapping source/destination.
memmove(&send_buffer_[0], &send_buffer_[data_size], new_buffer_size);
}
send_buffer_.resize(new_buffer_size);
}
int64_t VirtualSocket::UpdateOrderedDelivery(int64_t ts) { // Ensure that new packets arrive after previous ones
ts = std::max(ts, last_delivery_time_); // A socket should not have both ordered and unordered delivery, so its last // delivery time only needs to be updated when it has ordered delivery.
last_delivery_time_ = ts; return ts;
}
void VirtualSocketServer::SetSendingBlocked(bool blocked) {
{
webrtc::MutexLock lock(&mutex_); if (blocked == sending_blocked_) { // Unchanged; nothing to do. return;
}
sending_blocked_ = blocked;
} if (!blocked) { // Sending was blocked, but is now unblocked. This signal gives sockets a // chance to fire SignalWriteEvent, and for TCP, send buffered data.
SignalReadyToSend();
}
}
bool VirtualSocketServer::Wait(webrtc::TimeDelta max_wait_duration, bool process_io) {
RTC_DCHECK_RUN_ON(msg_queue_); if (stop_on_idle_ && Thread::Current()->empty()) { returnfalse;
} // Note: we don't need to do anything with `process_io` since we don't have // any real I/O. Received packets come in the form of queued messages, so // Thread will ensure WakeUp is called if another thread sends a // packet.
wakeup_.Wait(max_wait_duration); returntrue;
}
bool VirtualSocketServer::ProcessMessagesUntilIdle() {
RTC_DCHECK_RUN_ON(msg_queue_);
stop_on_idle_ = true; while (!msg_queue_->empty()) { if (fake_clock_) { // If using a fake clock, advance it in millisecond increments until the // queue is empty.
fake_clock_->AdvanceTime(webrtc::TimeDelta::Millis(1));
} else { // Otherwise, run a normal message loop.
msg_queue_->ProcessMessages(Thread::kForever);
}
}
stop_on_idle_ = false; return !msg_queue_->IsQuitting();
}
bool VirtualSocketServer::CloseTcpConnections( const SocketAddress& addr_local, const SocketAddress& addr_remote) {
VirtualSocket* socket = LookupConnection(addr_local, addr_remote); if (!socket) { returnfalse;
} // Signal the close event on the local connection first.
socket->SignalCloseEvent(socket, 0);
// Trigger the remote connection's close event.
socket->Close();
returntrue;
}
int VirtualSocketServer::Bind(VirtualSocket* socket, const SocketAddress& addr) {
RTC_DCHECK(nullptr != socket); // Address must be completely specified at this point
RTC_DCHECK(!IPIsUnspec(addr.ipaddr()));
RTC_DCHECK(addr.port() != 0);
// Normalize the address (turns v6-mapped addresses into v4-addresses).
SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
// Normalize the IP.
SocketAddress addr;
addr.SetIP(app_addr.ipaddr().Normalized());
// If the IP appears in `alternative_address_mapping_`, meaning the test has // configured sockets bound to this IP to actually use another IP, replace // the IP here. auto alternative = alternative_address_mapping_.find(addr.ipaddr()); if (alternative != alternative_address_mapping_.end()) {
addr.SetIP(alternative->second);
}
if (app_addr.port() != 0) {
addr.SetPort(app_addr.port());
} else { // Assign a port. for (int i = 0; i < kEphemeralPortCount; ++i) {
addr.SetPort(GetNextPort()); if (bindings_->find(addr) == bindings_->end()) { break;
}
}
}
return addr;
}
VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
AddressMap::iterator it = bindings_->find(normalized); if (it != bindings_->end()) { return it->second;
}
IPAddress default_ip = GetDefaultSourceAddress(addr.ipaddr().family()); if (!IPIsUnspec(default_ip) && addr.ipaddr() == default_ip) { // If we can't find a binding for the packet which is sent to the interface // corresponding to the default route, it should match a binding with the // correct port to the any address.
SocketAddress sock_addr =
EmptySocketAddressWithFamily(addr.ipaddr().family());
sock_addr.SetPort(addr.port()); return LookupBinding(sock_addr);
}
int VirtualSocketServer::Connect(VirtualSocket* socket, const SocketAddress& remote_addr, bool use_delay) {
RTC_DCHECK(msg_queue_);
TimeDelta delay = TimeDelta::Millis(use_delay ? GetTransitDelay(socket) : 0);
VirtualSocket* remote = LookupBinding(remote_addr); if (!CanInteractWith(socket, remote)) {
RTC_LOG(LS_INFO) << "Address family mismatch between "
<< socket->GetLocalAddress().ToString() << " and "
<< remote_addr.ToString(); return -1;
} if (remote != nullptr) {
remote->PostConnect(delay, socket->GetLocalAddress());
} else {
RTC_LOG(LS_INFO) << "No one listening at " << remote_addr.ToString();
socket->PostDisconnect(delay);
} return 0;
}
bool VirtualSocketServer::Disconnect(VirtualSocket* socket) { if (!socket || !msg_queue_) returnfalse;
// If we simulate packets being delayed, we should simulate the // equivalent of a FIN being delayed as well.
socket->PostDisconnect(TimeDelta::Millis(GetTransitDelay(socket))); returntrue;
}
bool VirtualSocketServer::Disconnect(const SocketAddress& local_addr, const SocketAddress& remote_addr) { // Disconnect remote socket, check if it is a child of a server socket.
VirtualSocket* socket = LookupConnection(local_addr, remote_addr); if (!socket) { // Not a server socket child, then see if it is bound. // TODO(tbd): If this is indeed a server socket that has no // children this will cause the server socket to be // closed. This might lead to unexpected results, how to fix this?
socket = LookupBinding(remote_addr);
}
Disconnect(socket);
// Remove mapping for both directions.
RemoveConnection(remote_addr, local_addr);
RemoveConnection(local_addr, remote_addr); return socket != nullptr;
}
// See if we want to drop this packet. if (data_size > max_udp_payload_) {
RTC_LOG(LS_VERBOSE) << "Dropping too large UDP payload of size "
<< data_size << ", UDP payload limit is "
<< max_udp_payload_; // Return as if send was successful; packet disappears. return data_size;
}
if (Random() < drop_prob_) {
RTC_LOG(LS_VERBOSE) << "Dropping packet: bad luck"; returnstatic_cast<int>(data_size);
}
}
VirtualSocket* recipient = LookupBinding(remote_addr); if (!recipient) { // Make a fake recipient for address family checking.
std::unique_ptr<VirtualSocket> dummy_socket(
CreateSocket(AF_INET, SOCK_DGRAM));
dummy_socket->SetLocalAddress(remote_addr); if (!CanInteractWith(socket, dummy_socket.get())) {
RTC_LOG(LS_VERBOSE) << "Incompatible address families: "
<< socket->GetLocalAddress().ToString() << " and "
<< remote_addr.ToString(); return -1;
}
RTC_LOG(LS_VERBOSE) << "No one listening at " << remote_addr.ToString(); returnstatic_cast<int>(data_size);
}
// Determine whether we have enough bandwidth to accept this packet. To do // this, we need to update the send queue. Once we know it's current size, // we know whether we can fit this packet. // // NOTE: There are better algorithms for maintaining such a queue (such as // "Derivative Random Drop"); however, this algorithm is a more accurate // simulation of what a normal network would do.
{
webrtc::MutexLock lock(&mutex_);
size_t packet_size = data_size + UDP_HEADER_SIZE; if (network_size + packet_size > network_capacity_) {
RTC_LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; returnstatic_cast<int>(data_size);
}
}
void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
{
webrtc::MutexLock lock(&mutex_);
++sent_packets_; if (sending_blocked_) { // Eventually the socket's buffer will fill and VirtualSocket::SendTcp // will set EWOULDBLOCK. return;
}
}
// TCP can't send more data than will fill up the receiver's buffer. // We track the data that is in the buffer plus data in flight using the // recipient's recv_buffer_size_. Anything beyond that must be stored in the // sender's buffer. We will trigger the buffered data to be sent when data // is read from the recv_buffer.
// Lookup the local/remote pair in the connections table.
VirtualSocket* recipient =
LookupConnection(socket->GetLocalAddress(), socket->GetRemoteAddress()); if (!recipient) {
RTC_LOG(LS_VERBOSE) << "Sending data to no one."; return;
}
// Find the delay for crossing the many virtual hops of the network.
uint32_t transit_delay = GetTransitDelay(sender);
// When the incoming packet is from a binding of the any address, translate it // to the default route here such that the recipient will see the default // route.
SocketAddress sender_addr = sender->GetLocalAddress();
IPAddress default_ip = GetDefaultSourceAddress(sender_addr.ipaddr().family()); if (sender_addr.IsAnyIP() && !IPIsUnspec(default_ip)) {
sender_addr.SetIP(default_ip);
}
std::unique_ptr<VirtualSocketServer::Function>
VirtualSocketServer::CreateDistribution(uint32_t mean,
uint32_t stddev,
uint32_t samples) { auto f = std::make_unique<Function>();
if (0 == stddev) {
f->push_back(Point(mean, 1.0));
} else { double start = 0; if (mean >= 4 * static_cast<double>(stddev))
start = mean - 4 * static_cast<double>(stddev); double end = mean + 4 * static_cast<double>(stddev);
for (uint32_t i = 0; i < samples; i++) { double x = start + (end - start) * i / (samples - 1); double y = Normal(x, mean, stddev);
f->push_back(Point(x, y));
}
} return Resample(Invert(Accumulate(std::move(f))), 0, 1, samples);
}
uint32_t VirtualSocketServer::GetTransitDelay(Socket* socket) { // Use the delay based on the address if it is set. auto iter = delay_by_ip_.find(socket->GetLocalAddress().ipaddr()); if (iter != delay_by_ip_.end()) { returnstatic_cast<uint32_t>(iter->second);
} // Otherwise, use the delay from the distribution distribution.
size_t index = rand() % delay_dist_->size(); double delay = (*delay_dist_)[index].second; // RTC_LOG_F(LS_INFO) << "random[" << index << "] = " << delay; returnstatic_cast<uint32_t>(delay);
}
bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
VirtualSocket* remote) { if (!local || !remote) { returnfalse;
}
IPAddress local_ip = local->GetLocalAddress().ipaddr();
IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
IPAddress local_normalized = local_ip.Normalized();
IPAddress remote_normalized = remote_ip.Normalized(); // Check if the addresses are the same family after Normalization (turns // mapped IPv6 address into IPv4 addresses). // This will stop unmapped V6 addresses from talking to mapped V6 addresses. if (local_normalized.family() == remote_normalized.family()) { returntrue;
}
// If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY. int remote_v6_only = 0;
remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only); if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) { returntrue;
} // Same check, backwards. int local_v6_only = 0;
local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only); if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) { returntrue;
}
// Check to see if either socket was explicitly bound to IPv6-any. // These sockets can talk with anyone. if (local_ip.family() == AF_INET6 && local->was_any()) { returntrue;
} if (remote_ip.family() == AF_INET6 && remote->was_any()) { returntrue;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.