// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file.
// Sends |num_messages| empty string messages on |port|, one after another.
// Stops at the first send whose result is not OK and returns that result;
// returns OK when every send succeeded (including when |num_messages| is 0).
int SendMultipleMessages(const PortRef& port, size_t num_messages) {
  for (size_t remaining = num_messages; remaining != 0; --remaining) {
    const int status = SendStringMessage(port, "");
    if (status == OK) {
      continue;
    }
    // Propagate the first failure unchanged; later messages are not sent.
    return status;
  }
  return OK;
}
// NOTE: This may be called from ForwardMessage and thus must not reenter // |node_|.
mozilla::MutexAutoLock lock(lock_);
incoming_events_.push({from_node, std::move(event)});
events_available_event_.Signal();
}
void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
{
mozilla::MutexAutoLock lock(lock_); if (drop_messages_) {
DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
<< node_name;
void PortStatusChanged(const PortRef& port) override { // The port may be closed, in which case we ignore the notification.
mozilla::MutexAutoLock lock(lock_); if (!save_messages_) { return;
}
for (;;) {
ScopedMessage message;
{
mozilla::MutexAutoUnlock unlock(lock_); if (!ReadMessage(port, &message)) { break;
}
}
private: void ProcessEvents() { for (;;) {
events_available_event_.Wait();
mozilla::MutexAutoLock lock(lock_);
if (should_quit_) { return;
}
dispatching_ = true; while (!incoming_events_.empty()) { if (block_on_event_ &&
incoming_events_.front().second->type() == blocked_event_type_) {
blocked_ = true; // Go idle if we hit a blocked event type. break;
}
blocked_ = false;
auto node_event_pair = std::move(incoming_events_.front());
incoming_events_.pop();
// NOTE: AcceptEvent() can re-enter this object to call any of the NodeDelegate interface methods.
mozilla::MutexAutoUnlock unlock(lock_);
node_.AcceptEvent(node_event_pair.first,
std::move(node_event_pair.second));
}
for (constauto& entry : nodes_) {
entry.second->node().LostConnectionToNode(node->name());
}
}
// Waits until all known Nodes are idle. Message forwarding and processing // is handled in such a way that idleness is a stable state: once all nodes in // the system are idle, they will remain idle until the test explicitly // initiates some further event (e.g. sending a message, closing a port, or // removing a Node). void WaitForIdle() { for (;;) {
mozilla::MutexAutoLock global_lock(global_lock_); bool all_nodes_idle = true; for (constauto& entry : nodes_) { if (!entry.second->IsIdle()) {
all_nodes_idle = false;
}
entry.second->WakeUp();
} if (all_nodes_idle) { return;
}
// Wait for any Node to signal that it's idle.
mozilla::MutexAutoUnlock global_unlock(global_lock_);
std::vector<base::WaitableEvent*> events; for (constauto& entry : nodes_) {
events.push_back(&entry.second->idle_event());
}
base::WaitableEvent::WaitMany(events.data(), events.size());
}
}
private: // MessageRouter: void ForwardEvent(TestNode* from_node, const NodeName& node_name,
ScopedEvent event) override {
mozilla::MutexAutoLock global_lock(global_lock_);
mozilla::MutexAutoLock lock(lock_); // Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) {
from_node->ClosePortsInEvent(event.get()); return;
}
auto it = nodes_.find(node_name); if (it == nodes_.end()) {
DVLOG(1) << "Node not found: " << node_name; return;
}
// Serialize and de-serialize all forwarded events.
size_t buf_size = event->GetSerializedSize();
mozilla::UniquePtr<char[]> buf(newchar[buf_size]);
event->Serialize(buf.get());
ScopedEvent copy = Event::Deserialize(buf.get(), buf_size); // This should always succeed unless serialization or deserialization // is broken. In that case, the loss of events should cause a test failure.
ASSERT_TRUE(copy);
// Also copy the payload for user messages. if (event->type() == Event::Type::kUserMessage) {
UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event.get());
UserMessageEvent* message_copy = static_cast<UserMessageEvent*>(copy.get());
// Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) { return;
}
for (constauto& entry : nodes_) {
TestNode* node = entry.second; // Broadcast doesn't deliver to the local node. if (node == from_node) { continue;
}
node->EnqueueEvent(from_node->name(), event->CloneForBroadcast());
}
}
// Acquired before any operation which makes a Node busy, and before testing // if all nodes are idle.
mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"};
// a0 should have eventually detected peer closure after node loss.
ScopedMessage message;
EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
node0.node().GetMessage(a0, &message, nullptr));
EXPECT_FALSE(message);
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { // Tests that a proxy gets cleaned up when its indirect peer lives on a lost // node.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
TestNode node2(2);
AddNode(&node2);
// Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
PortRef A, B, C, D;
CreatePortPair(&node0, &A, &node1, &B);
CreatePortPair(&node1, &C, &node2, &D);
// Create E-F and send F over A to node 1.
PortRef E, F;
EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
// Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 // will trivially become aware of the loss, and this test verifies that the // port A on node 0 will eventually also become aware of it.
// Make sure node2 stops processing events when it encounters an ObserveProxy.
node2.BlockOnEvent(Event::Type::kObserveProxy);
// Simulate node 1 and 2 disconnecting.
EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
// Let node2 continue processing events and wait for everyone to go idle.
node2.Unblock();
WaitForIdle();
// Port F should be gone.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
// Port E should have detected peer closure despite the fact that there is // no longer a continuous route from F to E over which the event could travel.
PortStatus status{};
EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
EXPECT_TRUE(status.peer_closed);
TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) { // Tests that a proxy gets cleaned up after a node disconnect if the // previous port already received the ObserveProxy event.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
TestNode node2(2);
AddNode(&node2);
// Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
PortRef A, B, C, D;
CreatePortPair(&node0, &A, &node1, &B);
CreatePortPair(&node1, &C, &node2, &D);
// Create E-F and send F over A to node 1.
PortRef E, F;
EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
// Send F over C to node 2 and then simulate node 2 loss from node 1 after // node 0 received the ObserveProxy event. Node 1 needs to clean up the // closed proxy while the node 0 to node 2 connection is still intact.
node0.BlockOnEvent(Event::Type::kObserveProxy);
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { // Tests that a proxy gets cleaned up when its direct peer lives on a lost node and its predecessor lives on the same node.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
PortRef A, B;
CreatePortPair(&node0, &A, &node1, &B);
// Send D but block node0 on an ObserveProxy event.
node0.BlockOnEvent(Event::Type::kObserveProxy);
EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
// node0 won't collapse the proxy but node1 will receive the message before // going idle.
WaitForIdle();
// This is "a1" from the point of view of node1.
PortName a2_name = message->ports()[0];
EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
// Send B and receive it as C.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
// Send C and receive it as D.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef D;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
// Send D and receive it as E.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
// Send B and A to create proxies in each direction.
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
// At this point we have a scenario with: // // D -> [B] -> C -> [A] // // Ensure that the proxies can collapse. The sent ports will be closed // eventually as a result of Y's closure.
WaitForIdle();
EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendWithClosedPeer) { // This tests that if a port is sent when its peer is already known to be // closed, the newly created port will be aware of that peer closure, and the // proxy will eventually collapse.
TestNode node(0);
AddNode(&node);
// Send a message from A to B, then close A.
PortRef A, B;
EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
EXPECT_EQ(OK, node.node().ClosePort(A));
// Now send B over X-Y as new port C.
PortRef X, Y;
EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
ScopedMessage message;
ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
TEST_F(PortsTest, SendWithClosedPeerSent) {
// This tests that if a port is closed while some number of proxies are still
// routing messages (directly or indirectly) to it, that the peer port is
// eventually notified of the closure, and the dead-end proxies will
// eventually be removed.
//
// NOTE(review): the statements below merge B (on node0) with C (on node1) and
// never close a port before the final checks, and node0/node1 are not declared
// inside this body — confirm that the body matches the scenario described
// above and that the node setup is present.
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Initiate a merge between B and C. node1 is first told to allow the merge
// on C; the merge itself is requested from node0.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect all proxies to be gone once idle.
// Presumably ALLOW_LOCAL_PORTS is needed because A and D are never closed in
// this body — TODO confirm the policy semantics.
EXPECT_TRUE(
node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
EXPECT_TRUE(
node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(D, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
// No more ports should be open.
// NOTE(review): no ClosePort call on A or D is visible before these checks —
// confirm whether closing statements are missing here.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer1) {
// This tests that the right thing happens when initiating a merge on a port
// whose peer has already been closed. Here the merge is requested on B
// (node0) after its peer A has sent one message and been closed.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Close A.
EXPECT_EQ(OK, node0.node().ClosePort(A));
// Initiate a merge between B and C. node1 is first told to allow the merge
// on C; the merge itself is requested from node0.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect all proxies to be gone once idle. node0 should have no ports since
// A was explicitly closed.
// D is only closed further down, which is presumably why node1 is checked
// with ALLOW_LOCAL_PORTS here — TODO confirm the policy semantics.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(
node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
ASSERT_TRUE(node1.ReadMessage(D, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
// Close the last port this test left open.
EXPECT_EQ(OK, node1.node().ClosePort(D));
// No more ports should be open.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer2) {
// This tests that the right thing happens when merging into a port whose peer
// has already been closed. Here C (node1) is the merge target and its peer D
// has sent one message and been closed before the merge is requested.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on D and close it.
EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
EXPECT_EQ(OK, node1.node().ClosePort(D));
// Initiate a merge between B and C. node1 is first told to allow the merge
// on C; the merge itself is requested from node0.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect all proxies to be gone once idle. node1 should have no ports since
// D was explicitly closed.
// A is only closed further down, which is presumably why node0 is checked
// with ALLOW_LOCAL_PORTS here — TODO confirm the policy semantics.
EXPECT_TRUE(
node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
EXPECT_TRUE(node1.node().CanShutdownCleanly());
// Expect A to have received the message sent on D.
ScopedMessage message;
ASSERT_TRUE(node0.ReadMessage(A, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
// Close the last port this test left open.
EXPECT_EQ(OK, node0.node().ClosePort(A));
// No more ports should be open.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithClosedPeers) {
// This tests that no residual ports are left behind if two ports are merged
// when both of their peers have been closed.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Close A and D.
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(D));
// Let both nodes go idle before the merge is requested, so the closures above
// are not still in flight when MergePorts is called.
WaitForIdle();
// Initiate a merge between B and C. node1 is first told to allow the merge
// on C; the merge itself is requested from node0.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect everything to have gone away. No ALLOW_LOCAL_PORTS policy is passed
// here, unlike the checks in the single-closed-peer tests.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithMovedPeers) {
// This tests that ports can be merged successfully even if their peers are
// moved around. A is transferred over X-Y (arriving as E) before B is merged
// with C, and messages are then expected to flow between E and D.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Set up another pair X-Y for moving ports on node0.
PortRef X, Y;
EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
// Reused for every ReadMessage call in this test.
ScopedMessage message;
// Move A to new port E: send A as an attachment on X, read the message on Y,
// and look up the single port it carries.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
ASSERT_TRUE(node0.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
// Write messages on E and D.
EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
// Initiate a merge between B and C. node1 is first told to allow the merge
// on C; the merge itself is requested from node0.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
WaitForIdle();
// Expect to receive D's message on E and E's message on D.
ASSERT_TRUE(node0.ReadMessage(E, &message));
EXPECT_TRUE(MessageEquals(message, "hi"));
ASSERT_TRUE(node1.ReadMessage(D, &message));
EXPECT_TRUE(MessageEquals(message, "hey"));
// Close E and D.
EXPECT_EQ(OK, node0.node().ClosePort(E));
EXPECT_EQ(OK, node1.node().ClosePort(D));
WaitForIdle();
// Expect everything to have gone away.
// NOTE(review): X and Y are never closed explicitly in this test — confirm
// that the final clean-shutdown checks are expected to pass with them open.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsFailsGracefully) {
// This tests that the system remains in a well-defined state if something
// goes wrong during port merge.
TestNode node0(0);
AddNode(&node0);
TestNode node1(1);
AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Block the merge from proceeding until we can do something stupid with port
// C. This avoids the test logic racing with async merge logic.
node1.BlockOnEvent(Event::Type::kMergePort);
// Initiate the merge between B and C.
node1.AllowPortMerge(C);
EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
// Move C to a new port E. This is not a sane use of Node's public API but
// is still hypothetically possible. It allows us to force a merge failure
// because C will be in an invalid state by the time the merge is processed.
// As a result, B should be closed.
// NOTE(review): Y is not declared anywhere in this test body, and nothing
// here reads the message back to obtain E — confirm that the carrier port
// pair and the receive/GetPort steps are present.
EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
// NOTE(review): node1 is blocked on kMergePort above, but no Unblock() or
// WaitForIdle() call appears before the checks below — confirm.
// C goes away as a result of normal proxy removal. B should have been closed
// cleanly by the failed MergePorts.
EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
// Close A, D, and E.
EXPECT_EQ(OK, node0.node().ClosePort(A));
EXPECT_EQ(OK, node1.node().ClosePort(D));
// NOTE(review): E is the port C was moved to by a send issued from node1, yet
// it is closed through node0 here — verify which node actually owns E.
EXPECT_EQ(OK, node0.node().ClosePort(E));
WaitForIdle();
// Expect everything to have gone away.
EXPECT_TRUE(node0.node().CanShutdownCleanly());
EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
// Create a local port pair. Neither port should appear to have a remote peer.
PortRef a, b;
PortStatus status{};
node0.node().CreatePortPair(&a, &b);
ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
EXPECT_FALSE(status.peer_remote);
ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
EXPECT_FALSE(status.peer_remote);
// Create a port pair spanning the two nodes. Both spanning ports should // immediately appear to have a remote peer.
PortRef x0, x1;
CreatePortPair(&node0, &x0, &node1, &x1);
// Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers // remote and the remote peers local.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
WaitForIdle();
// Now x0-x1 should be local to node0 and a-b should span the nodes.
ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
EXPECT_FALSE(status.peer_remote);
ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
EXPECT_FALSE(status.peer_remote);
ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
EXPECT_TRUE(status.peer_remote);
ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
EXPECT_TRUE(status.peer_remote);
// And swap them back one more time.
EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
WaitForIdle();
TEST_F(PortsTest, RetransmitUserMessageEvents) { // Ensures that user message events can be retransmitted properly.
TestNode node0(0);
AddNode(&node0);
PortRef a, b;
node0.node().CreatePortPair(&a, &b);
// Send a batch of messages.
EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
WaitForIdle();
EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
// Set to acknowledge every read message, and validate that already-read // messages are acknowledged.
EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
WaitForIdle();
EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
// Read a third of the messages from the other end.
EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
WaitForIdle();
// Read the last third of the messages from the transferred node, and validate that the unacknowledged message count updates correctly.
EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
WaitForIdle();
EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
// Turn the acknowledges down and validate that they don't go on indefinitely.
EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
WaitForIdle();
EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
WaitForIdle();
EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
// Close the far port and validate that the closure updates the unacknowledged // count.
EXPECT_EQ(OK, node1.node().ClosePort(a1));
WaitForIdle();
EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
// The information on this website has been carefully compiled to the best of
// our knowledge. However, neither the completeness, nor the correctness, nor
// the quality of the information provided is guaranteed.
// Note:
// The syntax coloring and the measurement are still experimental.