if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) { /* If user owns the sock_lock, mark the connection need sending. * User context will later try to send when it release sock_lock * in smc_release_cb()
*/ if (sock_owned_by_user(&smc->sk))
conn->tx_in_release_sock = true; else
smc_tx_pending(conn);
if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
wake_up(&conn->cdc_pend_tx_wq);
}
WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
staticinlinevoid smc_cdc_add_pending_send(struct smc_connection *conn, struct smc_cdc_tx_pend *pend)
{
BUILD_BUG_ON_MSG( sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE, "must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
BUILD_BUG_ON_MSG(
offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE, "must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
BUILD_BUG_ON_MSG( sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE, "must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)");
pend->conn = conn;
pend->cursor = conn->tx_curs_sent;
pend->p_cursor = conn->local_tx_ctrl.prod;
pend->ctrl_seq = conn->tx_cdc_seq;
}
int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf, struct smc_cdc_tx_pend *pend)
{ struct smc_link *link = conn->lnk; union smc_host_cursor cfed; int rc;
/* send a validation msg indicating the move of a conn to an other QP link */ int smcr_cdc_msg_send_validation(struct smc_connection *conn, struct smc_cdc_tx_pend *pend, struct smc_wr_buf *wr_buf)
{ struct smc_host_cdc_msg *local = &conn->local_tx_ctrl; struct smc_link *link = conn->lnk; struct smc_cdc_msg *peer; int rc;
/* Send a SMC-D CDC header. * This increments the free space available in our send buffer. * Also update the confirmed receive buffer with what was sent to the peer.
*/ int smcd_cdc_msg_send(struct smc_connection *conn)
{ struct smc_sock *smc = container_of(conn, struct smc_sock, conn); union smc_host_cursor curs; struct smcd_cdc_msg cdc; int rc, diff;
if (smc_ism_support_dmb_nocopy(conn->lgr->smcd)) /* if local sndbuf shares the same memory region with * peer DMB, then don't update the tx_curs_fin * and sndbuf_space until peer has consumed the data.
*/ return 0;
/* Calculate transmitted data and increment free send buffer space */
diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
&conn->tx_curs_sent); /* increased by confirmed number of bytes */
smp_mb__before_atomic();
atomic_add(diff, &conn->sndbuf_space); /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
smp_mb__after_atomic();
smc_curs_copy(&conn->tx_curs_fin, &conn->tx_curs_sent, conn);
/* new data included urgent business */
smc_curs_copy(&conn->urg_curs, &conn->local_rx_ctrl.prod, conn);
conn->urg_state = SMC_URG_VALID; if (!sock_flag(&smc->sk, SOCK_URGINLINE)) /* we'll skip the urgent byte, so don't account for it */
(*diff_prod)--;
base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off; if (conn->urg_curs.count)
conn->urg_rx_byte = *(base + conn->urg_curs.count - 1); else
conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
sk_send_sigurg(&smc->sk);
}
diff_cons = smc_curs_diff(conn->peer_rmbe_size, &cons_old,
&conn->local_rx_ctrl.cons); if (diff_cons) { /* peer_rmbe_space is decreased during data transfer with RDMA * write
*/
smp_mb__before_atomic();
atomic_add(diff_cons, &conn->peer_rmbe_space); /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
smp_mb__after_atomic();
/* if local sndbuf shares the same memory region with * peer RMB, then update tx_curs_fin and sndbuf_space * here since peer has already consumed the data.
*/ if (conn->lgr->is_smcd &&
smc_ism_support_dmb_nocopy(conn->lgr->smcd)) { /* Calculate consumed data and * increment free send buffer space.
*/
diff_tx = smc_curs_diff(conn->sndbuf_desc->len,
&conn->tx_curs_fin,
&conn->local_rx_ctrl.cons); /* increase local sndbuf space and fin_curs */
smp_mb__before_atomic();
atomic_add(diff_tx, &conn->sndbuf_space); /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
smp_mb__after_atomic();
smc_curs_copy(&conn->tx_curs_fin,
&conn->local_rx_ctrl.cons, conn);
smc_tx_sndbuf_nonfull(smc);
}
}
diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
&conn->local_rx_ctrl.prod); if (diff_prod) { if (conn->local_rx_ctrl.prod_flags.urg_data_present)
smc_cdc_handle_urg_data_arrival(smc, &diff_prod); /* bytes_to_rcv is decreased in smc_recvmsg */
smp_mb__before_atomic();
atomic_add(diff_prod, &conn->bytes_to_rcv); /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
smp_mb__after_atomic();
smc->sk.sk_data_ready(&smc->sk);
} else { if (conn->local_rx_ctrl.prod_flags.write_blocked)
smc->sk.sk_data_ready(&smc->sk); if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
conn->urg_state = SMC_URG_NOTYET;
}
/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */ if ((diff_cons && smc_tx_prepared_sends(conn)) ||
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
conn->local_rx_ctrl.prod_flags.urg_data_pending) { if (!sock_owned_by_user(&smc->sk))
smc_tx_pending(conn); else
conn->tx_in_release_sock = true;
}
if (diff_cons && conn->urg_tx_pend &&
atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) { /* urg data confirmed by peer, indicate we're ready for more */
conn->urg_tx_pend = false;
smc->sk.sk_write_space(&smc->sk);
}
if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
smc->sk.sk_err = ECONNRESET;
conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
} if (smc_cdc_rxed_any_close_or_senddone(conn)) {
smc->sk.sk_shutdown |= RCV_SHUTDOWN; if (smc->clcsock && smc->clcsock->sk)
smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
smc_sock_set_flag(&smc->sk, SOCK_DONE);
sock_hold(&smc->sk); /* sock_put in close_work */ if (!queue_work(smc_close_wq, &conn->close_work))
sock_put(&smc->sk);
}
}
/* called under tasklet context */ staticvoid smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
{
sock_hold(&smc->sk);
bh_lock_sock(&smc->sk);
smc_cdc_msg_recv_action(smc, cdc);
bh_unlock_sock(&smc->sk);
sock_put(&smc->sk); /* no free sk in softirq-context */
}
/* Schedule a tasklet for this connection. Triggered from the ISM device IRQ * handler to indicate update in the DMBE. * * Context: * - tasklet context
*/ staticvoid smcd_cdc_rx_tsklet(struct tasklet_struct *t)
{ struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet); struct smcd_cdc_msg *data_cdc; struct smcd_cdc_msg cdc; struct smc_sock *smc;
if (cdc->prod_flags.failover_validation) {
smc_cdc_msg_validate(smc, cdc, link); return;
} if (smc_cdc_before(ntohs(cdc->seqno),
conn->local_rx_ctrl.seqno)) /* received seqno is old */ return;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.