/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Helper logger macro which includes the name of the `this` NodeController in // the logged messages. #define NODECONTROLLER_LOG(level_, fmt_, ...) \
MOZ_LOG(gNodeControllerLog, level_, \
("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
// Helper warning macro which both does logger logging and emits NS_WARNING logs // under debug mode. #ifdef DEBUG # define NODECONTROLLER_WARNING(fmt_, ...) \ do { \
nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \ ##__VA_ARGS__); \
NS_WARNING(warning.get()); \
MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
} while (0) #else # define NODECONTROLLER_WARNING(fmt_, ...) \
NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__) #endif
NodeController::~NodeController() { auto state = mState.Lock();
MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(), "Destroying NodeController before closing all peers");
MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(), "Destroying NodeController before closing all invites");
};
// FIXME: Actually provide some way to create the thing. /* static */ NodeController* NodeController::GetSingleton() {
MOZ_ASSERT(gNodeController); return gNodeController;
}
bool NodeController::GetMessage(const PortRef& aPort,
UniquePtr<IPC::Message>* aMessage) {
UniquePtr<UserMessageEvent> messageEvent; int rv = mNode->GetMessage(aPort, &messageEvent, nullptr); if (rv != mojo::core::ports::OK) { if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) { returnfalse;
}
MOZ_CRASH("GetMessage on port in invalid state");
}
if (messageEvent) {
UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
// If our UserMessageEvent has any ports directly attached to it, fetch them // from our node and attach them to the IPC::Message we extracted. // // It's important to only do this if we have nonempty set of ports on the // event, as we may have never serialized our IPC::Message's ports onto the // event if it was routed in-process. if (messageEvent->num_ports() > 0) {
nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports()); for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
attachedPorts.AppendElement(
ScopedPort{GetPort(messageEvent->ports()[i]), this});
}
message->SetAttachedPorts(std::move(attachedPorts));
}
int rv = mNode->SendUserMessage(aPort, std::move(messageEvent)); if (rv == mojo::core::ports::OK) { returntrue;
} if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
NODECONTROLLER_LOG(LogLevel::Debug, "Ignoring message to port %s as peer was closed",
ToString(aPort.name()).c_str()); returntrue;
}
NODECONTROLLER_WARNING("Failed to send message to port %s",
ToString(aPort.name()).c_str()); returnfalse;
}
auto NodeController::SerializeEventMessage(
UniquePtr<Event> aEvent, const NodeName* aRelayTarget,
uint32_t aType) -> UniquePtr<IPC::Message> {
UniquePtr<IPC::Message> message; if (aEvent->type() == Event::kUserMessage) {
MOZ_DIAGNOSTIC_ASSERT(
aType == EVENT_MESSAGE_TYPE, "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
message = static_cast<UserMessageEvent*>(aEvent.get())
->TakeMessage<IPC::Message>();
} else {
message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
}
// Use an intermediate buffer to serialize to avoid potential issues with the // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the // majority of events are fairly small.
Vector<char, 256, InfallibleAllocPolicy> buffer;
(void)buffer.initLengthUninitialized(length); if (aRelayTarget) {
memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
aEvent->Serialize(buffer.begin() + sizeof(NodeName));
} else {
aEvent->Serialize(buffer.begin());
}
#ifdef DEBUG // Debug-assert that we can read the same data back out of the buffer.
MOZ_ASSERT(message->event_footer_size() == length);
Vector<char, 256, InfallibleAllocPolicy> buffer2;
(void)buffer2.initLengthUninitialized(message->event_footer_size());
MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(), /* truncate */ false));
MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length())); #endif
Vector<char, 256, InfallibleAllocPolicy> buffer;
(void)buffer.initLengthUninitialized(aMessage->event_footer_size()); // Truncate the message when reading the footer, so that the extra footer data // is no longer present in the message. This allows future code to eventually // send the same `IPC::Message` to another process. if (!aMessage->ReadFooter(buffer.begin(), buffer.length(), /* truncate */ true)) {
NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
aMessage->name()); return nullptr;
}
aMessage->set_event_footer_size(0);
UniquePtr<Event> event; if (aRelayTarget) {
MOZ_ASSERT(aMessage->is_relay()); if (buffer.length() < sizeof(NodeName)) {
NODECONTROLLER_WARNING( "Insufficient space in message footer for message '%s'",
aMessage->name()); return nullptr;
}
memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
buffer.length() - sizeof(NodeName));
} else {
event = Event::Deserialize(buffer.begin(), buffer.length());
}
if (!event) {
NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
aMessage->name()); return nullptr;
}
if (channel) {
channel->Close();
} if (invite.mChannel) {
invite.mChannel->Close();
} if (invite.mToMerge.is_valid()) { // Ignore any possible errors here.
(void)mNode->ClosePort(invite.mToMerge);
} for (auto& port : pendingMerges) { // Ignore any possible errors here.
(void)mNode->ClosePort(port);
}
mNode->LostConnectionToNode(aNodeName);
}
void NodeController::ContactRemotePeer(const NodeName& aNode,
UniquePtr<Event> aEvent) { // On Windows and macOS, messages holding HANDLEs or mach ports must be // relayed via the broker process so it can transfer ownership. bool needsRelay = false; #ifdefined(XP_WIN) || defined(XP_DARWIN) if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
aEvent->type() == Event::kUserMessage) { auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
needsRelay =
userEvent->HasMessage() &&
userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
} #endif
UniquePtr<IPC::Message> message; if (aEvent) {
message =
SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
MOZ_ASSERT(message->is_relay() == needsRelay, "Message relay status set incorrectly");
}
RefPtr<NodeChannel> peer;
RefPtr<NodeChannel> broker; bool needsIntroduction = false; bool needsBroker = needsRelay;
{ auto state = mState.Lock();
// Check if we know this peer. If we don't, we'll need to request an // introduction.
peer = state->mPeers.Get(aNode); if (!peer) { // We don't know the peer, check if we've already requested an // introduction, or if we need to request a new one. auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
needsIntroduction = true;
needsBroker = true; return Queue<UniquePtr<IPC::Message>, 64>{};
}); // If we aren't relaying, queue up the message to be sent. if (message && !needsRelay) {
queue.Push(std::move(message));
}
}
if (needsBroker && !IsBroker()) {
broker = state->mPeers.Get(kBrokerNodeName);
}
}
if (needsBroker && !broker) {
NODECONTROLLER_WARNING( "Dropping message '%s'; no connection to unknown peer %s",
message ? message->name() : "", ToString(aNode).c_str()); if (needsIntroduction) { // We have no broker and will never be able to be introduced to this node. // Queue a task to clean up any ports connected to it.
XRE_GetAsyncIOEventTarget()->Dispatch(NewRunnableMethod<NodeName>( "NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
} return;
}
if (needsIntroduction) {
NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
ToString(aNode).c_str());
broker->RequestIntroduction(aNode);
}
if (message) { if (needsRelay) {
NODECONTROLLER_LOG(LogLevel::Info, "Relaying message '%s' for peer %s due to %" PRIu32 " attachments",
message->name(), ToString(aNode).c_str(),
message->num_relayed_attachments());
MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
broker->SendEventMessage(std::move(message));
} elseif (peer) {
peer->SendEventMessage(std::move(message));
}
}
}
if (IsBroker()) {
OnBroadcast(mName, std::move(message));
} elseif (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
broker->Broadcast(std::move(message));
} else {
NODECONTROLLER_WARNING( "Trying to broadcast event, but no connection to broker");
}
}
void NodeController::PortStatusChanged(const PortRef& aPortRef) {
RefPtr<UserData> userData; int rv = mNode->GetUserData(aPortRef, &userData); if (rv != mojo::core::ports::OK) {
NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
ToString(aPortRef.name()).c_str()); return;
} if (userData) { // All instances of `UserData` attached to ports in this node must be of // type `PortObserver`, so we can call `OnPortStatusChanged` directly on // them. static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
}
}
NodeName fromNode = aFromNode; #ifdefined(XP_WIN) || defined(XP_DARWIN) if (isRelay) { if (event->type() != Event::kUserMessage) {
NODECONTROLLER_WARNING( "Unexpected relay of non-UserMessage event from peer %s!",
ToString(aFromNode).c_str());
DropPeer(aFromNode); return;
}
// If we're the broker, then we'll need to forward this message on to the // true recipient. To do this, we re-serialize the message, passing along // the original source node, and send it to the final node. if (IsBroker()) {
UniquePtr<IPC::Message> message =
SerializeEventMessage(std::move(event), &aFromNode); if (!message) {
NODECONTROLLER_WARNING( "Relaying EventMessage from peer %s failed to re-serialize!",
ToString(aFromNode).c_str());
DropPeer(aFromNode); return;
}
MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
MOZ_ASSERT(message->num_relayed_attachments() > 0, "Message doesn't have relayed attachments?");
NODECONTROLLER_LOG(
LogLevel::Info, "Relaying message '%s' from peer %s to peer %s (%" PRIu32 " attachments)",
message->name(), ToString(aFromNode).c_str(),
ToString(relayTarget).c_str(), message->num_relayed_attachments());
RefPtr<NodeChannel> peer;
{ auto state = mState.Lock();
peer = state->mPeers.Get(relayTarget);
} if (!peer) {
NODECONTROLLER_WARNING( "Dropping relayed message from %s to unknown peer %s",
ToString(aFromNode).c_str(), ToString(relayTarget).c_str()); return;
}
// Otherwise, we're the final recipient, so we can continue & process the // message as usual. if (aFromNode != kBrokerNodeName) {
NODECONTROLLER_WARNING( "Unexpected relayed EventMessage from non-broker peer %s!",
ToString(aFromNode).c_str());
DropPeer(aFromNode); return;
}
fromNode = relayTarget;
NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
ToString(fromNode).c_str());
} #endif
// If we're getting a requested port merge from another process, check to make // sure that we're expecting the request, and record that the merge has // arrived so we don't try to close the port on error. if (event->type() == Event::kMergePort) { // Check that the target port for the merge actually exists. auto targetPort = GetPort(event->port_name()); if (!targetPort.is_valid()) {
NODECONTROLLER_WARNING( "Unexpected MergePortEvent from peer %s for unknown port %s",
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(fromNode); return;
}
// Check if `targetPort` is in our pending merges entry for the given source // node. If this makes the `mPendingMerges` entry empty, remove it. bool expectingMerge = [&] { auto state = mState.Lock(); auto pendingMerges = state->mPendingMerges.Lookup(aFromNode); if (!pendingMerges) { returnfalse;
}
size_t removed = pendingMerges->RemoveElementsBy(
[&](auto& port) { return port.name() == targetPort.name(); }); if (removed != 0 && pendingMerges->IsEmpty()) {
pendingMerges.Remove();
} return removed != 0;
}();
if (!expectingMerge) {
NODECONTROLLER_WARNING( "Unexpected MergePortEvent from peer %s for port %s",
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(fromNode); return;
}
}
// NOTE: This method may be called off of the IO thread by the // `BroadcastEvent` node callback. if (!IsBroker()) {
NODECONTROLLER_WARNING("Broadcast request received by non-broker node"); return;
}
UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage)); if (!event) {
NODECONTROLLER_WARNING("Invalid broadcast message from peer"); return;
}
nsTArray<RefPtr<NodeChannel>> peers;
{ auto state = mState.Lock();
peers.SetCapacity(state->mPeers.Count()); for (constauto& peer : state->mPeers.Values()) {
peers.AppendElement(peer);
}
} for (constauto& peer : peers) { // NOTE: This `clone` operation is only supported for a limited number of // message types by the ports API, which provides some extra security by // only allowing those specific types of messages to be broadcasted. // Messages which don't support `CloneForBroadcast` cannot be broadcast, and // the ports library will not attempt to broadcast them. auto clone = event->CloneForBroadcast(); if (!clone) {
NODECONTROLLER_WARNING("Attempt to broadcast unsupported message"); break;
}
if (aFromNode != kBrokerNodeName) {
NODECONTROLLER_WARNING("Introduction received from non-broker node");
DropPeer(aFromNode); return;
}
MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(), "We're the wrong process to receive this?");
if (!aIntroduction.mHandle) {
NODECONTROLLER_WARNING("Could not be introduced to peer %s",
ToString(aIntroduction.mName).c_str());
mNode->LostConnectionToNode(aIntroduction.mName);
auto state = mState.Lock();
state->mPendingMessages.Remove(aIntroduction.mName); return;
}
auto channel =
MakeUnique<IPC::Channel>(std::move(aIntroduction.mHandle),
aIntroduction.mMode, aIntroduction.mOtherPid); auto nodeChannel = MakeRefPtr<NodeChannel>(
aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);
{ auto state = mState.Lock(); bool isNew = false;
state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
isNew = true; return nodeChannel;
}); if (!isNew) { // We got a duplicate introduction. This can happen during normal // execution if both sides request an introduction at the same time. We // can just ignore the second one, as they'll arrive in the same order in // both processes.
nodeChannel->Close(); return;
}
// Deliver any pending messages, then remove the entry from our table. We do // this while `mState` is still held to ensure that these messages are // all sent before another thread can observe the newly created channel. // As the channel hasn't been `Connect()`-ed yet, this will only queue the // messages up to be sent, so is OK to do with the mutex held. These // messages will be processed to be sent during `Start()` below, which is // performed outside of the lock. if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) { while (!pending->IsEmpty()) {
nodeChannel->SendEventMessage(pending->Pop());
}
pending.Remove();
}
}
// NodeChannel::Start must be called with the lock not held, as it may lead to // callbacks being made into `OnChannelError` or `OnMessageReceived`, which // will attempt to re-acquire our lock.
nodeChannel->Start();
}
RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
IPC::Channel::ChannelHandle handleA, handleB; if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
NODECONTROLLER_WARNING( "Rejecting introduction request from '%s' for unknown peer '%s'",
ToString(aFromNode).c_str(), ToString(aName).c_str());
// We don't know this peer, or ran into issues creating the descriptor! Send // an invalid introduction to content to clean up any pending outbound // messages.
NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
peerA->OtherPid(), base::kInvalidProcessId};
peerA->Introduce(std::move(intro)); return;
}
if (aRealName == mojo::core::ports::kInvalidNodeName ||
aInitialPort == mojo::core::ports::kInvalidPortName) {
NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
DropPeer(aFromNode); return;
}
bool inserted = false;
Invite invite;
{ auto state = mState.Lock();
// Try to remove the source node from our invites list and insert it into // our peers map under the new name. if (state->mInvites.Remove(aFromNode, &invite)) {
MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
state->mPeers.LookupOrInsertWith(aRealName, [&]() {
inserted = true; return invite.mChannel;
});
}
} if (!inserted) {
NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
ToString(aFromNode).c_str());
DropPeer(aFromNode); return;
}
// Update the name of the node. This field is only accessed from the IO // thread, so it's safe to update it without a lock held.
invite.mChannel->SetName(aRealName);
// Start the port merge to allow our existing initial port to begin // communicating with the remote port.
PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
}
// Create the peer with a randomly generated name, and store it in `mInvites`. // This channel and name will be used for communication with the node until it // sends us its' real name in an `AcceptInvite` message. auto ports = CreatePortPair(); auto inviteName = RandomNodeName(); auto nodeChannel =
MakeRefPtr<NodeChannel>(inviteName, std::move(aChannel), this,
base::kInvalidProcessId, aChildProcessHost);
{ auto state = mState.Lock();
MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName), "UUID conflict?");
MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName), "UUID conflict?");
state->mInvites.InsertOrUpdate(inviteName,
Invite{nodeChannel, ports.second.Release()});
}
auto nodeName = RandomNodeName();
gNodeController = new NodeController(nodeName);
auto ports = gNodeController->CreatePortPair();
PortRef toMerge = ports.second.Release();
// Mark the port as expecting a pending merge. This is a duplicate of the // information tracked by `mPendingMerges`, and was added by upstream // chromium. // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
{
mojo::core::ports::SinglePortLocker locker(&toMerge);
locker.port()->pending_merge_peer = true;
}
auto nodeChannel = MakeRefPtr<NodeChannel>(
kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
{ auto state = gNodeController->mState.Lock();
MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
.AppendElement(toMerge);
}
// Collect all objects from our state which need to be cleaned up.
nsTArray<NodeName> lostConnections;
nsTArray<RefPtr<NodeChannel>> channelsToClose;
nsTArray<PortRef> portsToClose;
{ auto state = nodeController->mState.Lock(); for (constauto& chan : state->mPeers) {
lostConnections.AppendElement(chan.GetKey());
channelsToClose.AppendElement(chan.GetData());
} for (constauto& pending : state->mPendingMessages.Keys()) {
lostConnections.AppendElement(pending);
} for (constauto& invite : state->mInvites.Values()) {
channelsToClose.AppendElement(invite.mChannel);
portsToClose.AppendElement(invite.mToMerge);
} for (constauto& pendingPorts : state->mPendingMerges.Values()) {
portsToClose.AppendElements(pendingPorts);
}
state->mPeers.Clear();
state->mPendingMessages.Clear();
state->mInvites.Clear();
state->mPendingMerges.Clear();
} for (auto& nodeChannel : channelsToClose) {
nodeChannel->Close();
} for (auto& port : portsToClose) {
nodeController->mNode->ClosePort(port);
} for (auto& name : lostConnections) {
nodeController->mNode->LostConnectionToNode(name);
}
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.