Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/ipc/chromium/gtest/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 52 kB image not shown  

Quelle  ports_unittest.cc   Sprache: C

 
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <inttypes.h>
#include <stdio.h>

#include <map>
#include <utility>

#include "base/logging.h"
#include "base/waitable_event.h"
#include "base/thread.h"
#include "base/string_util.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"
#include "mojo/core/ports/user_message.h"
#include "testing/gtest/include/gtest/gtest.h"

#include "mozilla/Mutex.h"

namespace mojo {
namespace core {
namespace ports {
namespace test {

namespace {

// TODO(rockot): Remove this unnecessary alias.
using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>;

class TestMessage : public UserMessage {
 public:
  static const TypeInfo kUserMessageTypeInfo;

  explicit TestMessage(const std::string& payload)
      : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
  ~TestMessage() override = default;

  const std::string& payload() const { return payload_; }

 private:
  std::string payload_;
};

const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};

ScopedMessage NewUserMessageEvent(const std::string& payload,
                                  size_t num_ports) {
  auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports);
  event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload));
  return event;
}

bool MessageEquals(const ScopedMessage& message, const std::string& s) {
  return message->GetMessage<TestMessage>()->payload() == s;
}

class TestNode;

class MessageRouter {
 public:
  virtual ~MessageRouter() = default;

  virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
                            ScopedEvent event) = 0;
  virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
};

class TestNode : public NodeDelegate {
 public:
  explicit TestNode(uint64_t id)
      : node_name_(id, 1),
        node_(node_name_, this),
        node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()),
        events_available_event_(/* manual_reset */ false,
                                /* initially_signaled */ false),
        idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {}

  ~TestNode() override {
    StopWhenIdle();
    node_thread_.Stop();
  }

  const NodeName& name() const { return node_name_; }

  // NOTE: Node is thread-safe.
  Node& node() { return node_; }

  base::WaitableEvent& idle_event() { return idle_event_; }

  bool IsIdle() {
    mozilla::MutexAutoLock lock(lock_);
    return started_ && !dispatching_ &&
           (incoming_events_.empty() || (block_on_event_ && blocked_));
  }

  void BlockOnEvent(Event::Type type) {
    mozilla::MutexAutoLock lock(lock_);
    blocked_event_type_ = type;
    block_on_event_ = true;
  }

  void Unblock() {
    mozilla::MutexAutoLock lock(lock_);
    block_on_event_ = false;
    events_available_event_.Signal();
  }

  void Start(MessageRouter* router) {
    router_ = router;
    node_thread_.Start();
    node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod(
        "TestNode::ProcessEvents"this, &TestNode::ProcessEvents));
  }

  void StopWhenIdle() {
    mozilla::MutexAutoLock lock(lock_);
    should_quit_ = true;
    events_available_event_.Signal();
  }

  void WakeUp() { events_available_event_.Signal(); }

  int SendStringMessage(const PortRef& port, const std::string& s) {
    return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
  }

  int SendMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      int result = SendStringMessage(port, "");
      if (result != OK) {
        return result;
      }
    }
    return OK;
  }

  int SendStringMessageWithPort(const PortRef& port, const std::string& s,
                                const PortName& sent_port_name) {
    auto event = NewUserMessageEvent(s, 1);
    event->ports()[0] = sent_port_name;
    return node_.SendUserMessage(port, std::move(event));
  }

  int SendStringMessageWithPort(const PortRef& port, const std::string& s,
                                const PortRef& sent_port) {
    return SendStringMessageWithPort(port, s, sent_port.name());
  }

  void set_drop_messages(bool value) {
    mozilla::MutexAutoLock lock(lock_);
    drop_messages_ = value;
  }

  void set_save_messages(bool value) {
    mozilla::MutexAutoLock lock(lock_);
    save_messages_ = value;
  }

  bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    return node_.GetMessage(port, message, nullptr) == OK && *message;
  }

  bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      ScopedMessage message;
      if (!ReadMessage(port, &message)) {
        return false;
      }
    }
    return true;
  }

  bool GetSavedMessage(ScopedMessage* message) {
    mozilla::MutexAutoLock lock(lock_);
    if (saved_messages_.empty()) {
      message->reset();
      return false;
    }
    std::swap(*message, saved_messages_.front());
    saved_messages_.pop();
    return true;
  }

  void EnqueueEvent(const NodeName& from_node, ScopedEvent event) {
    idle_event_.Reset();

    // NOTE: This may be called from ForwardMessage and thus must not reenter
    // |node_|.
    mozilla::MutexAutoLock lock(lock_);
    incoming_events_.push({from_node, std::move(event)});
    events_available_event_.Signal();
  }

  void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
    {
      mozilla::MutexAutoLock lock(lock_);
      if (drop_messages_) {
        DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
                 << node_name;

        mozilla::MutexAutoUnlock unlock(lock_);
        ClosePortsInEvent(event.get());
        return;
      }
    }

    DCHECK(router_);
    DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
    router_->ForwardEvent(this, node_name, std::move(event));
  }

  void BroadcastEvent(ScopedEvent event) override {
    router_->BroadcastEvent(this, std::move(event));
  }

  void PortStatusChanged(const PortRef& port) override {
    // The port may be closed, in which case we ignore the notification.
    mozilla::MutexAutoLock lock(lock_);
    if (!save_messages_) {
      return;
    }

    for (;;) {
      ScopedMessage message;
      {
        mozilla::MutexAutoUnlock unlock(lock_);
        if (!ReadMessage(port, &message)) {
          break;
        }
      }

      saved_messages_.emplace(std::move(message));
    }
  }

  void ObserveRemoteNode(const NodeName& node) override {
    DCHECK(node != node_name_);
  }

  void ClosePortsInEvent(Event* event) {
    if (event->type() != Event::Type::kUserMessage) {
      return;
    }

    UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
    for (size_t i = 0; i < message_event->num_ports(); ++i) {
      PortRef port;
      ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
      EXPECT_EQ(OK, node_.ClosePort(port));
    }
  }

  uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
    PortStatus status{};
    if (node_.GetStatus(port_ref, &status) != OK) {
      return 0;
    }

    return status.unacknowledged_message_count;
  }

  void AllowPortMerge(const PortRef& port_ref) {
    SinglePortLocker locker(&port_ref);
    locker.port()->pending_merge_peer = true;
  }

 private:
  void ProcessEvents() {
    for (;;) {
      events_available_event_.Wait();
      mozilla::MutexAutoLock lock(lock_);

      if (should_quit_) {
        return;
      }

      dispatching_ = true;
      while (!incoming_events_.empty()) {
        if (block_on_event_ &&
            incoming_events_.front().second->type() == blocked_event_type_) {
          blocked_ = true;
          // Go idle if we hit a blocked event type.
          break;
        }
        blocked_ = false;

        auto node_event_pair = std::move(incoming_events_.front());
        incoming_events_.pop();

        // NOTE: AcceptMessage() can re-enter this object to call any of the
        // NodeDelegate interface methods.
        mozilla::MutexAutoUnlock unlock(lock_);
        node_.AcceptEvent(node_event_pair.first,
                          std::move(node_event_pair.second));
      }

      dispatching_ = false;
      started_ = true;
      idle_event_.Signal();
    };
  }

  const NodeName node_name_;
  Node node_;
  MessageRouter* router_ = nullptr;

  base::Thread node_thread_;
  base::WaitableEvent events_available_event_;
  base::WaitableEvent idle_event_;

  // Guards fields below.
  mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"};
  bool started_ = false;
  bool dispatching_ = false;
  bool should_quit_ = false;
  bool drop_messages_ = false;
  bool save_messages_ = false;
  bool blocked_ = false;
  bool block_on_event_ = false;
  Event::Type blocked_event_type_{};
  std::queue<std::pair<NodeName, ScopedEvent>> incoming_events_;
  std::queue<ScopedMessage> saved_messages_;
};

class PortsTest : public testing::Test, public MessageRouter {
 public:
  void AddNode(TestNode* node) {
    {
      mozilla::MutexAutoLock lock(lock_);
      nodes_[node->name()] = node;
    }
    node->Start(this);
  }

  void RemoveNode(TestNode* node) {
    {
      mozilla::MutexAutoLock lock(lock_);
      nodes_.erase(node->name());
    }

    for (const auto& entry : nodes_) {
      entry.second->node().LostConnectionToNode(node->name());
    }
  }

  // Waits until all known Nodes are idle. Message forwarding and processing
  // is handled in such a way that idleness is a stable state: once all nodes in
  // the system are idle, they will remain idle until the test explicitly
  // initiates some further event (e.g. sending a message, closing a port, or
  // removing a Node).
  void WaitForIdle() {
    for (;;) {
      mozilla::MutexAutoLock global_lock(global_lock_);
      bool all_nodes_idle = true;
      for (const auto& entry : nodes_) {
        if (!entry.second->IsIdle()) {
          all_nodes_idle = false;
        }
        entry.second->WakeUp();
      }
      if (all_nodes_idle) {
        return;
      }

      // Wait for any Node to signal that it's idle.
      mozilla::MutexAutoUnlock global_unlock(global_lock_);
      std::vector<base::WaitableEvent*> events;
      for (const auto& entry : nodes_) {
        events.push_back(&entry.second->idle_event());
      }
      base::WaitableEvent::WaitMany(events.data(), events.size());
    }
  }

  void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
                      PortRef* port1) {
    if (node0 == node1) {
      EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
    } else {
      EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
      EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
      EXPECT_EQ(
          OK, node0->node().InitializePort(*port0, node1->name(), port1->name(),
                                           node1->name(), port1->name()));
      EXPECT_EQ(
          OK, node1->node().InitializePort(*port1, node0->name(), port0->name(),
                                           node0->name(), port0->name()));
    }
  }

 private:
  // MessageRouter:
  void ForwardEvent(TestNode* from_node, const NodeName& node_name,
                    ScopedEvent event) override {
    mozilla::MutexAutoLock global_lock(global_lock_);
    mozilla::MutexAutoLock lock(lock_);
    // Drop messages from nodes that have been removed.
    if (nodes_.find(from_node->name()) == nodes_.end()) {
      from_node->ClosePortsInEvent(event.get());
      return;
    }

    auto it = nodes_.find(node_name);
    if (it == nodes_.end()) {
      DVLOG(1) << "Node not found: " << node_name;
      return;
    }

    // Serialize and de-serialize all forwarded events.
    size_t buf_size = event->GetSerializedSize();
    mozilla::UniquePtr<char[]> buf(new char[buf_size]);
    event->Serialize(buf.get());
    ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
    // This should always succeed unless serialization or deserialization
    // is broken. In that case, the loss of events should cause a test failure.
    ASSERT_TRUE(copy);

    // Also copy the payload for user messages.
    if (event->type() == Event::Type::kUserMessage) {
      UserMessageEvent* message_event =
          static_cast<UserMessageEvent*>(event.get());
      UserMessageEvent* message_copy =
          static_cast<UserMessageEvent*>(copy.get());

      message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>(
          message_event->GetMessage<TestMessage>()->payload()));
    }

    it->second->EnqueueEvent(from_node->name(), std::move(event));
  }

  void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
    mozilla::MutexAutoLock global_lock(global_lock_);
    mozilla::MutexAutoLock lock(lock_);

    // Drop messages from nodes that have been removed.
    if (nodes_.find(from_node->name()) == nodes_.end()) {
      return;
    }

    for (const auto& entry : nodes_) {
      TestNode* node = entry.second;
      // Broadcast doesn't deliver to the local node.
      if (node == from_node) {
        continue;
      }
      node->EnqueueEvent(from_node->name(), event->CloneForBroadcast());
    }
  }

  // Acquired before any operation which makes a Node busy, and before testing
  // if all nodes are idle.
  mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"};

  mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"};
  std::map<NodeName, TestNode*> nodes_;
};

}  // namespace

TEST_F(PortsTest, Basic1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Basic2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Basic3) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));

  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);
  node1.set_drop_messages(true);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // Transfer a port to node1 and simulate a lost connection to node1.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));

  WaitForIdle();

  RemoveNode(&node1);

  WaitForIdle();

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));

  WaitForIdle();

  node1.set_drop_messages(true);

  RemoveNode(&node1);

  WaitForIdle();

  // a0 should have eventually detected peer closure after node loss.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node0.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));

  EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
  EXPECT_TRUE(message);
  node1.ClosePortsInEvent(message.get());

  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.

  // Make sure node2 stops processing events when it encounters an ObserveProxy.
  node2.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status{};
  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) {
  // Tests that a proxy gets cleaned up after a node disconnect if the
  // previous port already received the ObserveProxy event.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1 after
  // node 0 received the ObserveProxy event. Node 1 needs to clean up the
  // closed proxy while the node 0 to node 2 connection is still intact.
  node0.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node0.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and it's predecessor lives on the same node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Send D but block node0 on an ObserveProxy event.
  node0.BlockOnEvent(Event::Type::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus status{};
  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage1) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a1));

  WaitForIdle();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a0));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage2) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));

  ASSERT_TRUE(message);
  EXPECT_TRUE(MessageEquals(message, "1"));

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage3) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  const char* kStrings[] = {"1""2""3"};

  for (auto& kString : kStrings) {
    EXPECT_EQ(OK, node.SendStringMessage(a1, kString));
  }

  ScopedMessage message;
  for (auto& kString : kStrings) {
    EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    ASSERT_TRUE(message);
    EXPECT_TRUE(MessageEquals(message, kString));
  }

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a1"));

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a2"));

  // This is "a2" from the point of view of node1.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));

  ASSERT_TRUE(node0.ReadMessage(a3, &message));
  EXPECT_EQ(0u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(a3));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int i = 0; i < 100; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    bool got_hello = false;
    ScopedMessage message;
    while (node1.GetSavedMessage(&message)) {
      node1.ClosePortsInEvent(message.get());
      if (MessageEquals(message, "hello")) {
        got_hello = true;
        break;
      }
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendUninitialized) {
  TestNode node(0);
  AddNode(&node);

  PortRef x0;
  EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
  EXPECT_EQ(OK, node.node().ClosePort(x0));
  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendFailure) {
  TestNode node(0);
  AddNode(&node);

  node.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            node.SendStringMessageWithPort(A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            node.SendStringMessageWithPort(A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));

  WaitForIdle();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node.GetSavedMessage(&message));

  EXPECT_EQ(OK, node.node().ClosePort(A));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));

  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));

  EXPECT_EQ(OK, node.node().ClosePort(C));
  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(B));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));

  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));

  ScopedMessage message;
  EXPECT_TRUE(node.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "foo"));
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  WaitForIdle();

  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_FALSE(node.node().CanShutdownCleanly());

  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(B));
  EXPECT_EQ(OK, node.node().ClosePort(C));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, ProxyCollapse1) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  // The node should not idle until all proxies are collapsed.
  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.

  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status{};
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  node.node().ClosePort(C);

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeerSent) {
  // This tests that if a port is closed while some number of proxies are still
  // routing messages (directly or indirectly) to it, that the peer port is
  // eventually notified of the closure, and the dead-end proxies will
  // eventually be removed.

  TestNode node(0);
  AddNode(&node);

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(D));

  // Now send B as new port E.

  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.node().ClosePort(X));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status{};
  EXPECT_EQ(OK, node.node().GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePorts) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // This tests that the right thing happens when initiating a merge on a port
  // whose peer has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Close A.
  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node0 should have no ports since
  // A was explicitly closed.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it.
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // This tests that no residual ports are left behind if two ports are merged
  // when both of their peers have been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Close A and D.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // This tests that ports can be merged successfully even if their peers are
  // moved around.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Set up another pair X-Y for moving ports on node0.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Move A to new port E.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
  ASSERT_TRUE(node0.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node0.node().ClosePort(Y));

  // Write messages on E and D.
  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect to receive D's message on E and E's message on D.
  ASSERT_TRUE(node0.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hi"));
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  // Close E and D.
  EXPECT_EQ(OK, node0.node().ClosePort(E));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsFailsGracefully) {
  // This tests that the system remains in a well-defined state if something
  // goes wrong during port merge.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  ScopedMessage message;
  PortRef X, Y;
  EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
  EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
  EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name(),
                                            node1.name(), Y.name()));
  EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name(),
                                            node0.name(), X.name()));

  // Block the merge from proceeding until we can do something stupid with port
  // C. This avoids the test logic racing with async merge logic.
  node1.BlockOnEvent(Event::Type::kMergePort);

  // Initiate the merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  // Move C to a new port E. This is not a sane use of Node's public API but
  // is still hypothetically possible. It allows us to force a merge failure
  // because C will be in an invalid state by the time the merge is processed.
  // As a result, B should be closed.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));

  node1.Unblock();

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(X, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node1.node().ClosePort(Y));

  WaitForIdle();

  // C goes away as a result of normal proxy removal. B should have been closed
  // cleanly by the failed MergePorts.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));

  // Close A, D, and E.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatus) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Create a local port pair. Neither port should appear to have a remote peer.
  PortRef a, b;
  PortStatus status{};
  node0.node().CreatePortPair(&a, &b);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  // Create a port pair spanning the two nodes. Both spanning ports should
  // immediately appear to have a remote peer.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);

  PortRef x2, x3;
  CreatePortPair(&node0, &x2, &node1, &x3);

  // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
  // remote and the remote peers local.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));

  // Now x0-x1 should be local to node0 and a-b should span the nodes.
  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
  EXPECT_TRUE(status.peer_remote);

  // And swap them back one more time.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));

  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));
  EXPECT_EQ(OK, node0.node().ClosePort(x2));
  EXPECT_EQ(OK, node1.node().ClosePort(x3));
  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d spanning node0-node1.
  PortRef a, b, c, d;
  node0.node().CreatePortPair(&a, &b);
  CreatePortPair(&node0, &c, &node1, &d);

  PortStatus status{};
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
  WaitForIdle();

  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d on node1.
  PortRef a, b, c, d;
  node0.node().CreatePortPair(&a, &b);
  node1.node().CreatePortPair(&c, &d);

  PortStatus status{};
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_FALSE(status.peer_remote);

  node1.AllowPortMerge(c);
  EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
  WaitForIdle();

  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RetransmitUserMessageEvents) {
  // Ensures that user message events can be retransmitted properly.
  TestNode node0(0);
  AddNode(&node0);

  PortRef a, b;
  node0.node().CreatePortPair(&a, &b);

  // Ping.
  const char* kMessage = "hey";
  ScopedMessage message;
  EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong.
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Ping again.
  EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong again!
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));
}

TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
  TestNode node0(0);
  AddNode(&node0);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Send a batch of messages.
  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
  WaitForIdle();
  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));

  // Set to acknowledge every read message, and validate that already-read
  // messages are acknowledged.
  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
  WaitForIdle();
  EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));

  // Read a third of the messages from the other end.
  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
  WaitForIdle();

  EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));

  TestNode node1(1);
  AddNode(&node1);

  // Transfer a1 across to node1.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));

  // Read the last third of the messages from the transferred node, and
  // validate that the unacknowledge message count updates correctly.
  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
  WaitForIdle();
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Turn the acknowledges down and validate that they don't go on indefinitely.
  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
  WaitForIdle();
  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
  WaitForIdle();
  EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Close the far port and validate that the closure updates the unacknowledged
  // count.
  EXPECT_EQ(OK, node1.node().ClosePort(a1));
  WaitForIdle();
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));
}

}  // namespace test
}  // namespace ports
}  // namespace core
}  // namespace mojo

Messung V0.5
C=90 H=87 G=88

¤ Dauer der Verarbeitung: 0.21 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.