#include <linux/drbd_limits.h> #include"drbd_int.h" #include"drbd_protocol.h" #include"drbd_req.h"/* only for _req_mod in tl_release and tl_clear */ #include"drbd_vli.h" #include"drbd_debugfs.h"
#include <linux/moduleparam.h> /* thanks to these macros, if compiled into the kernel (not-module),
* these become boot parameters (e.g., drbd.minor_count) */
#ifdef CONFIG_DRBD_FAULT_INJECTION
/*
 * Fault injection knobs. Thanks to module_param_named(), when compiled into
 * the kernel these become boot parameters (e.g. drbd.enable_faults).
 */
int drbd_enable_faults;		/* bitmap of enabled fault classes */
int drbd_fault_rate;		/* fault rate %, applies to all enabled faults */
static int drbd_fault_count;	/* count of faults inserted so far */
static int drbd_fault_devs;	/* bitmap of minor devices to insert faults on */

/* bitmap of enabled faults */
module_param_named(enable_faults, drbd_enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param_named(fault_rate, drbd_fault_rate, int, 0664);
/* count of faults inserted */
module_param_named(fault_count, drbd_fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param_named(fault_devs, drbd_fault_devs, int, 0644);
#endif
#ifdef __CHECKER__
/* When checking with sparse, and this is an inline function, sparse will
 * give tons of false positives. When this is a real function sparse works. */
int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
{
	int io_allowed;

	/* optimistically take a reference on the local disk */
	atomic_inc(&device->local_cnt);
	io_allowed = device->state.disk >= mins;
	if (!io_allowed) {
		/* disk state is below @mins: drop the reference again and
		 * wake up anyone waiting for local_cnt to drain to zero */
		if (atomic_dec_and_test(&device->local_cnt))
			wake_up(&device->misc_wait);
	}
	return io_allowed;
}
#endif
/** * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch * @connection: DRBD connection. * @barrier_nr: Expected identifier of the DRBD write barrier packet. * @set_size: Expected number of requests before that barrier. * * In case the passed barrier_nr or set_size does not match the oldest * epoch of not yet barrier-acked requests, this function will cause a * termination of the connection.
*/ void tl_release(struct drbd_connection *connection, unsignedint barrier_nr, unsignedint set_size)
{ struct drbd_request *r; struct drbd_request *req = NULL, *tmp = NULL; int expect_epoch = 0; int expect_size = 0;
spin_lock_irq(&connection->resource->req_lock);
/* find oldest not yet barrier-acked write request,
* count writes in its epoch. */
list_for_each_entry(r, &connection->transfer_log, tl_requests) { constunsigned s = r->rq_state; if (!req) { if (!(s & RQ_WRITE)) continue; if (!(s & RQ_NET_MASK)) continue; if (s & RQ_NET_DONE) continue;
req = r;
expect_epoch = req->epoch;
expect_size ++;
} else { if (r->epoch != expect_epoch) break; if (!(s & RQ_WRITE)) continue; /* if (s & RQ_DONE): not expected */ /* if (!(s & RQ_NET_MASK)): not expected */
expect_size++;
}
}
/* first some paranoia code */ if (req == NULL) {
drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr); goto bail;
} if (expect_epoch != barrier_nr) {
drbd_err(connection, "BAD! BarrierAck #%u received, expected #%u!\n",
barrier_nr, expect_epoch); goto bail;
}
if (expect_size != set_size) {
drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
barrier_nr, set_size, expect_size); goto bail;
}
/* Clean up list of requests processed during current epoch. */ /* this extra list walk restart is paranoia, * to catch requests being barrier-acked "unexpectedly".
* It usually should find the same req again, or some READ preceding it. */
list_for_each_entry(req, &connection->transfer_log, tl_requests) if (req->epoch == expect_epoch) {
tmp = req; break;
}
req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) { struct drbd_peer_device *peer_device; if (req->epoch != expect_epoch) break;
peer_device = conn_peer_device(connection, req->device->vnr);
_req_mod(req, BARRIER_ACKED, peer_device);
}
spin_unlock_irq(&connection->resource->req_lock);
/** * _tl_restart() - Walks the transfer log, and applies an action to all requests * @connection: DRBD connection to operate on. * @what: The action/event to perform with all request objects * * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO, * RESTART_FROZEN_DISK_IO.
 */ /* must hold resource->req_lock */ void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
{ struct drbd_peer_device *peer_device; struct drbd_request *req, *r;
	/* NOTE(review): the body of _tl_restart() is missing from this chunk —
	 * only the local declarations survive. Presumably it walked
	 * connection->transfer_log applying @what via _req_mod(); confirm
	 * against the complete file. */
/**
 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 * @connection: DRBD connection.
 *
 * Called after the connection to the peer was lost. The storage covered by
 * the requests on the transfer log gets marked as out of sync. Runs from the
 * receiver thread and from the worker thread.
 */
void tl_clear(struct drbd_connection *connection)
{
	tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
}
/** * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL * @device: DRBD device.
 */ void tl_abort_disk_io(struct drbd_device *device)
{ struct drbd_connection *connection = first_peer_device(device)->connection; struct drbd_request *req, *r;
	/* NOTE(review): the rest of tl_abort_disk_io() is missing from this
	 * chunk — only the locals survive; the transfer-log walk that aborts
	 * the disk I/O was lost. */
/* NOTE(review): the following lines are the middle of drbd_thread_start();
 * its signature, locals (resource, thi, flags, nt) and the spin_lock that
 * pairs with the unlocks below are missing from this chunk. */
/* if the receiver has been "EXITING", the last thing it did * was set the conn state to "StandAlone", * if now a re-connect request comes in, conn state goes C_UNCONNECTED, * and receiver thread will be "started". * drbd_thread_start needs to set "RESTARTING" in that case. * t_state check and assignment needs to be within the same spinlock, * so either thread_start sees EXITING, and can remap to RESTARTING, * or thread_start see NONE, and can proceed as normal.
*/
/* Get ref on module for thread - this is released when thread exits */ if (!try_module_get(THIS_MODULE)) {
drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
spin_unlock_irqrestore(&thi->t_lock, flags); returnfalse;
}
kref_get(&resource->kref); if (thi->connection)
kref_get(&thi->connection->kref);
init_completion(&thi->stop);
thi->reset_cpu_mask = 1;
thi->t_state = RUNNING;
spin_unlock_irqrestore(&thi->t_lock, flags);
flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
nt = kthread_create(drbd_thread_setup, (void *) thi, "drbd_%c_%s", thi->name[0], thi->resource->name);
if (IS_ERR(nt)) {
drbd_err(resource, "Couldn't start thread\n");
/* NOTE(review): the error-path cleanup after kthread_create() failure
 * (dropping the kref/module references taken above) and the function
 * tail are missing from this chunk. */
#ifdef CONFIG_SMP /* * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs * * Forces all threads of a resource onto the same CPU. This is beneficial for * DRBD's performance. May be overwritten by user's configuration.
 */ staticvoid drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
{ unsignedint *resources_per_cpu, min_index = ~0;
resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
GFP_KERNEL); if (resources_per_cpu) { struct drbd_resource *resource; unsignedint cpu, min = ~0;
	/* NOTE(review): the remainder of drbd_calc_cpu_mask() — counting
	 * resources per CPU, picking the least loaded one, and assigning
	 * *cpu_mask — is missing from this chunk. */
/** * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread * @thi: drbd_thread object * * call in the "main loop" of _all_ threads, no need for any mutex, current won't die * prematurely.
 */ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
{ struct drbd_resource *resource = thi->resource; struct task_struct *p = current;
	/* NOTE(review): the body of drbd_thread_current_set_cpu() is missing
	 * from this chunk — presumably it applied resource->cpu_mask to @p
	 * via set_cpus_allowed_ptr(); confirm against the complete file. */
/*
 * drbd_header_size - size of a packet header
 *
 * The header size is a multiple of 8, so any payload following the header is
 * word aligned on 64-bit architectures. (The bitmap send and receive code
 * relies on this.)
 */
unsigned int drbd_header_size(struct drbd_connection *connection)
{
	/* protocol 100 introduced the larger p_header100; all on-wire
	 * headers must stay 8-byte aligned (enforced at compile time) */
	if (connection->agreed_pro_version >= 100) {
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
		return sizeof(struct p_header100);
	} else {
		BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
		return sizeof(struct p_header80);
	}
}
/* NOTE(review): helper that prepends a DRBD packet header to sock->sbuf and
 * sends header (and optionally payload) over the socket. */
staticint __send_command(struct drbd_connection *connection, int vnr, struct drbd_socket *sock, enum drbd_packet cmd, unsignedint header_size, void *data, unsignedint size)
{ int msg_flags; int err;
/* * Called with @data == NULL and the size of the data blocks in @size * for commands that send data blocks. For those commands, omit the * MSG_MORE flag: this will increase the likelihood that data blocks * which are page aligned on the sender will end up page aligned on the * receiver.
*/
msg_flags = data ? MSG_MORE : 0;
header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
header_size + size);
err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
msg_flags); if (data && !err)
err = drbd_send_all(connection, sock->socket, data, size, 0); /* DRBD protocol "pings" are latency critical.
* This is supposed to trigger tcp_push_pending_frames() */ if (!err && (cmd == P_PING || cmd == P_PING_ACK))
tcp_sock_set_nodelay(sock->socket->sk);
	/* NOTE(review): the trailing "return err;" and closing brace of
	 * __send_command() are missing from this chunk. */
/* NOTE(review): orphaned fragment — the enclosing function is missing from
 * this chunk. It reads net_conf under RCU (nc) and rejects --dry-run
 * (tentative) against peers older than protocol 92; presumably from
 * __drbd_send_protocol() — confirm against the complete file. */
if (nc->tentative && connection->agreed_pro_version < 92) {
rcu_read_unlock();
drbd_err(connection, "--dry-run is not supported by peer"); return -EOPNOTSUPP;
}
/**
 * drbd_send_current_state() - Sends the drbd state to the peer
 * @peer_device: DRBD peer device.
 */
int drbd_send_current_state(struct drbd_peer_device *peer_device)
{
	struct drbd_socket *sock = &peer_device->connection->data;
	struct p_state *p = drbd_prepare_command(peer_device, sock);

	if (!p)
		return -EIO;
	/* state.i is read while the send mutex is held */
	p->state = cpu_to_be32(peer_device->device->state.i);
	return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
}
/**
 * drbd_send_state() - After a state change, sends the new state to the peer
 * @peer_device: DRBD peer device.
 * @state:       the state to send, not necessarily the current state.
 *
 * Each state change queues an "after_state_ch" work, which will eventually
 * send the resulting new state to the peer. If more state changes happen
 * between queuing and processing of the after_state_ch work, we still
 * want to send each intermediary state in the order it occurred.
 */
int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
{
	struct drbd_socket *sock = &peer_device->connection->data;
	struct p_state *p = drbd_prepare_command(peer_device, sock);

	if (!p)
		return -EIO;
	/* @state is read while the send mutex is held */
	p->state = cpu_to_be32(state.i);
	return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
}
/* Sends a state-change request (mask/val pair) to the peer. */
int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
{ struct drbd_socket *sock; struct p_req_state *p;
	/* NOTE(review): the body of drbd_send_state_req() is missing from
	 * this chunk — only the local declarations survive. */
/* NOTE(review): interior of fill_bitmap_rle_bits() — its signature and the
 * declarations of device, p, size, c, bs, plain_bits, toggle, tmp, rl, bits
 * and len are missing from this chunk. It RLE+VLI-compresses a span of the
 * on-disk sync bitmap into p->code for transmission. */
/* may we use this feature? */
rcu_read_lock();
use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
rcu_read_unlock(); if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90) return 0;
if (c->bit_offset >= c->bm_bits) return 0; /* nothing to do. */
/* use at most thus many bytes */
bitstream_init(&bs, p->code, size, 0);
memset(p->code, 0, size); /* plain bits covered in this code string */
plain_bits = 0;
/* p->encoding & 0x80 stores whether the first run length is set. * bit offset is implicit.
* start with toggle == 2 to be able to tell the first iteration */
toggle = 2;
/* see how much plain bits we can stuff into one packet
* using RLE and VLI. */ do {
tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
: _drbd_bm_find_next(device, c->bit_offset); if (tmp == -1UL)
tmp = c->bm_bits;
rl = tmp - c->bit_offset;
if (toggle == 2) { /* first iteration */ if (rl == 0) { /* the first checked bit was set,
* store start value, */
dcbp_set_start(p, 1); /* but skip encoding of zero run length */
toggle = !toggle; continue;
}
dcbp_set_start(p, 0);
}
/* paranoia: catch zero runlength.
* can only happen if bitmap is modified while we scan it. */ if (rl == 0) {
drbd_err(device, "unexpected zero runlength while encoding bitmap " "t:%u bo:%lu\n", toggle, c->bit_offset); return -1;
}
bits = vli_encode_bits(&bs, rl); if (bits == -ENOBUFS) /* buffer full */ break; if (bits <= 0) {
drbd_err(device, "error while encoding bitmap: %d\n", bits); return 0;
}
/* NOTE(review): the tail of the do-loop (advancing toggle/plain_bits/
 * c->bit_offset, the loop's closing "while", and the computation of len
 * from the bitstream cursor) is missing from this chunk; len is used
 * below but never assigned in the surviving lines. */
if (plain_bits < (len << 3)) { /* incompressible with this method.
* we need to rewind both word and bit position. */
c->bit_offset -= plain_bits;
bm_xfer_ctx_bit_to_word_offset(c);
c->bit_offset = c->word_offset * BITS_PER_LONG; return 0;
}
/* RLE + VLI was able to compress it just fine.
* update c->word_offset. */
bm_xfer_ctx_bit_to_word_offset(c);
/* store pad_bits */
dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
return len;
}
/* * send_bitmap_rle_or_plain * * Return 0 when done, 1 when another iteration is needed, and a negative error * code upon failure.
*/ staticint
send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ctx *c)
{ struct drbd_device *device = peer_device->device; struct drbd_socket *sock = &peer_device->connection->data; unsignedint header_size = drbd_header_size(peer_device->connection); struct p_compressed_bm *p = sock->sbuf + header_size; int len, err;
len = fill_bitmap_rle_bits(device, p,
DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c); if (len < 0) return -EIO;
/* NOTE(review): the "if (len) {" that opens the compressed branch and the
 * __send_command() call for the compressed bitmap are missing from this
 * chunk — the stray "} else {" below pairs with that lost line. */
if (c->bit_offset >= c->bm_bits)
len = 0; /* DONE */
} else { /* was not compressible.
* send a buffer full of plain text bits instead. */ unsignedint data_size; unsignedlong num_words; unsignedlong *p = sock->sbuf + header_size;
if (c->bit_offset > c->bm_bits)
c->bit_offset = c->bm_bits;
/* NOTE(review): the plain-path computation of num_words/data_size and
 * its __send_command() call are also missing from this chunk. */
} if (!err) { if (len == 0) {
INFO_bm_xfer_stats(peer_device, "send", c); return 0;
} else return 1;
} return -EIO;
}
/* See the comment at receive_bitmap() */ staticint _drbd_send_bitmap(struct drbd_device *device, struct drbd_peer_device *peer_device)
{ struct bm_xfer_ctx c; int err;
if (!expect(device, device->bitmap)) returnfalse;
if (get_ldev(device)) { if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
drbd_bm_set_all(device); if (drbd_bm_write(device, peer_device)) { /* write_bm did fail! Leave full sync flag set in Meta P_DATA * but otherwise process as per normal - need to tell other
* side that a full resync is required! */
drbd_err(device, "Failed to write bitmap to disk!\n");
} else {
drbd_md_clear_flag(device, MDF_FULL_SYNC);
drbd_md_sync(device);
}
}
put_ldev(device);
}
/* NOTE(review): the tail of _drbd_send_bitmap() — initializing the
 * bm_xfer_ctx and the send_bitmap_rle_or_plain() loop that uses the
 * declared c/err, plus the closing brace — is missing from this chunk. */
/** * _drbd_send_ack() - Sends an ack packet * @peer_device: DRBD peer device. * @cmd: Packet command code. * @sector: sector, needs to be in big endian byte order * @blksize: size in byte, needs to be in big endian byte order * @block_id: Id, big endian byte order
*/ staticint _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
u64 sector, u32 blksize, u64 block_id)
{ struct drbd_socket *sock; struct p_block_ack *p;
if (peer_device->device->state.conn < C_CONNECTED) return -EIO;
	/* NOTE(review): the rest of _drbd_send_ack() — filling the
	 * p_block_ack via the meta socket and sending it — is missing from
	 * this chunk; only the connection-state guard survives. */
/* dp->sector and dp->block_id already/still in network byte order, * data_size is payload size according to dp->head,
* and may need to be corrected for digest size. */ void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd, struct p_data *dp, int data_size)
{ if (peer_device->connection->peer_integrity_tfm)
data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
_drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
dp->block_id);
}
/* This function misuses the block_id field to signal if the blocks
 * are in sync or not. */
int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
		     sector_t sector, int blksize, u64 block_id)
{
	/* convert everything to big endian before handing it on */
	return _drbd_send_ack(peer_device, cmd, cpu_to_be64(sector),
			      cpu_to_be32(blksize), cpu_to_be64(block_id));
}
/* called on sndtimeo * returns false if we should retry, * true if we think connection is dead
*/ staticint we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
{ int drop_it; /* long elapsed = (long)(jiffies - device->last_received); */
	/* NOTE(review): the body of we_should_drop_the_connection() is
	 * missing from this chunk — only the drop_it declaration survives. */
/* The idea of sendpage seems to be to put some kind of reference * to the page into the skb, and to hand it over to the NIC. In * this process get_page() gets called. * * As soon as the page was really sent over the network put_page() * gets called by some part of the network layer. [ NIC driver? ] * * [ get_page() / put_page() increment/decrement the count. If count * reaches 0 the page will be freed. ] * * This works nicely with pages from FSs. * But this means that in protocol A we might signal IO completion too early! * * In order not to corrupt data during a resync we must make sure * that we do not reuse our own buffer pages (EEs) to early, therefore * we have the net_ee list. * * XFS seems to have problems, still, it submits pages with page_count == 0! * As a workaround, we disable sendpage on pages * with page_count == 0 or PageSlab.
*/ staticint _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page, int offset, size_t size, unsigned msg_flags)
{ struct socket *socket; void *addr; int err;
	/* NOTE(review): the body of _drbd_no_send_page() is missing from this
	 * chunk — only the local declarations survive. */
/* Sends a page over the data socket, splicing it into the skb when safe
 * (see the sendpage discussion above). */
staticint _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page, int offset, size_t size, unsigned msg_flags)
{ struct socket *socket = peer_device->connection->data.socket; struct msghdr msg = { .msg_flags = msg_flags, }; struct bio_vec bvec; int len = size; int err = -EIO;
/* e.g. XFS meta- & log-data is in slab pages, which have a * page_count of 0 and/or have PageSlab() set. * we cannot use send_page for those, as that does get_page(); * put_page(); and would cause either a VM_BUG directly, or * __page_cache_release a page that would actually still be referenced
* by someone, leading to some obscure delayed Oops somewhere else. */ if (!drbd_disable_sendpage && sendpages_ok(page, len, offset))
msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;
drbd_update_congested(peer_device->connection); do { int sent;
	/* NOTE(review): the send loop body (sock_sendmsg over the bvec,
	 * retry/timeout handling) and the function tail are missing from
	 * this chunk. */
/* NOTE(review): interior of the data-block send path (presumably
 * drbd_send_dblock()) — the function head, locals (sock, p, req, device,
 * digest_size, digest_out, err) and the drbd_prepare_command() call are
 * missing from this chunk. */
/* our digest is still only over the payload.
* TRIM does not carry any payload. */ if (digest_size)
drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
err = __send_command(peer_device->connection, device->vnr, sock, P_DATA, sizeof(*p) + digest_size, NULL, req->i.size); if (!err) { /* For protocol A, we have to memcpy the payload into * socket buffers, as we may complete right away * as soon as we handed it over to tcp, at which point the data * pages may become invalid. * * For data-integrity enabled, we copy it as well, so we can be * sure that even if the bio pages may still be modified, it * won't change the data on the wire, thus if the digest checks * out ok after sending on this side, but does not fit on the * receiving side, we sure have detected corruption elsewhere.
*/ if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
err = _drbd_send_bio(peer_device, req->master_bio); else
err = _drbd_send_zc_bio(peer_device, req->master_bio);
/* double check digest, sometimes buffers have been modified in flight. */ if (digest_size > 0 && digest_size <= 64) { /* 64 byte, 512 bit, is the largest digest size
* currently supported in kernel crypto. */ unsignedchar digest[64];
drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest); if (memcmp(p + 1, digest, digest_size)) {
drbd_warn(device, "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
(unsignedlonglong)req->i.sector, req->i.size);
}
} /* else if (digest_size > 64) { ... Be noisy about digest too large ...
} */
}
out:
mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
return err;
}
/* answer packet, used to send data back for read requests: * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY) * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
*/ int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd, struct drbd_peer_request *peer_req)
{ struct drbd_device *device = peer_device->device; struct drbd_socket *sock; struct p_data *p; int err; int digest_size;
sock = &peer_device->connection->data;
p = drbd_prepare_command(peer_device, sock);
	/* NOTE(review): the rest of drbd_send_block() — filling in the
	 * p_data header, sending the peer request pages, and the function
	 * tail — is missing from this chunk. */
/* NOTE(review): the opening of this block comment was lost during
 * extraction; reconstructed layout of the surviving tail:
 *
 * Packets sent via the data socket "sock"
 * and packets sent via the meta data socket "msock"
 *
 *                   sock                      msock
 * -----------------+-------------------------+------------------------------
 * timeout           conf.timeout / 2          conf.timeout / 2
 * timeout action    send a ping via msock     Abort communication
 *                                             and close all sockets
 */
/* * you must have down()ed the appropriate [m]sock_mutex elsewhere!
*/ int drbd_send(struct drbd_connection *connection, struct socket *sock, void *buf, size_t size, unsigned msg_flags)
{ struct kvec iov = {.iov_base = buf, .iov_len = size}; struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL}; int rv, sent = 0;
/* NOTE(review): the send loop of drbd_send() is missing from this chunk.
 * The lines below belong to a different function — presumably the block
 * device open handler (they reference device, mode, rv and
 * drbd_main_mutex, none of which are declared here); confirm against the
 * complete file. */
mutex_lock(&drbd_main_mutex);
spin_lock_irqsave(&device->resource->req_lock, flags); /* to have a stable device->state.role
* and no race with updating open_cnt */
if (device->state.role != R_PRIMARY) { if (mode & BLK_OPEN_WRITE)
rv = -EROFS; elseif (!drbd_allow_oos)
rv = -EMEDIUMTYPE;
}
if (!rv)
device->open_cnt++;
spin_unlock_irqrestore(&device->resource->req_lock, flags);
mutex_unlock(&drbd_main_mutex);
/* NOTE(review): device teardown fragment — presumably the tail of
 * drbd_destroy_device(); its signature, the declarations of peer_device,
 * tmp_peer_device and resource, and the earlier teardown steps are missing
 * from this chunk. */
if (device->bitmap) /* should no longer be there. */
drbd_bm_cleanup(device);
__free_page(device->md_io.page);
put_disk(device->vdisk);
kfree(device->rs_plan_s);
/* not for_each_connection(connection, resource): * those may have been cleaned up and disassociated already.
*/
for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
kref_put(&peer_device->connection->kref, drbd_destroy_connection);
kfree(peer_device);
} if (device->submit.wq)
destroy_workqueue(device->submit.wq);
kfree(device);
kref_put(&resource->kref, drbd_destroy_resource);
}
/* One global retry thread, if we need to push back some bio and have it
 * reinserted through our make request function.
*/ staticstruct retry_worker { struct workqueue_struct *wq; struct work_struct worker;
/* NOTE(review): the remainder of the retry_worker definition (the rest of
 * its members, the closing brace and the instance name) is missing from
 * this chunk. */
/* NOTE(review): trailing non-code text — German website boilerplate that was
 * accidentally appended to this chunk and is not part of the driver source.
 * Translation: "The information on this website was carefully compiled to
 * the best of our knowledge. However, neither completeness, correctness,
 * nor quality of the provided information is guaranteed. Note: the colored
 * syntax highlighting and the measurement are still experimental." */