/* * net/tipc/group.c: TIPC group messaging code * * Copyright (c) 2017, Ericsson AB * Copyright (c) 2020, Red Hat Inc * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the names of the copyright holders nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * Alternatively, this software may be distributed under the terms of the * GNU General Public License ("GPL") version 2 as published by the Free * Software Foundation. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE.
*/
for (n = rb_first(&grp->members); n; n = rb_next(n)) {
m = container_of(n, struct tipc_member, tree_node); if (m->node == node) return m;
} return NULL;
}
m = tipc_group_find_dest(grp, dnode, dport); if (!tipc_group_is_receiver(m)) {
*mbr = NULL; returnfalse;
}
*mbr = m;
if (m->window >= len) returnfalse;
*grp->open = false;
/* If not fully advertised, do it now to prevent mutual blocking */
adv = m->advertised;
state = m->state; if (state == MBR_JOINED && adv == ADV_IDLE) returntrue; if (state == MBR_ACTIVE && adv == ADV_ACTIVE) returntrue; if (state == MBR_PENDING && adv == ADV_IDLE) returntrue;
__skb_queue_head_init(&xmitq);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
tipc_node_distr_xmit(grp->net, &xmitq); returntrue;
}
/* If prev bcast was replicast, reject until all receivers have acked */ if (grp->bc_ackers) {
*grp->open = false; returntrue;
} if (list_empty(&grp->small_win)) returnfalse;
m = list_first_entry(&grp->small_win, struct tipc_member, small_win); if (m->window >= len) returnfalse;
/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
 * @skb: buffer to insert; its message header supplies the message type
 *       and the group broadcast sequence number used as sort key
 * @defq: queue that receives @skb
 *
 * Broadcast and multicast messages are inserted in front of the first
 * queued buffer whose broadcast sequence number is higher than their own.
 * Every other message type, and any broadcast/multicast for which no such
 * buffer exists, is appended to the tail of @defq.
 */
static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
{
	struct tipc_msg *_hdr, *hdr = buf_msg(skb);
	u16 bc_seqno = msg_grp_bc_seqno(hdr);
	struct sk_buff *_skb, *tmp;
	int mtyp = msg_type(hdr);

	/* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
		skb_queue_walk_safe(defq, _skb, tmp) {
			_hdr = buf_msg(_skb);
			if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
				continue;
			/* First queued buffer with a higher seqno: go before it */
			__skb_queue_before(defq, _skb, skb);
			return;
		}
		/* Bcast was not bypassed, - add to tail */
	}
	/* Unicasts are never bypassed, - always add to tail */
	__skb_queue_tail(defq, skb);
}
if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) break;
/* Decide what to do with message */ switch (mtyp) { case TIPC_GRP_MCAST_MSG: if (msg_nameinst(hdr) != grp->instance) {
update = true;
deliver = false;
}
fallthrough; case TIPC_GRP_BCAST_MSG:
m->bc_rcv_nxt++;
ack = msg_grp_bc_ack_req(hdr); break; case TIPC_GRP_UCAST_MSG: break; case TIPC_GRP_MEMBER_EVT: if (m->state == MBR_LEAVING)
leave = true; if (!grp->events)
deliver = false; break; default: break;
}
/* tipc_group_update_rcv_win() - account for consumed blocks and advance
 * the flow-control state of the sending member
 * @grp: group the member belongs to
 * @blks: number of blocks to subtract from the member's advertised window
 * @node: node address used to look up the member
 * @port: port used to look up the member
 * @xmitq: queue handed to tipc_group_proto_xmit() for generated messages
 *
 * Does nothing if no member matches @node/@port.
 */
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
			       u32 port, struct sk_buff_head *xmitq)
{
	struct list_head *actives = &grp->active;
	int cap = grp->max_active;
	int reclaim_at = cap * 3 / 4;
	/* Snapshot: the checks below use the count as it was on entry */
	int cnt_on_entry = grp->active_cnt;
	struct tipc_member *m, *oldest;

	m = tipc_group_find_member(grp, node, port);
	if (!m)
		return;

	m->advertised -= blks;

	switch (m->state) {
	case MBR_JOINED:
		/* Step 1: the member becomes either active or pending */
		if (cnt_on_entry > cap) {
			m->state = MBR_PENDING;
			list_add_tail(&m->list, &grp->pending);
		} else {
			m->state = MBR_ACTIVE;
			list_add_tail(&m->list, actives);
			grp->active_cnt++;
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		}

		/* Step 2: at or above the reclaim limit, free up a slot */
		if (cnt_on_entry >= reclaim_at) {
			if (list_empty(actives)) {
				/* Nobody to reclaim from; oldest pending
				 * member goes back to JOINED
				 */
				oldest = list_first_entry(&grp->pending,
							  struct tipc_member,
							  list);
				list_del_init(&oldest->list);
				oldest->state = MBR_JOINED;
				tipc_group_proto_xmit(grp, oldest, GRP_ADV_MSG,
						      xmitq);
			} else {
				/* Reclaim from the oldest active member */
				oldest = list_first_entry(actives,
							  struct tipc_member,
							  list);
				oldest->state = MBR_RECLAIMING;
				list_del_init(&oldest->list);
				tipc_group_proto_xmit(grp, oldest,
						      GRP_RECLAIM_MSG, xmitq);
			}
		}
		break;
	case MBR_ACTIVE:
		/* Keep the most recently served member last in the list */
		if (!list_is_last(&m->list, &grp->active))
			list_move_tail(&m->list, &grp->active);

		/* Re-advertise once the window has dropped to 3/4 or less */
		if (m->advertised <= (ADV_ACTIVE * 3 / 4))
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		break;
	case MBR_REMITTED:
		if (m->advertised > ADV_IDLE)
			break;

		m->state = MBR_JOINED;
		grp->active_cnt--;
		if (m->advertised < ADV_IDLE) {
			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		}

		/* Promote the oldest pending member, if any, and advertise */
		if (!list_empty(&grp->pending)) {
			oldest = list_first_entry(&grp->pending,
						  struct tipc_member, list);
			oldest->state = MBR_ACTIVE;
			list_move_tail(&oldest->list, &grp->active);
			grp->active_cnt++;
			tipc_group_proto_xmit(grp, oldest, GRP_ADV_MSG, xmitq);
		}
		break;
	case MBR_RECLAIMING:
	case MBR_JOINING:
	case MBR_LEAVING:
	default:
		/* No window handling in these states */
		break;
	}
}
if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) return;
m = tipc_group_find_member(grp, node, port);
switch (msg_type(hdr)) { case GRP_JOIN_MSG: if (!m)
m = tipc_group_create_member(grp, node, port,
0, MBR_JOINING); if (!m) return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received if necessary */ if (m->state != MBR_PUBLISHED) return;
/* Member can be taken into service */
m->state = MBR_JOINED;
tipc_group_open(m, usr_wakeup);
tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq); return; case GRP_LEAVE_MSG: if (!m) return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
list_del_init(&m->list);
tipc_group_open(m, usr_wakeup);
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
m->bc_syncpt, inputq); return; case GRP_ADV_MSG: if (!m) return;
m->window += msg_adv_win(hdr);
tipc_group_open(m, usr_wakeup); return; case GRP_ACK_MSG: if (!m) return;
m->bc_acked = msg_grp_bc_acked(hdr); if (--grp->bc_ackers) return;
list_del_init(&m->small_win);
*m->group->open = true;
*usr_wakeup = true;
tipc_group_update_member(m, 0); return; case GRP_RECLAIM_MSG: if (!m) return;
tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
m->window = ADV_IDLE;
tipc_group_open(m, usr_wakeup); return; case GRP_REMIT_MSG: if (!m || m->state != MBR_RECLAIMING) return;
remitted = msg_grp_remitted(hdr);
/* Messages preceding the REMIT still in receive queue */ if (m->advertised > remitted) {
m->state = MBR_REMITTED;
in_flight = m->advertised - remitted;
m->advertised = ADV_IDLE + in_flight; return;
} /* This should never happen */ if (m->advertised < remitted)
pr_warn_ratelimited("Unexpected REMIT msg\n");
/* All messages preceding the REMIT have been read */
m->state = MBR_JOINED;
grp->active_cnt--;
m->advertised = ADV_IDLE;
/* Set oldest pending member to active and advertise */ if (list_empty(&grp->pending)) return;
pm = list_first_entry(&grp->pending, struct tipc_member, list);
pm->state = MBR_ACTIVE;
list_move_tail(&pm->list, &grp->active);
grp->active_cnt++; if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); return; default:
pr_warn("Received unknown GROUP_PROTO message\n");
}
}
/* tipc_group_member_evt() - receive and handle a member up/down event
 * @grp: group the event applies to; nothing is done if NULL
 * @usr_wakeup: passed on to tipc_group_open() when a member changes state
 * @sk_rcvbuf: set to tipc_group_rcvbuf_limit(@grp) once the event has been
 *             processed; left untouched on the early-return paths
 * @hdr: message whose data area holds the struct tipc_event
 * @inputq: queue handed to tipc_group_create_event() for user events
 * @xmitq: queue handed to tipc_group_proto_xmit() for protocol messages
 *
 * An event about the group's own port is ignored unless loopback is set.
 */
void tipc_group_member_evt(struct tipc_group *grp,
			   bool *usr_wakeup,
			   int *sk_rcvbuf,
			   struct tipc_msg *hdr,
			   struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct tipc_event *evt = (void *)msg_data(hdr);
	u32 instance = evt->found_lower;
	u32 node = evt->port.node;
	u32 port = evt->port.ref;
	int event = evt->event;
	struct tipc_member *m;
	struct net *net;
	u32 self;

	if (!grp)
		return;

	net = grp->net;
	self = tipc_own_addr(net);
	if (!grp->loopback && node == self && port == grp->portid)
		return;

	m = tipc_group_find_member(grp, node, port);

	switch (event) {
	case TIPC_PUBLISHED:
		/* Send and wait for arrival of JOIN message if necessary */
		if (!m) {
			m = tipc_group_create_member(grp, node, port, instance,
						     MBR_PUBLISHED);
			if (!m)
				break;
			tipc_group_update_member(m, 0);
			tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
			break;
		}

		if (m->state != MBR_JOINING)
			break;

		/* Member can be taken into service */
		m->instance = instance;
		m->state = MBR_JOINED;
		tipc_group_open(m, usr_wakeup);
		tipc_group_update_member(m, 0);
		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
		tipc_group_create_event(grp, m, TIPC_PUBLISHED,
					m->bc_syncpt, inputq);
		break;
	case TIPC_WITHDRAWN:
		if (!m)
			break;

		/* Take the member out of service, mirroring the GRP_LEAVE_MSG
		 * handling: adjust the active count, mark the member as
		 * leaving, unlink it from its flow-control list and give
		 * tipc_group_open() the chance to wake up a blocked sender.
		 */
		tipc_group_decr_active(grp, m);
		m->state = MBR_LEAVING;
		list_del_init(&m->list);
		tipc_group_open(m, usr_wakeup);

		/* Only send event if no LEAVE message can be expected */
		if (!tipc_node_is_up(net, node))
			tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
						m->bc_rcv_nxt, inputq);
		break;
	default:
		break;
	}
	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.