// SPDX-License-Identifier: GPL-2.0-or-later
/* Client connection-specific management code.
 *
 * Copyright (C) 2016, 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * Client connections need to be cached for a little while after they've made a
 * call so as to handle retransmitted DATA packets in case the server didn't
 * receive the final ACK or terminating ABORT we sent it.
 *
 * There are flags of relevance to the cache:
 *
 *  (2) DONT_REUSE - The connection should be discarded as soon as possible and
 *      should not be reused.  This is set when an exclusive connection is used
 *      or a call ID counter overflows.
 *
 * The caching state may only be changed if the cache lock is held.
 *
 * There are two idle client connection expiry durations.  If the total number
 * of connections is below the reap threshold, we use the normal duration; if
 * it's above, we use the fast duration.
 */
if (bundle) {
id = bundle->debug_id;
dead = __refcount_dec_and_test(&bundle->ref, &r);
trace_rxrpc_bundle(id, r - 1, why); if (dead)
rxrpc_free_bundle(bundle);
}
}
/*
 * Get rid of outstanding client connection preallocations when a local
 * endpoint is destroyed.
 */
void rxrpc_purge_client_connections(struct rxrpc_local *local)
{
	/* Tear down this endpoint's client connection ID space; presumably
	 * any preallocated IDs go with it — confirm against
	 * rxrpc_destroy_client_conn_ids().
	 */
	rxrpc_destroy_client_conn_ids(local);
}
/* * Determine if a connection may be reused.
*/ staticbool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
{ struct rxrpc_net *rxnet; int id_cursor, id, distance, limit;
if (!conn) goto dont_reuse;
rxnet = conn->rxnet; if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags)) goto dont_reuse;
/* The IDR tree gets very expensive on memory if the connection IDs are * widely scattered throughout the number space, so we shall want to * kill off connections that, say, have an ID more than about four * times the maximum number of client conns away from the current * allocation point to try and keep the IDs concentrated.
*/
id_cursor = idr_get_cursor(&conn->local->conn_ids);
id = conn->proto.cid >> RXRPC_CIDSHIFT;
distance = id - id_cursor; if (distance < 0)
distance = -distance;
limit = umax(atomic_read(&rxnet->nr_conns) * 4, 1024); if (distance > limit) goto mark_dont_reuse;
/* First, see if the bundle is already there. */
_debug("search 1");
spin_lock(&local->client_bundles_lock);
p = local->client_bundles.rb_node; while (p) {
bundle = rb_entry(p, struct rxrpc_bundle, local_node);
rxrpc_activate_bundle(bundle);
conn->bundle_shift = shift;
bundle->conns[slot] = conn;
bundle->conn_ids[slot] = conn->debug_id; for (i = 0; i < RXRPC_MAXCALLS; i++)
set_bit(shift + i, &bundle->avail_chans); returntrue;
}
/* * Add a connection to a bundle if there are no usable connections or we have * connections waiting for extra capacity.
*/ staticbool rxrpc_bundle_has_space(struct rxrpc_bundle *bundle)
{ int slot = -1, i, usable;
_enter("");
bundle->alloc_error = 0;
/* See if there are any usable connections. */
usable = 0; for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) { if (rxrpc_may_reuse_conn(bundle->conns[i]))
usable++; elseif (slot == -1)
slot = i;
}
if (!usable && bundle->upgrade)
bundle->try_upgrade = true;
if (!usable) goto alloc_conn;
if (!bundle->avail_chans &&
!bundle->try_upgrade &&
usable < ARRAY_SIZE(bundle->conns)) goto alloc_conn;
/* * Assign a channel to the call at the front of the queue and wake the call up. * We don't increment the callNumber counter until this number has been exposed * to the world.
*/ staticvoid rxrpc_activate_one_channel(struct rxrpc_connection *conn, unsignedint channel)
{ struct rxrpc_channel *chan = &conn->channels[channel]; struct rxrpc_bundle *bundle = conn->bundle; struct rxrpc_call *call = list_entry(bundle->waiting_calls.next, struct rxrpc_call, wait_link);
u32 call_id = chan->call_counter + 1;
/* Cancel the final ACK on the previous call if it hasn't been sent yet * as the DATA packet will implicitly ACK it.
*/
clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);
/*
 * Remove a connection from the idle list if it's on it.
 */
static void rxrpc_unidle_conn(struct rxrpc_connection *conn)
{
	/* An idle conn is held on the list via ->cache_link; drop that ref
	 * when taking it off.  (Fixed fused "staticvoid" in the original.)
	 */
	if (!list_empty(&conn->cache_link)) {
		list_del_init(&conn->cache_link);
		rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
	}
}
if (rxrpc_bundle_has_space(bundle))
rxrpc_activate_channels(bundle);
}
}
/*
 * Note that a call, and thus a connection, is about to be exposed to the
 * world.
 */
void rxrpc_expose_client_call(struct rxrpc_call *call)
{
	unsigned int channel = call->cid & RXRPC_CHANNELMASK;
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_channel *chan = &conn->channels[channel];

	if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		/* Mark the call ID as being used.  If the callNumber counter
		 * exceeds ~2 billion, we kill the connection after its
		 * outstanding calls have finished so that the counter doesn't
		 * wrap.
		 */
		chan->call_counter++;
		if (chan->call_counter >= INT_MAX)
			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
		trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
		/* NOTE(review): the extracted source was truncated here;
		 * check upstream for trailing statements (e.g. linking the
		 * call onto the peer's error targets) before relying on this
		 * body.
		 */
	}
}
/*
 * Set the reap timer.
 */
static void rxrpc_set_client_reap_timer(struct rxrpc_local *local)
{
	/* Fixed fused "staticvoid"/"unsignedlong" tokens in the original. */
	if (!local->kill_all_client_conns) {
		unsigned long now = jiffies;
		unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;

		/* Don't arm the timer if the namespace is no longer live. */
		if (local->rxnet->live)
			timer_reduce(&local->client_conn_reap_timer, reap_at);
	}
}
/* Calls that have never actually been assigned a channel can simply be * discarded.
*/
conn = call->conn; if (!conn) {
_debug("call is waiting");
ASSERTCMP(call->call_id, ==, 0);
ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags)); /* May still be on ->new_client_calls. */
spin_lock_irq(&local->client_call_lock);
list_del_init(&call->wait_link);
spin_unlock_irq(&local->client_call_lock); return;
}
/* If a client call was exposed to the world, we save the result for * retransmission. * * We use a barrier here so that the call number and abort code can be * read without needing to take a lock. * * TODO: Make the incoming packet handler check this and handle * terminal retransmission without requiring access to the call.
*/ if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
_debug("exposed %u,%u", call->call_id, call->abort_code);
__rxrpc_disconnect_call(conn, call);
if (test_and_clear_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) {
trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
bundle->try_upgrade = false; if (may_reuse)
rxrpc_activate_channels(bundle);
}
}
/* See if we can pass the channel directly to another call. */ if (may_reuse && !list_empty(&bundle->waiting_calls)) {
trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
rxrpc_activate_one_channel(conn, channel); return;
}
/* Schedule the final ACK to be transmitted in a short while so that it * can be skipped if we find a follow-on call. The first DATA packet * of the follow on call will implicitly ACK this call.
*/ if (call->completion == RXRPC_CALL_SUCCEEDED &&
test_bit(RXRPC_CALL_EXPOSED, &call->flags)) { unsignedlong final_ack_at = jiffies + 2;
/* If no channels remain active, then put the connection on the idle * list for a short while. Give it a ref to stop it going away if it * becomes unbundled.
*/ if (!conn->act_chans) {
trace_rxrpc_client(conn, channel, rxrpc_client_to_idle);
conn->idle_timestamp = jiffies;
/*
 * Remove a connection from a bundle.
 */
static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_bundle *bundle = conn->bundle;
	unsigned int bindex;
	int i;

	_enter("C=%x", conn->debug_id);

	/* Flush out any final ACKs still pending on this conn's channels
	 * before it is detached.
	 */
	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
		rxrpc_process_delayed_final_acks(conn, true);

	/* Each conn occupies one slot of RXRPC_MAXCALLS channels. */
	bindex = conn->bundle_shift / RXRPC_MAXCALLS;
	if (bundle->conns[bindex] == conn) {
		_debug("clear slot %u", bindex);
		bundle->conns[bindex] = NULL;
		bundle->conn_ids[bindex] = 0;
		for (i = 0; i < RXRPC_MAXCALLS; i++)
			clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
		rxrpc_put_client_connection_id(bundle->local, conn);
		rxrpc_deactivate_bundle(bundle);
		rxrpc_put_connection(conn, rxrpc_conn_put_unbundle);
	}
}
/*
 * Drop the active count on a bundle.
 */
void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle)
{
	struct rxrpc_local *local;
	bool need_put = false;

	if (!bundle)
		return;

	local = bundle->local;
	/* Only take the lock (and tear down) when this was the last active
	 * count on the bundle.
	 */
	if (atomic_dec_and_lock(&bundle->active, &local->client_bundles_lock)) {
		/* Non-exclusive bundles live in the local endpoint's reuse
		 * tree and must be erased; presumably exclusive bundles were
		 * never inserted — confirm against the bundle setup path.
		 */
		if (!bundle->exclusive) {
			_debug("erase bundle");
			rb_erase(&bundle->local_node, &local->client_bundles);
			need_put = true;
		}

		spin_unlock(&local->client_bundles_lock);
		/* Drop the tree's ref outside the lock. */
		if (need_put)
			rxrpc_put_bundle(bundle, rxrpc_bundle_put_discard);
	}
}
/* * Clean up a dead client connection.
*/ void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
{ struct rxrpc_local *local = conn->local; struct rxrpc_net *rxnet = local->rxnet;
/*
 * Discard expired client connections from the idle list.  Each conn in the
 * idle list has been exposed and holds an extra ref because of that.
 *
 * This may be called from conn setup or from a work item so cannot be
 * considered non-reentrant.
 */
void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
{
	struct rxrpc_connection *conn;
	unsigned long expiry, conn_expires_at, now;
	unsigned int nr_conns;

	_enter("");

	/* We keep an estimate of what the number of conns ought to be after
	 * we've discarded some so that we don't overdo the discarding.
	 */
	nr_conns = atomic_read(&local->rxnet->nr_client_conns);

next:
	conn = list_first_entry_or_null(&local->idle_client_conns,
					struct rxrpc_connection, cache_link);
	if (!conn)
		return;

	if (!local->kill_all_client_conns) {
		/* If the number of connections is over the reap limit, we
		 * expedite discard by reducing the expiry timeout.  We must,
		 * however, have at least a short grace period to be able to do
		 * final-ACK or ABORT retransmission.
		 */
		expiry = rxrpc_conn_idle_client_expiry;
		if (nr_conns > rxrpc_reap_client_connections)
			expiry = rxrpc_conn_idle_client_fast_expiry;
		if (conn->local->service_closed)
			expiry = rxrpc_closed_conn_expiry * HZ;

		conn_expires_at = conn->idle_timestamp + expiry;

		now = jiffies;
		if (time_after(conn_expires_at, now))
			goto not_yet_expired;
	}

	/* Take the conn off the idle list before unbundling it; without this
	 * the goto below refetches the same conn and loops forever.
	 * NOTE(review): the extracted source dropped this line (and possibly
	 * an active-count decrement/tracepoint) — verify against upstream.
	 */
	list_del_init(&conn->cache_link);

	rxrpc_unbundle_conn(conn);
	/* Drop the ->cache_link ref */
	rxrpc_put_connection(conn, rxrpc_conn_put_discard_idle);

	nr_conns--;
	goto next;

not_yet_expired:
	/* The connection at the front of the queue hasn't yet expired, so
	 * schedule the work item for that point if we discarded something.
	 *
	 * We don't worry if the work item is already scheduled - it can look
	 * after rescheduling itself at a later time.  We could cancel it, but
	 * then things get messier.
	 */
	_debug("not yet");
	if (!local->kill_all_client_conns)
		timer_reduce(&local->client_conn_reap_timer, conn_expires_at);

	_leave("");
}
/* * Clean up the client connections on a local endpoint.
*/ void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
{ struct rxrpc_connection *conn;
/*
 * NOTE(review): the following text is website boilerplate that leaked into
 * the extracted source and is not part of this file; kept verbatim but
 * commented out so the file remains compilable:
 *
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereit gestellten Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.
 */