Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/ipc/chromium/src/mojo/core/ports/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 75 kB image not shown  

Quelle  node.cc   Sprache: C

 
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/core/ports/node.h"

#include <string.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <utility>
#include <vector>

#include "mozilla/Mutex.h"
#include "mozilla/RandomNum.h"
#include "nsTArray.h"

#include "base/logging.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"

namespace mojo {
namespace core {
namespace ports {

namespace {

int DebugError(const char* message, int error_code) {
  NOTREACHED() << "Oops: " << message;
  return error_code;
}

#define OOPS(x) DebugError(#x, x)

bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed) {
    return false;
  }
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    if (port->peer_lost_unexpectedly) {
      return port->message_queue.HasNextMessage();
    }
    if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
      return false;
    }
  }
  return true;
}

void GenerateRandomPortName(PortName* name) {
  // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
  // generating port names to keep this overhead down. If this method starts
  // showing up on profiles we should consider doing the same.
  *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
}

}  // namespace

Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name), delegate_(this, delegate) {}

Node::~Node() {
  if (!ports_.empty()) {
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
  }
}

bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock ports_lock(ports_lock_);

  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#ifdef DEBUG
    for (auto& entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto& entry : ports_) {
    PortRef port_ref(entry.first, entry.second);
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
      can_shutdown = false;
#ifdef DEBUG
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << port->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << port->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}

int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  auto iter = ports_.find(port_name);
  if (iter == ports_.end()) {
    return ERROR_PORT_UNKNOWN;
  }

#if defined(ANDROID) && defined(__aarch64__)
  // Workaround for https://crbug.com/665869.
  std::atomic_thread_fence(std::memory_order_seq_cst);
#endif

  *port_ref = PortRef(port_name, iter->second);
  return OK;
}

int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName port_name;
  GenerateRandomPortName(&port_name);

  RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
  int rv = AddPortWithName(port_name, port);
  if (rv != OK) {
    return rv;
  }

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name,
                         const NodeName& prev_node_name,
                         const PortName& prev_port_name) {
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_lock(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kUninitialized) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->state = Port::kReceiving;
    UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
                          peer_port_name);

    port->prev_node_name = prev_node_name;
    port->prev_port_name = prev_port_name;
  }

  delegate_->PortStatusChanged(port_ref);

  return OK;
}

int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv;

  rv = CreateUninitializedPort(port0_ref);
  if (rv != OK) {
    return rv;
  }

  rv = CreateUninitializedPort(port1_ref);
  if (rv != OK) {
    return rv;
  }

  rv = InitializePort(*port0_ref, name_, port1_ref->name(), name_,
                      port1_ref->name());
  if (rv != OK) {
    return rv;
  }

  rv = InitializePort(*port1_ref, name_, port0_ref->name(), name_,
                      port0_ref->name());
  if (rv != OK) {
    return rv;
  }

  return OK;
}

int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state == Port::kClosed) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  port->user_data = std::move(user_data);

  return OK;
}

int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state == Port::kClosed) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  *user_data = port->user_data;

  return OK;
}

int Node::ClosePort(const PortRef& port_ref) {
  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num = 0;
  uint64_t last_sequence_num = 0;
  bool was_initialized = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    switch (port->state) {
      case Port::kUninitialized:
        break;

      case Port::kReceiving:
        was_initialized = true;
        port->state = Port::kClosed;

        // We pass along the sequence number of the last message sent from this
        // port to allow the peer to have the opportunity to consume all inbound
        // messages before notifying the embedder that this port is closed.
        last_sequence_num = port->next_sequence_num_to_send - 1;

        peer_node_name = port->peer_node_name;
        peer_port_name = port->peer_port_name;

        sequence_num = port->next_control_sequence_num_to_send++;

        // If the port being closed still has unread messages, then we need to
        // take care to close those ports so as to avoid leaking memory.
        port->message_queue.TakeAllMessages(&undelivered_messages);
        port->TakePendingMessages(undelivered_messages);
        break;

      default:
        return ERROR_PORT_STATE_UNEXPECTED;
    }
  }

  ErasePort(port_ref.name());

  if (was_initialized) {
    DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
             << name_ << " to " << peer_port_name << "@" << peer_node_name;
    delegate_->ForwardEvent(
        peer_node_name,
        mozilla::MakeUnique<ObserveClosureEvent>(
            peer_port_name, port_ref.name(), sequence_num, last_sequence_num));
    for (const auto& message : undelivered_messages) {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        PortRef ref;
        if (GetPort(message->ports()[i], &ref) == OK) {
          ClosePort(ref);
        }
      }
    }
  }
  return OK;
}

int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state != Port::kReceiving) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  // TODO(sroettger): include messages pending sender verification here?
  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  port_status->peer_remote = port->peer_node_name != name_;
  port_status->queued_message_count =
      port->message_queue.queued_message_count();
  port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
  port_status->unacknowledged_message_count =
      port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
      1;

#ifdef FUZZING_SNAPSHOT
  port_status->peer_node_name = port->peer_node_name;
#endif

  return OK;
}

int Node::GetMessage(const PortRef& port_ref,
                     mozilla::UniquePtr<UserMessageEvent>* message,
                     MessageFilter* filter) {
  *message = nullptr;

  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;

  NodeName peer_node_name;
  ScopedEvent ack_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port)) {
      return ERROR_PORT_PEER_CLOSED;
    }

    port->message_queue.GetNextMessage(message, filter);
    if (*message &&
        (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
      peer_node_name = port->peer_node_name;
      ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
          port->peer_port_name, port_ref.name(),
          port->next_control_sequence_num_to_send++,
          port->sequence_num_to_acknowledge);
    }
    if (*message) {
      // Message will be passed to the user, no need to block the queue.
      port->message_queue.MessageProcessed();
    }
  }

  if (ack_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      PortRef new_port_ref;
      int rv = GetPort((*message)->ports()[i], &new_port_ref);

      DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
                        << " does not exist!";

      SinglePortLocker locker(&new_port_ref);
      DCHECK_EQ(locker.port()->state, Port::kReceiving);
      locker.port()->message_queue.set_signalable(true);
    }

    // The user may retransmit this message from another port. We reset the
    // sequence number so that the message will get a new one if that happens.
    (*message)->set_sequence_num(0);
  }

  return OK;
}

int Node::SendUserMessage(const PortRef& port_ref,
                          mozilla::UniquePtr<UserMessageEvent> message) {
  int rv = SendUserMessageInternal(port_ref, &message);
  if (rv != OK) {
    // If send failed, close all carried ports. Note that we're careful not to
    // close the sending port itself if it happened to be one of the encoded
    // ports (an invalid but possible condition.)
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (message->ports()[i] == port_ref.name()) {
        continue;
      }

      PortRef port;
      if (GetPort(message->ports()[i], &port) == OK) {
        ClosePort(port);
      }
    }
  }
  return rv;
}

int Node::SetAcknowledgeRequestInterval(
    const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num_to_request_ack = 0;
  uint64_t sequence_num = 0;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
    if (!sequence_num_acknowledge_interval) {
      return OK;
    }

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
                                  sequence_num_acknowledge_interval;
    sequence_num = port->next_control_sequence_num_to_send++;
  }

  delegate_->ForwardEvent(peer_node_name,
                          mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
                              peer_port_name, port_ref.name(), sequence_num,
                              sequence_num_to_request_ack));
  return OK;
}

bool Node::IsEventFromPreviousPeer(const Event& event) {
  switch (event.type()) {
    case Event::Type::kUserMessage:
      return true;
    case Event::Type::kPortAccepted:
      // PortAccepted is sent by the next peer
      return false;
    case Event::Type::kObserveProxy:
      // ObserveProxy with an invalid port name is a broadcast event
      return event.port_name() != kInvalidPortName;
    case Event::Type::kObserveProxyAck:
      return true;
    case Event::Type::kObserveClosure:
      return true;
    case Event::Type::kMergePort:
      // MergePort is not from the previous peer
      return false;
    case Event::Type::kUserMessageReadAckRequest:
      return true;
    case Event::Type::kUserMessageReadAck:
      return true;
    case Event::Type::kUpdatePreviousPeer:
      return true;
    default:
      // No need to check unknown message types since AcceptPeer will return
      // an error.
      return false;
  }
}

int Node::AcceptEventInternal(const PortRef& port_ref,
                              const NodeName& from_node, ScopedEvent event) {
  switch (event->type()) {
    case Event::Type::kUserMessage:
      return OnUserMessage(port_ref, from_node,
                           Event::Cast<UserMessageEvent>(&event));
    case Event::Type::kPortAccepted:
      return OnPortAccepted(port_ref, Event::Cast<PortAcceptedEvent>(&event));
    case Event::Type::kObserveProxy:
      return OnObserveProxy(port_ref, Event::Cast<ObserveProxyEvent>(&event));
    case Event::Type::kObserveProxyAck:
      return OnObserveProxyAck(port_ref,
                               Event::Cast<ObserveProxyAckEvent>(&event));
    case Event::Type::kObserveClosure:
      return OnObserveClosure(port_ref,
                              Event::Cast<ObserveClosureEvent>(&event));
    case Event::Type::kMergePort:
      return OnMergePort(port_ref, Event::Cast<MergePortEvent>(&event));
    case Event::Type::kUserMessageReadAckRequest:
      return OnUserMessageReadAckRequest(
          port_ref, Event::Cast<UserMessageReadAckRequestEvent>(&event));
    case Event::Type::kUserMessageReadAck:
      return OnUserMessageReadAck(port_ref,
                                  Event::Cast<UserMessageReadAckEvent>(&event));
    case Event::Type::kUpdatePreviousPeer:
      return OnUpdatePreviousPeer(port_ref,
                                  Event::Cast<UpdatePreviousPeerEvent>(&event));
  }
  return OOPS(ERROR_NOT_IMPLEMENTED);
}

int Node::AcceptEvent(const NodeName& from_node, ScopedEvent event) {
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  DVLOG(2) << "AcceptEvent type: " << event->type() << ", "
           << event->from_port() << "@" << from_node << " => "
           << port_ref.name() << "@" << name_
           << " seq nr: " << event->control_sequence_num() << " port valid? "
           << port_ref.is_valid();

  if (!IsEventFromPreviousPeer(*event)) {
    DCHECK_EQ(event->control_sequence_num(), kInvalidSequenceNum);
    // Some events are not coming from the previous peer, e.g. broadcasts or
    // PortAccepted events. No need to check the sequence number or sender.
    return AcceptEventInternal(port_ref, from_node, std::move(event));
  }

  DCHECK_NE(event->control_sequence_num(), kInvalidSequenceNum);

  if (!port_ref.is_valid()) {
    // If we don't have a valid port, there's nothing for us to check. However,
    // we pass the ref on to AcceptEventInternal to make sure there's no race
    // where it becomes valid and we skipped the peer check.
    return AcceptEventInternal(port_ref, from_node, std::move(event));
  }

#ifndef FUZZING_SNAPSHOT
  // Before processing the event, verify the sender and sequence number.
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (!port->IsNextEvent(from_node, *event)) {
      DVLOG(2) << "Buffering event (type " << event->type()
               << "): " << event->from_port() << "@" << from_node << " => "
               << port_ref.name() << "@" << name_
               << " seq nr: " << event->control_sequence_num() << " / "
               << port->next_control_sequence_num_to_receive << ", want "
               << port->prev_port_name << "@" << port->prev_node_name;

      port->BufferEvent(from_node, std::move(event));
      return OK;
    }
  }
#endif

  int ret = AcceptEventInternal(port_ref, from_node, std::move(event));

  // More events might have been enqueued during processing.
  while (true) {
    ScopedEvent next_event;
    NodeName next_from_node;
    {
      SinglePortLocker locker(&port_ref);
      auto* port = locker.port();
      // We always increment the control sequence number after we finished
      // processing the event. That way we ensure that the events are handled
      // in order without keeping a lock the whole time.
      port->next_control_sequence_num_to_receive++;
      port->NextEvent(&next_from_node, &next_event);

      if (next_event) {
        DVLOG(2) << "Handling buffered event (type " << next_event->type()
                 << "): " << next_event->from_port() << "@" << next_from_node
                 << " => " << port_ref.name() << "@" << name_
                 << " seq nr: " << next_event->control_sequence_num() << " / "
                 << port->next_control_sequence_num_to_receive;
      }
    }
    if (!next_event) {
      break;
    }
    AcceptEventInternal(port_ref, next_from_node, std::move(next_event));
  }

  return ret;
}

int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  PortName new_port_name;
  Event::PortDescriptor new_port_descriptor;
  PendingUpdatePreviousPeer pending_update_event{.from_port = port_ref.name()};
  {
    // Must be held for ConvertToProxy.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    new_port_name = port_ref.name();
    ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
                   &new_port_descriptor, &pending_update_event);
  }

  delegate_->ForwardEvent(
      pending_update_event.receiver,
      mozilla::MakeUnique<UpdatePreviousPeerEvent>(
          pending_update_event.port, pending_update_event.from_port,
          pending_update_event.sequence_num, pending_update_event.new_prev_node,
          pending_update_event.new_prev_port));

  if (new_port_descriptor.peer_node_name == name_ &&
      destination_node_name != name_) {
    // Ensure that the locally retained peer of the new proxy gets a status
    // update so it notices that its peer is now remote.
    PortRef local_peer;
    if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
      delegate_->PortStatusChanged(local_peer);
    }
  }

  delegate_->ForwardEvent(
      destination_node_name,
      mozilla::MakeUnique<MergePortEvent>(destination_port_name,
                                          kInvalidPortName, kInvalidSequenceNum,
                                          new_port_name, new_port_descriptor));
  return OK;
}

int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
           << " and " << port1_ref.name() << "@" << name_;
  return MergePortsInternal(port0_ref, port1_ref,
                            true /* allow_close_on_bad_state */);
}

int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
           << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}

int Node::OnUserMessage(const PortRef& port_ref, const NodeName& from_node,
                        mozilla::UniquePtr<UserMessageEvent> message) {
#ifdef DEBUG
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0) {
      ports_buf << ",";
    }
    ports_buf << message->ports()[i];
  }

  DVLOG(4) << "OnUserMessage " << message->sequence_num()
           << " [ports=" << ports_buf.str() << "] at " << message->port_name()
           << "@" << name_;
#endif

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.
  if (from_node != name_) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      Event::PortDescriptor& descriptor = message->port_descriptors()[i];
      int rv = AcceptPort(message->ports()[i], descriptor);
      if (rv != OK) {
        return rv;
      }
    }
  }

  bool has_next_message = false;
  bool message_accepted = false;
  bool should_forward_messages = false;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port)) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;
        should_forward_messages = true;
      }
    }
  }

  if (should_forward_messages) {
    int rv = ForwardUserMessagesFromProxy(port_ref);
    if (rv != OK) {
      return rv;
    }
    TryRemoveProxy(port_ref);
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef attached_port_ref;
      if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
        ClosePort(attached_port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

int Node::OnPortAccepted(const PortRef& port_ref,
                         mozilla::UniquePtr<PortAcceptedEvent> event) {
  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

#ifdef DEBUG
  {
    SinglePortLocker locker(&port_ref);
    DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
             << " pointing to " << locker.port()->peer_port_name << "@"
             << locker.port()->peer_node_name;
  }
#endif

  return BeginProxying(port_ref);
}

int Node::OnObserveProxy(const PortRef& port_ref,
                         mozilla::UniquePtr<ObserveProxyEvent> event) {
  if (event->port_name() == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
    DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
    DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  if (!port_ref.is_valid()) {
    DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
             << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
           << ", proxy at " << event->proxy_port_name() << "@"
           << event->proxy_node_name() << " pointing to "
           << event->proxy_target_port_name() << "@"
           << event->proxy_target_node_name();

  bool peer_changed = false;
  ScopedEvent event_to_forward;
  NodeName event_target_node;
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    if (port->peer_node_name == event->proxy_node_name() &&
        port->peer_port_name == event->proxy_port_name()) {
      if (port->state == Port::kReceiving) {
        // Updating the port peer will reset the sequence num. Grab it now;
        uint64_t sequence_num = port->next_control_sequence_num_to_send++;
        UpdatePortPeerAddress(port_ref.name(), port,
                              event->proxy_target_node_name(),
                              event->proxy_target_port_name());
        event_target_node = event->proxy_node_name();
        event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
            event->proxy_port_name(), port_ref.name(), sequence_num,
            port->next_sequence_num_to_send - 1);
        peer_changed = true;
        DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
                 << "@" << name_ << " to " << event->proxy_port_name() << "@"
                 << event_target_node;
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // Afterall, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event.  Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
                 << "@" << event->proxy_node_name();

        port->send_on_proxy_removal =
            mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
                event->proxy_node_name(),
                mozilla::MakeUnique<ObserveProxyAckEvent>(
                    event->proxy_port_name(), port_ref.name(),
                    kInvalidSequenceNum, kInvalidSequenceNum));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      event_target_node = port->peer_node_name;
      event->set_port_name(port->peer_port_name);
      event->set_from_port(port_ref.name());
      event->set_control_sequence_num(
          port->next_control_sequence_num_to_send++);
      if (port->state == Port::kBuffering) {
        port->control_message_queue.push_back(
            {event_target_node, std::move(event)});
      } else {
        event_to_forward = std::move(event);
      }
    }
  }

  if (event_to_forward) {
    delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
  }

  if (peer_changed) {
    // Re-send ack and/or ack requests, as the previous peer proxy may not have
    // forwarded the previous request before it died.
    MaybeResendAck(port_ref);
    MaybeResendAckRequest(port_ref);

    delegate_->PortStatusChanged(port_ref);

    if (event->proxy_target_node_name() != name_) {
      delegate_->ObserveRemoteNode(event->proxy_target_node_name());
    }
  }

  return OK;
}

int Node::OnObserveProxyAck(const PortRef& port_ref,
                            mozilla::UniquePtr<ObserveProxyAckEvent> event) {
  DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
           << " (last_sequence_num=" << event->last_sequence_num() << ")";

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first.
  }

  bool try_remove_proxy_immediately;
  bool erase_port = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    if (port->state == Port::kProxying) {
      // If the last sequence number is invalid, this is a signal that we need
      // to retransmit the ObserveProxy event for this port rather than flagging
      // the the proxy for removal ASAP.
      try_remove_proxy_immediately =
          event->last_sequence_num() != kInvalidSequenceNum;
      if (try_remove_proxy_immediately) {
        // We can now remove this port once we have received and forwarded the
        // last message addressed to this port.
        port->remove_proxy_on_last_message = true;
        port->last_sequence_num_to_receive = event->last_sequence_num();
      }
    } else if (port->state == Port::kClosed) {
      erase_port = true;
    } else {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }
  }

  if (erase_port) {
    ErasePort(port_ref.name());
    return OK;
  }

  if (try_remove_proxy_immediately) {
    TryRemoveProxy(port_ref);
  } else {
    InitiateProxyRemoval(port_ref);
  }

  return OK;
}

int Node::OnObserveClosure(const PortRef& port_ref,
                           mozilla::UniquePtr<ObserveClosureEvent> event) {
  // OK if the port doesn't exist, as it may have been closed already.
  if (!port_ref.is_valid()) {
    return OK;
  }

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  NodeName peer_node_name;
  bool try_remove_proxy = false;
  bool erase_port = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    port->peer_closed = true;
    port->last_sequence_num_to_receive = event->last_sequence_num();

    DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << event->last_sequence_num() << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      event->set_last_sequence_num(port->next_sequence_num_to_send - 1);

      // Treat the closure as an acknowledge that all sent messages have been
      // read from the other end.
      port->last_sequence_num_acknowledged =
          port->next_sequence_num_to_send - 1;
    } else if (port->state == Port::kClosed) {
      // This is the ack for a closed proxy port notification. Now it's fine to
      // delete the port.
      erase_port = true;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so we'll
      // forward the message along as-is.
      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying) {
        try_remove_proxy = true;
      }
    }

    DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
             << name_ << " to peer " << port->peer_port_name << "@"
             << port->peer_node_name
             << " (last_sequence_num=" << event->last_sequence_num() << ")";

    event->set_port_name(port->peer_port_name);
    event->set_from_port(port_ref.name());
    event->set_control_sequence_num(port->next_control_sequence_num_to_send++);
    peer_node_name = port->peer_node_name;

    if (port->state == Port::kBuffering) {
      port->control_message_queue.push_back({peer_node_name, std::move(event)});
    }
  }

  if (try_remove_proxy) {
    TryRemoveProxy(port_ref);
  }

  if (erase_port) {
    ErasePort(port_ref.name());
  }

  if (event) {
    delegate_->ForwardEvent(peer_node_name, std::move(event));
  }

  if (notify_delegate) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

int Node::OnMergePort(const PortRef& port_ref,
                      mozilla::UniquePtr<MergePortEvent> event) {
  DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
           << " merging with proxy " << event->new_port_name() << "@" << name_
           << " pointing to " << event->new_port_descriptor().peer_port_name
           << "@" << event->new_port_descriptor().peer_node_name
           << " referred by "
           << event->new_port_descriptor().referring_port_name << "@"
           << event->new_port_descriptor().referring_node_name;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours. Note that we always attempt to accept the new port
  // first as otherwise its peer receiving port could be left stranded
  // indefinitely.
  if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
    if (port_ref.is_valid()) {
      ClosePort(port_ref);
    }
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  PortRef new_port_ref;
  GetPort(event->new_port_name(), &new_port_ref);
  if (!port_ref.is_valid() && new_port_ref.is_valid()) {
    ClosePort(new_port_ref);
    return ERROR_PORT_UNKNOWN;
  }
  if (port_ref.is_valid() && !new_port_ref.is_valid()) {
    ClosePort(port_ref);
    return ERROR_PORT_UNKNOWN;
  }

  bool peer_allowed = true;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (!port->pending_merge_peer) {
      CHROMIUM_LOG(ERROR) << "MergePort called on unexpected port: "
                          << event->port_name();
      peer_allowed = false;
    } else {
      port->pending_merge_peer = false;
    }
  }
  if (!peer_allowed) {
    ClosePort(port_ref);
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  return MergePortsInternal(port_ref, new_port_ref,
                            false /* allow_close_on_bad_state */);
}

int Node::OnUserMessageReadAckRequest(
    const PortRef& port_ref,
    mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
  DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_to_acknowledge();

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

  NodeName peer_node_name;
  mozilla::UniquePtr<Event> event_to_send;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    peer_node_name = port->peer_node_name;
    if (port->state == Port::kProxying) {
      // Proxies simply forward the ack request to their peer.
      event->set_port_name(port->peer_port_name);
      event->set_from_port(port_ref.name());
      event->set_control_sequence_num(
          port->next_control_sequence_num_to_send++);
      event_to_send = std::move(event);
    } else {
      uint64_t current_sequence_num =
          port->message_queue.next_sequence_num() - 1;
      // Either this is requesting an ack for a sequence number already read, or
      // else for a sequence number that is yet to be read.
      if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
        // If the current sequence number to read already exceeds the ack
        // request, send an ack immediately.
        event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
            port->peer_port_name, port_ref.name(),
            port->next_control_sequence_num_to_send++, current_sequence_num);

        if (port->state == Port::kBuffering) {
          port->control_message_queue.push_back(
              {peer_node_name, std::move(event_to_send)});
        }

        // This might be a late or duplicate acknowledge request, that's
        // requesting acknowledge for an already read message. There may already
        // have been a request for future reads, so take care not to back up
        // the requested acknowledge counter.
        if (current_sequence_num > port->sequence_num_to_acknowledge) {
          port->sequence_num_to_acknowledge = current_sequence_num;
        }
      } else {
        // This is request to ack a sequence number that hasn't been read yet.
        // The state of the port can either be that it already has a
        // future-requested ack, or not. Because ack requests aren't guaranteed
        // to arrive in order, store the earlier of the current  queued request
        // and the new one, if one was already requested.
        bool has_queued_ack_request =
            port->sequence_num_to_acknowledge > current_sequence_num;
        if (!has_queued_ack_request ||
            port->sequence_num_to_acknowledge >
                event->sequence_num_to_acknowledge()) {
          port->sequence_num_to_acknowledge =
              event->sequence_num_to_acknowledge();
        }
        return OK;
      }
    }
  }

  if (event_to_send) {
    delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
  }

  return OK;
}

int Node::OnUserMessageReadAck(
    const PortRef& port_ref,
    mozilla::UniquePtr<UserMessageReadAckEvent> event) {
  DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_acknowledged();

  NodeName peer_node_name;
  ScopedEvent ack_request_event;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
      // TODO(http://crbug.com/980952): This is a malformed event.
      //      This could return a new error "ERROR_MALFORMED_EVENT" which the
      //      delegate could use as a signal to drop the peer node.
      return OK;
    }

    // Keep the largest acknowledge seen.
    if (event->sequence_num_acknowledged() <=
        port->last_sequence_num_acknowledged) {
      // The acknowledge was late or a duplicate, it's safe to ignore it.
      return OK;
    }

    port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
    // Send another ack request if the interval is non-zero and the peer has
    // not been closed.
    if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
      peer_node_name = port->peer_node_name;
      ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
          port->peer_port_name, port_ref.name(),
          port->next_control_sequence_num_to_send++,
          port->last_sequence_num_acknowledged +
              port->sequence_num_acknowledge_interval);
      DCHECK_NE(port->state, Port::kBuffering);
    }
  }
  if (ack_request_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
  }

  if (port_ref.is_valid()) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

int Node::OnUpdatePreviousPeer(
    const PortRef& port_ref,
    mozilla::UniquePtr<UpdatePreviousPeerEvent> event) {
  DVLOG(1) << "OnUpdatePreviousPeer port: " << event->port_name()
           << " changing to " << event->new_node_name()
           << ", port: " << event->from_port() << " => "
           << event->new_port_name();

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

  const NodeName& new_node_name = event->new_node_name();
  const PortName& new_port_name = event->new_port_name();
  DCHECK_NE(new_node_name, kInvalidNodeName);
  DCHECK_NE(new_port_name, kInvalidPortName);
  if (new_node_name == kInvalidNodeName || new_port_name == kInvalidPortName) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    port->prev_node_name = new_node_name;
    port->prev_port_name = new_port_name;
    // The sequence number will get incremented after this event has been
    // handled.
    port->next_control_sequence_num_to_receive = kInitialSequenceNum - 1;
  }

  return OK;
}

int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  if (port->peer_port_name != kInvalidPortName) {
    DCHECK_NE(kInvalidNodeName, port->peer_node_name);
    peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
        port_name, PortRef(port_name, port));
  }
  if (!ports_.emplace(port_name, std::move(port)).second) {
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
  }
  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}

void Node::ErasePort(const PortName& port_name) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  RefPtr<Port> port;
  {
    mozilla::MutexAutoLock lock(ports_lock_);
    auto it = ports_.find(port_name);
    if (it == ports_.end()) {
      return;
    }
    port = std::move(it->second);
    ports_.erase(it);

    RemoveFromPeerPortMap(port_name, port.get());
  }
  // NOTE: We are careful not to release the port's messages while holding any
  // locks, since they may run arbitrary user code upon destruction.
  std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
  {
    PortRef port_ref(port_name, std::move(port));
    SinglePortLocker locker(&port_ref);
    locker.port()->message_queue.TakeAllMessages(&messages);
  }
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}

int Node::SendUserMessageInternal(
    const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
  mozilla::UniquePtr<UserMessageEvent>& m = *message;

  m->set_from_port(port_ref.name());

  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name()) {
      return ERROR_PORT_CANNOT_SEND_SELF;
    }
  }

  NodeName target_node;
  int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
                                       false /* ignore_closed_peer */, m.get(),
                                       &target_node);
  if (rv != OK) {
    return rv;
  }

  // Beyond this point there's no sense in returning anything but OK. Even if
  // message forwarding or acceptance fails, there's nothing the embedder can
  // do to recover. Assume that failure beyond this point must be treated as a
  // transport failure.

  DCHECK_NE(kInvalidNodeName, target_node);
  if (target_node != name_) {
    delegate_->ForwardEvent(target_node, std::move(m));
    return OK;
  }

  int accept_result = AcceptEvent(name_, std::move(m));
  if (accept_result != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptEvent failed: " << accept_result;
  }

  return OK;
}

int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
                             bool allow_close_on_bad_state) {
  const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
  PendingUpdatePreviousPeer pending_update_events[2];
  uint64_t original_sequence_number[2];
  {
    // Needed to swap peer map entries below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::ReleasableMutexAutoLock ports_locker(ports_lock_);

    mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
    auto* port0 = locker->GetPort(port0_ref);
    auto* port1 = locker->GetPort(port1_ref);

    // There are several conditions which must be met before we'll consider
    // merging two ports:
    //
    // - They must both be in the kReceiving state
    // - They must not be each other's peer
    // - They must have never sent a user message
    //
    // If any of these criteria are not met, we fail early.
    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
        (port0->peer_node_name == name_ &&
         port0->peer_port_name == port1_ref.name()) ||
        (port1->peer_node_name == name_ &&
         port1->peer_port_name == port0_ref.name()) ||
        port0->next_sequence_num_to_send != kInitialSequenceNum ||
        port1->next_sequence_num_to_send != kInitialSequenceNum) {
      // On failure, we only close a port if it was at least properly in the
      // |kReceiving| state. This avoids getting the system in an inconsistent
      // state by e.g. closing a proxy abruptly.
      //
      // Note that we must release the port locks before closing ports.
      const bool close_port0 =
          port0->state == Port::kReceiving || allow_close_on_bad_state;
      const bool close_port1 =
          port1->state == Port::kReceiving || allow_close_on_bad_state;
      locker.reset();
      ports_locker.Unlock();
      if (close_port0) {
        ClosePort(port0_ref);
      }
      if (close_port1) {
        ClosePort(port1_ref);
      }
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    pending_update_events[0] = {
        .receiver = port0->peer_node_name,
        .port = port0->peer_port_name,
        .from_port = port0_ref.name(),
        .sequence_num = port0->next_control_sequence_num_to_send++,
        .new_prev_node = name_,
        .new_prev_port = port1_ref.name()};
    pending_update_events[1] = {
        .receiver = port1->peer_node_name,
        .port = port1->peer_port_name,
        .from_port = port1_ref.name(),
        .sequence_num = port1->next_control_sequence_num_to_send++,
        .new_prev_node = name_,
        .new_prev_port = port0_ref.name()};

    // Swap the ports' peer information and switch them both to proxying mode.
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->state = Port::kProxying;
    port1->state = Port::kProxying;
    original_sequence_number[0] = port0->next_control_sequence_num_to_send;
    original_sequence_number[1] = port1->next_control_sequence_num_to_send;
    port0->next_control_sequence_num_to_send = kInitialSequenceNum;
    port1->next_control_sequence_num_to_send = kInitialSequenceNum;
    if (port0->peer_closed) {
      port0->remove_proxy_on_last_message = true;
    }
    if (port1->peer_closed) {
      port1->remove_proxy_on_last_message = true;
    }
  }

  // Flush any queued messages from the new proxies and, if successful, complete
  // the merge by initiating proxy removals.
  if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
      ForwardUserMessagesFromProxy(port1_ref) == OK) {
    // Send the prev peer updates out after the forwarding the user messages
    // succeeded. Otherwise, we won't be able to restore the previous state
    // below.
    for (const auto& pending_update_event : pending_update_events) {
      delegate_->ForwardEvent(
          pending_update_event.receiver,
          mozilla::MakeUnique<UpdatePreviousPeerEvent>(
              pending_update_event.port, pending_update_event.from_port,
              pending_update_event.sequence_num,
              pending_update_event.new_prev_node,
              pending_update_event.new_prev_port));
    }

    for (const autoconst port_ref : port_refs) {
      bool try_remove_proxy_immediately = false;
      ScopedEvent closure_event;
      NodeName closure_event_target_node;
      {
        SinglePortLocker locker(port_ref);
        auto* port = locker.port();
        DCHECK_EQ(port->state, Port::kProxying);
        try_remove_proxy_immediately = port->remove_proxy_on_last_message;
        if (try_remove_proxy_immediately || port->peer_closed) {
          // If either end of the port cycle is closed, we propagate an
          // ObserveClosure event.
          closure_event_target_node = port->peer_node_name;
          closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
              port->peer_port_name, port_ref->name(),
              port->next_control_sequence_num_to_send++,
              port->last_sequence_num_to_receive);
        }
      }
      if (try_remove_proxy_immediately) {
        TryRemoveProxy(*port_ref);
      } else {
        InitiateProxyRemoval(*port_ref);
      }

      if (closure_event) {
        delegate_->ForwardEvent(closure_event_target_node,
                                std::move(closure_event));
      }
    }

    return OK;
  }

  // If we failed to forward proxied messages, we keep the system in a
  // consistent state by undoing the peer swap and closing the ports.
  {
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);
    PortLocker locker(port_refs, 2);
    auto* port0 = locker.GetPort(port0_ref);
    auto* port1 = locker.GetPort(port1_ref);
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    DCHECK_EQ(Port::kProxying, port0->state);
    DCHECK_EQ(Port::kProxying, port1->state);
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
    port0->next_control_sequence_num_to_send = original_sequence_number[0];
    port1->next_control_sequence_num_to_send = original_sequence_number[1];
  }

  ClosePort(port0_ref);
  ClosePort(port1_ref);
  return ERROR_PORT_STATE_UNEXPECTED;
}

void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
                          PortName* port_name,
                          Event::PortDescriptor* port_descriptor,
                          PendingUpdatePreviousPeer* pending_update) {
  port->AssertLockAcquired();
  PortName local_port_name = *port_name;

  PortName new_port_name;
  GenerateRandomPortName(&new_port_name);

  pending_update->receiver = port->peer_node_name;
  pending_update->port = port->peer_port_name;
  pending_update->sequence_num = port->next_control_sequence_num_to_send++;
  pending_update->new_prev_node = to_node_name;
  pending_update->new_prev_port = new_port_name;

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK_EQ(port->state, Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed) {
    port->remove_proxy_on_last_message = true;
  }

  *port_name = new_port_name;

  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
}

int Node::AcceptPort(const PortName& port_name,
                     const Event::PortDescriptor& port_descriptor) {
  RefPtr<Port> port =
      mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
                                port_descriptor.next_sequence_num_to_receive);
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->next_control_sequence_num_to_send = kInitialSequenceNum;
  port->next_control_sequence_num_to_receive = kInitialSequenceNum;
  port->prev_node_name = port_descriptor.referring_node_name;
  port->prev_port_name = port_descriptor.referring_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name
           << " [peer_closed=" << port->peer_closed
           << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessage).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, std::move(port));
  if (rv != OK) {
    return rv;
  }

  // Allow referring port to forward messages.
  delegate_->ForwardEvent(port_descriptor.referring_node_name,
                          mozilla::MakeUnique<PortAcceptedEvent>(
                              port_descriptor.referring_port_name,
                              kInvalidPortName, kInvalidSequenceNum));

  if (port_descriptor.peer_node_name != name_) {
    delegate_->ObserveRemoteNode(port_descriptor.peer_node_name);
  }

  return OK;
}

int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
                                      Port::State expected_port_state,
                                      bool ignore_closed_peer,
                                      UserMessageEvent* message,
                                      NodeName* forward_to_node) {
  bool target_is_remote = false;
  std::vector<PendingUpdatePreviousPeer> peer_update_events;

  for (;;) {
    NodeName target_node_name;
    {
      SinglePortLocker locker(&forwarding_port_ref);
      target_node_name = locker.port()->peer_node_name;
    }

    // NOTE: This may call out to arbitrary user code, so it's important to call
    // it only while no port locks are held on the calling thread.
    if (target_node_name != name_) {
      if (!message->NotifyWillBeRoutedExternally()) {
        CHROMIUM_LOG(ERROR)
            << "NotifyWillBeRoutedExternally failed unexpectedly.";
        return ERROR_PORT_STATE_UNEXPECTED;
      }
    }

    // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    // Simultaneously lock the forwarding port as well as all attached ports.
    AutoTArray<PortRef, 4> attached_port_refs;
    AutoTArray<const PortRef*, 5> ports_to_lock;
    attached_port_refs.SetCapacity(message->num_ports());
    ports_to_lock.SetCapacity(message->num_ports() + 1);
    ports_to_lock.AppendElement(&forwarding_port_ref);
    for (size_t i = 0; i < message->num_ports(); ++i) {
      const PortName& attached_port_name = message->ports()[i];
      auto iter = ports_.find(attached_port_name);
      DCHECK(iter != ports_.end());
      attached_port_refs.AppendElement(
          PortRef(attached_port_name, iter->second));
      ports_to_lock.AppendElement(&attached_port_refs[i]);
    }
    PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
    auto* forwarding_port = locker.GetPort(forwarding_port_ref);

    if (forwarding_port->peer_node_name != target_node_name) {
      // The target node has already changed since we last held the lock.
      if (target_node_name == name_) {
        // If the target node was previously this local node, we need to restart
        // the loop, since that means we may now route the message externally.
        continue;
      }

      target_node_name = forwarding_port->peer_node_name;
    }
    target_is_remote = target_node_name != name_;

    if (forwarding_port->state != expected_port_state) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }
    if (forwarding_port->peer_closed && !ignore_closed_peer) {
      return ERROR_PORT_PEER_CLOSED;
    }

    // Messages may already have a sequence number if they're being forwarded by
    // a proxy. Otherwise, use the next outgoing sequence number.
    if (message->sequence_num() == 0) {
      message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
    }
#ifdef DEBUG
    std::ostringstream ports_buf;
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (i > 0) {
        ports_buf << ",";
      }
      ports_buf << message->ports()[i];
    }
#endif

    if (message->num_ports() > 0) {
      // Sanity check to make sure we can actually send all the attached ports.
      // They must all be in the |kReceiving| state and must not be the sender's
      // own peer.
      DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
      for (size_t i = 0; i < message->num_ports(); ++i) {
        auto* attached_port = locker.GetPort(attached_port_refs[i]);
        int error = OK;
        if (attached_port->state != Port::kReceiving) {
          error = ERROR_PORT_STATE_UNEXPECTED;
        } else if (attached_port_refs[i].name() ==
                   forwarding_port->peer_port_name) {
          error = ERROR_PORT_CANNOT_SEND_PEER;
        }

        if (error != OK) {
          // Not going to send. Backpedal on the sequence number.
          forwarding_port->next_sequence_num_to_send--;
          return error;
        }
      }

      if (target_is_remote) {
        // We only bother to proxy and rewrite ports in the event if it's
        // going to be routed to an external node. This substantially reduces
        // the amount of port churn in the system, as many port-carrying
        // events are routed at least 1 or 2 intra-node hops before (if ever)
        // being routed externally.
        Event::PortDescriptor* port_descriptors = message->port_descriptors();
        for (size_t i = 0; i < message->num_ports(); ++i) {
          auto* port = locker.GetPort(attached_port_refs[i]);
          PendingUpdatePreviousPeer update_event = {
              .from_port = attached_port_refs[i].name()};
          ConvertToProxy(port, target_node_name, message->ports() + i,
                         port_descriptors + i, &update_event);
          peer_update_events.push_back(update_event);
        }
      }
    }

#ifdef DEBUG
    DVLOG(4) << "Sending message " << message->sequence_num()
             << " [ports=" << ports_buf.str() << "]"
             << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
             << forwarding_port->peer_port_name << "@" << target_node_name;
#endif

    *forward_to_node = target_node_name;
    message->set_port_name(forwarding_port->peer_port_name);
    message->set_from_port(forwarding_port_ref.name());
    message->set_control_sequence_num(
        forwarding_port->next_control_sequence_num_to_send++);
    break;
  }

  for (auto& pending_update_event : peer_update_events) {
    delegate_->ForwardEvent(
        pending_update_event.receiver,
        mozilla::MakeUnique<UpdatePreviousPeerEvent>(
            pending_update_event.port, pending_update_event.from_port,
            pending_update_event.sequence_num,
            pending_update_event.new_prev_node,
            pending_update_event.new_prev_port));
  }

  if (target_is_remote) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      // For any ports that were converted to proxies above, make sure their
      // prior local peer (if applicable) receives a status update so it can be
      // made aware of its peer's location.
      const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
      if (descriptor.peer_node_name == name_) {
        PortRef local_peer;
        if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
          delegate_->PortStatusChanged(local_peer);
        }
      }
    }
  }

  return OK;
}

int Node::BeginProxying(const PortRef& port_ref) {
  std::vector<std::pair<NodeName, ScopedEvent>> control_message_queue;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kBuffering) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }
    port->state = Port::kProxying;
    std::swap(port->control_message_queue, control_message_queue);
  }

  for (auto& [control_message_node_name, control_message_event] :
       control_message_queue) {
    delegate_->ForwardEvent(control_message_node_name,
                            std::move(control_message_event));
  }
  control_message_queue.clear();

  int rv = ForwardUserMessagesFromProxy(port_ref);
  if (rv != OK) {
    return rv;
  }

  // Forward any pending acknowledge request.
  MaybeForwardAckRequest(port_ref);

  bool try_remove_proxy_immediately;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kProxying) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }

    try_remove_proxy_immediately = port->remove_proxy_on_last_message;
  }

  if (try_remove_proxy_immediately) {
    TryRemoveProxy(port_ref);
  } else {
    InitiateProxyRemoval(port_ref);
  }

  return OK;
}

int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
  for (;;) {
    // NOTE: We forward messages in sequential order here so that we maintain
    // the message queue's notion of next sequence number. That's useful for the
    // proxy removal process as we can tell when this port has seen all of the
    // messages it is expected to see.
    mozilla::UniquePtr<UserMessageEvent> message;
    {
      SinglePortLocker locker(&port_ref);
      locker.port()->message_queue.GetNextMessage(&message, nullptr);
      if (!message) {
        break;
      }
    }

    NodeName target_node;
    int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
                                         true /* ignore_closed_peer */,
                                         message.get(), &target_node);
    {
      // Mark the message as processed after we ran PrepareToForwardUserMessage.
      // This is important to prevent another thread from deleting the port
      // before we grabbed a sequence number for the message.
      SinglePortLocker locker(&port_ref);
      locker.port()->message_queue.MessageProcessed();
    }
    if (rv != OK) {
      return rv;
    }

    delegate_->ForwardEvent(target_node, std::move(message));
  }
  return OK;
}

void Node::InitiateProxyRemoval(const PortRef& port_ref) {
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state == Port::kClosed) {
      return;
    }
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
    sequence_num = port->next_control_sequence_num_to_send++;
    DCHECK_EQ(port->state, Port::kProxying);
  }

  // To remove this node, we start by notifying the connected graph that we are
  // a proxy. This allows whatever port is referencing this node to skip it.
  // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
  // the peer was closed in the meantime).
  delegate_->ForwardEvent(
      peer_node_name, mozilla::MakeUnique<ObserveProxyEvent>(
                          peer_port_name, port_ref.name(), sequence_num, name_,
                          port_ref.name(), peer_node_name, peer_port_name));
}

void Node::TryRemoveProxy(const PortRef& port_ref) {
  bool should_erase = false;
  NodeName removal_target_node;
  ScopedEvent removal_event;
  PendingUpdatePreviousPeer pending_update_event;

  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state == Port::kClosed) {
      return;
    }
    DCHECK_EQ(port->state, Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message) {
      return;
    }

    if (!CanAcceptMoreMessages(port)) {
      DCHECK_EQ(port->message_queue.queued_message_count(), 0lu);
      should_erase = true;
      if (port->send_on_proxy_removal) {
        removal_target_node = port->send_on_proxy_removal->first;
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=86 H=95 G=90

¤ Dauer der Verarbeitung: 0.19 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.