#include <linux/drbd_limits.h> #include"drbd_int.h" #include"drbd_protocol.h" #include"drbd_req.h"/* only for _req_mod in tl_release and tl_clear */ #include"drbd_vli.h" #include"drbd_debugfs.h"
#include <linux/moduleparam.h> /* thanks to these macros, if compiled into the kernel (not-module),
* these become boot parameters (e.g., drbd.minor_count) */
#ifdef CONFIG_DRBD_FAULT_INJECTION
/*
 * Fault injection state, only built with CONFIG_DRBD_FAULT_INJECTION.
 * Each variable is exported as a module parameter under the name given
 * in the corresponding module_param_named() line below.
 */
int drbd_enable_faults;
int drbd_fault_rate;
static int drbd_fault_count;
static int drbd_fault_devs;

/* bitmap of enabled faults */
module_param_named(enable_faults, drbd_enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param_named(fault_rate, drbd_fault_rate, int, 0664);
/* count of faults inserted */
module_param_named(fault_count, drbd_fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param_named(fault_devs, drbd_fault_devs, int, 0644);
#endif
#ifdef __CHECKER__
/*
 * Out-of-line variant, built only when checking with sparse: as an inline
 * function this gives tons of false positives, as a real function sparse
 * works.
 *
 * Increments device->local_cnt and keeps that reference only if the disk
 * state is at least @mins.  Otherwise the increment is undone again, and
 * device->misc_wait is woken up if atomic_dec_and_test() reports true.
 *
 * Returns 1 if the reference was kept, 0 if it was dropped again.
 */
int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
{
	atomic_inc(&device->local_cnt);

	if (device->state.disk >= mins)
		return 1;

	/* Disk state too low: give the reference back. */
	if (atomic_dec_and_test(&device->local_cnt))
		wake_up(&device->misc_wait);

	return 0;
}
#endif
/** * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch * @connection: DRBD connection. * @barrier_nr: Expected identifier of the DRBD write barrier packet. * @set_size: Expected number of requests before that barrier. * * In case the passed barrier_nr or set_size does not match the oldest * epoch of not yet barrier-acked requests, this function will cause a * termination of the connection.
*/ void tl_release(struct drbd_connection *connection, unsignedint barrier_nr, unsignedint set_size)
{ struct drbd_request *r; struct drbd_request *req = NULL, *tmp = NULL; int expect_epoch = 0; int expect_size = 0;
spin_lock_irq(&connection->resource->req_lock);
/* find oldest not yet barrier-acked write request,
* count writes in its epoch. */
list_for_each_entry(r, &connection->transfer_log, tl_requests) { constunsigned s = r->rq_state; if (!req) { if (!(s & RQ_WRITE)) continue; if (!(s & RQ_NET_MASK)) continue; if (s & RQ_NET_DONE) continue;
req = r;
expect_epoch = req->epoch;
expect_size ++;
} else { if (r->epoch != expect_epoch) break; if (!(s & RQ_WRITE)) continue; /* if (s & RQ_DONE): not expected */ /* if (!(s & RQ_NET_MASK)): not expected */
expect_size++;
}
}
/* first some paranoia code */ if (req == NULL) {
drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr); goto bail;
} if (expect_epoch != barrier_nr) {
drbd_err(connection, "BAD! BarrierAck #%u received, expected #%u!\n",
barrier_nr, expect_epoch); goto bail;
}
if (expect_size != set_size) {
drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
barrier_nr, set_size, expect_size); goto bail;
}
/* Clean up list of requests processed during current epoch. */ /* this extra list walk restart is paranoia, * to catch requests being barrier-acked "unexpectedly".
* It usually should find the same req again, or some READ preceding it. */
list_for_each_entry(req, &connection->transfer_log, tl_requests) if (req->epoch == expect_epoch) {
tmp = req; break;
}
req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) { struct drbd_peer_device *peer_device; if (req->epoch != expect_epoch) break;
peer_device = conn_peer_device(connection, req->device->vnr);
_req_mod(req, BARRIER_ACKED, peer_device);
}
spin_unlock_irq(&connection->resource->req_lock);
/** * _tl_restart() - Walks the transfer log, and applies an action to all requests * @connection: DRBD connection to operate on. * @what: The action/event to perform with all request objects * * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO, * RESTART_FROZEN_DISK_IO.
*/ /* must hold resource->req_lock */ void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
{ struct drbd_peer_device *peer_device; struct drbd_request *req, *r;
/**
 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 * @connection:	DRBD connection.
 *
 * This is called after the connection to the peer was lost.  The storage
 * covered by the requests in the transfer log gets marked as out of sync.
 * Called from the receiver thread and the worker thread.
 *
 * Implemented by passing CONNECTION_LOST_WHILE_PENDING to tl_restart().
 */
void tl_clear(struct drbd_connection *connection)
{
	tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
}
/** * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL * @device: DRBD device.
*/ void tl_abort_disk_io(struct drbd_device *device)
{ struct drbd_connection *connection = first_peer_device(device)->connection; struct drbd_request *req, *r;
/* if the receiver has been "EXITING", the last thing it did * was set the conn state to "StandAlone", * if now a re-connect request comes in, conn state goes C_UNCONNECTED, * and receiver thread will be "started". * drbd_thread_start needs to set "RESTARTING" in that case. * t_state check and assignment need to be within the same spinlock, * so either thread_start sees EXITING, and can remap to RESTARTING, * or thread_start sees NONE, and can proceed as normal.
*/
/* Get ref on module for thread - this is released when thread exits */ if (!try_module_get(THIS_MODULE)) {
drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
spin_unlock_irqrestore(&thi->t_lock, flags); returnfalse;
}
kref_get(&resource->kref); if (thi->connection)
kref_get(&thi->connection->kref);
init_completion(&thi->stop);
thi->reset_cpu_mask = 1;
thi->t_state = RUNNING;
spin_unlock_irqrestore(&thi->t_lock, flags);
flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
nt = kthread_create(drbd_thread_setup, (void *) thi, "drbd_%c_%s", thi->name[0], thi->resource->name);
if (IS_ERR(nt)) {
drbd_err(resource, "Couldn't start thread\n");
#ifdef CONFIG_SMP /* * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs * * Forces all threads of a resource onto the same CPU. This is beneficial for * DRBD's performance. May be overwritten by user's configuration.
*/ staticvoid drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
{ unsignedint *resources_per_cpu, min_index = ~0;
resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
GFP_KERNEL); if (resources_per_cpu) { struct drbd_resource *resource; unsignedint cpu, min = ~0;
/** * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread * @thi: drbd_thread object * * call in the "main loop" of _all_ threads, no need for any mutex, current won't die * prematurely.
*/ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
{ struct drbd_resource *resource = thi->resource; struct task_struct *p = current;
/*
 * drbd_header_size  -  size of a packet header
 *
 * The header size is a multiple of 8, so any payload following the header is
 * word aligned on 64-bit architectures.  (The bitmap send and receive code
 * relies on this.)
 *
 * Connections that agreed on protocol version 100 or newer use
 * struct p_header100; all others use struct p_header80, which is checked at
 * build time to have the same size as struct p_header95.
 */
unsigned int drbd_header_size(struct drbd_connection *connection)
{
	/* Compile-time layout checks for every header variant used below. */
	BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
	BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
	BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));

	if (connection->agreed_pro_version >= 100)
		return sizeof(struct p_header100);

	return sizeof(struct p_header80);
}
staticint __send_command(struct drbd_connection *connection, int vnr, struct drbd_socket *sock, enum drbd_packet cmd, unsignedint header_size, void *data, unsignedint size)
{ int msg_flags; int err;
/* * Called with @data == NULL and the size of the data blocks in @size * for commands that send data blocks. For those commands, omit the * MSG_MORE flag: this will increase the likelihood that data blocks * which are page aligned on the sender will end up page aligned on the * receiver.
*/
msg_flags = data ? MSG_MORE : 0;
header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
header_size + size);
err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
msg_flags); if (data && !err)
err = drbd_send_all(connection, sock->socket, data, size, 0); /* DRBD protocol "pings" are latency critical.
* This is supposed to trigger tcp_push_pending_frames() */ if (!err && (cmd == P_PING || cmd == P_PING_ACK))
tcp_sock_set_nodelay(sock->socket->sk);
if (nc->tentative && connection->agreed_pro_version < 92) {
rcu_read_unlock();
drbd_err(connection, "--dry-run is not supported by peer"); return -EOPNOTSUPP;
}
/**
 * drbd_send_current_state() - Sends the drbd state to the peer
 * @peer_device:	DRBD peer device.
 *
 * Sends peer_device->device->state as a P_STATE packet on the data socket.
 *
 * Return: -EIO if drbd_prepare_command() yields no buffer, otherwise the
 * result of drbd_send_command().
 */
int drbd_send_current_state(struct drbd_peer_device *peer_device)
{
	struct drbd_socket *data_sock = &peer_device->connection->data;
	struct p_state *pkt = drbd_prepare_command(peer_device, data_sock);

	if (pkt == NULL)
		return -EIO;

	/* Within the send mutex */
	pkt->state = cpu_to_be32(peer_device->device->state.i);

	return drbd_send_command(peer_device, data_sock, P_STATE, sizeof(*pkt), NULL, 0);
}
/**
 * drbd_send_state() - After a state change, sends the new state to the peer
 * @peer_device:	DRBD peer device.
 * @state:		the state to send, not necessarily the current state.
 *
 * Each state change queues an "after_state_ch" work, which will eventually
 * send the resulting new state to the peer.  If more state changes happen
 * between queuing and processing of the after_state_ch work, we still
 * want to send each intermediary state in the order it occurred.
 *
 * Return: -EIO if drbd_prepare_command() yields no buffer, otherwise the
 * result of drbd_send_command().
 */
int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
{
	struct drbd_socket *data_sock = &peer_device->connection->data;
	struct p_state *pkt = drbd_prepare_command(peer_device, data_sock);

	if (pkt == NULL)
		return -EIO;

	/* Within the send mutex */
	pkt->state = cpu_to_be32(state.i);

	return drbd_send_command(peer_device, data_sock, P_STATE, sizeof(*pkt), NULL, 0);
}
int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
{ struct drbd_socket *sock; struct p_req_state *p;
/* may we use this feature? */
rcu_read_lock();
use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
rcu_read_unlock(); if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90) return 0;
if (c->bit_offset >= c->bm_bits) return 0; /* nothing to do. */
/* use at most thus many bytes */
bitstream_init(&bs, p->code, size, 0);
memset(p->code, 0, size); /* plain bits covered in this code string */
plain_bits = 0;
/* p->encoding & 0x80 stores whether the first run length is set. * bit offset is implicit.
* start with toggle == 2 to be able to tell the first iteration */
toggle = 2;
/* see how much plain bits we can stuff into one packet
* using RLE and VLI. */ do {
tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
: _drbd_bm_find_next(device, c->bit_offset); if (tmp == -1UL)
tmp = c->bm_bits;
rl = tmp - c->bit_offset;
if (toggle == 2) { /* first iteration */ if (rl == 0) { /* the first checked bit was set,
* store start value, */
dcbp_set_start(p, 1); /* but skip encoding of zero run length */
toggle = !toggle; continue;
}
dcbp_set_start(p, 0);
}
/* paranoia: catch zero runlength.
* can only happen if bitmap is modified while we scan it. */ if (rl == 0) {
drbd_err(device, "unexpected zero runlength while encoding bitmap " "t:%u bo:%lu\n", toggle, c->bit_offset); return -1;
}
bits = vli_encode_bits(&bs, rl); if (bits == -ENOBUFS) /* buffer full */ break; if (bits <= 0) {
drbd_err(device, "error while encoding bitmap: %d\n", bits); return 0;
}
if (plain_bits < (len << 3)) { /* incompressible with this method.
* we need to rewind both word and bit position. */
c->bit_offset -= plain_bits;
bm_xfer_ctx_bit_to_word_offset(c);
c->bit_offset = c->word_offset * BITS_PER_LONG; return 0;
}
/* RLE + VLI was able to compress it just fine.
* update c->word_offset. */
bm_xfer_ctx_bit_to_word_offset(c);
/* store pad_bits */
dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
return len;
}
/* * send_bitmap_rle_or_plain * * Return 0 when done, 1 when another iteration is needed, and a negative error * code upon failure.
*/ staticint
send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ctx *c)
{ struct drbd_device *device = peer_device->device; struct drbd_socket *sock = &peer_device->connection->data; unsignedint header_size = drbd_header_size(peer_device->connection); struct p_compressed_bm *p = sock->sbuf + header_size; int len, err;
len = fill_bitmap_rle_bits(device, p,
DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c); if (len < 0) return -EIO;
if (c->bit_offset >= c->bm_bits)
len = 0; /* DONE */
} else { /* was not compressible.
* send a buffer full of plain text bits instead. */ unsignedint data_size; unsignedlong num_words; unsignedlong *p = sock->sbuf + header_size;
if (c->bit_offset > c->bm_bits)
c->bit_offset = c->bm_bits;
} if (!err) { if (len == 0) {
INFO_bm_xfer_stats(peer_device, "send", c); return 0;
} else return 1;
} return -EIO;
}
/* See the comment at receive_bitmap() */ staticint _drbd_send_bitmap(struct drbd_device *device, struct drbd_peer_device *peer_device)
{ struct bm_xfer_ctx c; int err;
if (!expect(device, device->bitmap)) returnfalse;
if (get_ldev(device)) { if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
drbd_bm_set_all(device); if (drbd_bm_write(device, peer_device)) { /* write_bm did fail! Leave full sync flag set in Meta P_DATA * but otherwise process as per normal - need to tell other
* side that a full resync is required! */
drbd_err(device, "Failed to write bitmap to disk!\n");
} else {
drbd_md_clear_flag(device, MDF_FULL_SYNC);
drbd_md_sync(device);
}
}
put_ldev(device);
}
/** * _drbd_send_ack() - Sends an ack packet * @peer_device: DRBD peer device. * @cmd: Packet command code. * @sector: sector, needs to be in big endian byte order * @blksize: size in byte, needs to be in big endian byte order * @block_id: Id, big endian byte order
*/ staticint _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
u64 sector, u32 blksize, u64 block_id)
{ struct drbd_socket *sock; struct p_block_ack *p;
if (peer_device->device->state.conn < C_CONNECTED) return -EIO;
/* dp->sector and dp->block_id already/still in network byte order, * data_size is payload size according to dp->head,
* and may need to be corrected for digest size. */ void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd, struct p_data *dp, int data_size)
{ if (peer_device->connection->peer_integrity_tfm)
data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
_drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
dp->block_id);
}
/*
 * This function misuses the block_id field to signal if the blocks
 * are in sync or not.
 *
 * @sector and @block_id are converted with cpu_to_be64(), @blksize with
 * cpu_to_be32(), before being handed to _drbd_send_ack(), whose result is
 * returned unchanged.
 */
int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
		     sector_t sector, int blksize, u64 block_id)
{
	const u64 be_sector = cpu_to_be64(sector);
	const u32 be_blksize = cpu_to_be32(blksize);
	const u64 be_block_id = cpu_to_be64(block_id);

	return _drbd_send_ack(peer_device, cmd, be_sector, be_blksize, be_block_id);
}
/* called on sndtimeo * returns false if we should retry, * true if we think connection is dead
*/ staticint we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
{ int drop_it; /* long elapsed = (long)(jiffies - device->last_received); */
/* The idea of sendpage seems to be to put some kind of reference * to the page into the skb, and to hand it over to the NIC. In * this process get_page() gets called. * * As soon as the page was really sent over the network put_page() * gets called by some part of the network layer. [ NIC driver? ] * * [ get_page() / put_page() increment/decrement the count. If count * reaches 0 the page will be freed. ] * * This works nicely with pages from FSs. * But this means that in protocol A we might signal IO completion too early! * * In order not to corrupt data during a resync we must make sure * that we do not reuse our own buffer pages (EEs) too early, therefore * we have the net_ee list. * * XFS seems to have problems, still, it submits pages with page_count == 0! * As a workaround, we disable sendpage on pages * with page_count == 0 or PageSlab.
*/ staticint _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page, int offset, size_t size, unsigned msg_flags)
{ struct socket *socket; void *addr; int err;
staticint _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page, int offset, size_t size, unsigned msg_flags)
{ struct socket *socket = peer_device->connection->data.socket; struct msghdr msg = { .msg_flags = msg_flags, }; struct bio_vec bvec; int len = size; int err = -EIO;
/* e.g. XFS meta- & log-data is in slab pages, which have a * page_count of 0 and/or have PageSlab() set. * we cannot use send_page for those, as that does get_page(); * put_page(); and would cause either a VM_BUG directly, or * __page_cache_release a page that would actually still be referenced
* by someone, leading to some obscure delayed Oops somewhere else. */ if (!drbd_disable_sendpage && sendpages_ok(page, len, offset))
msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;
drbd_update_congested(peer_device->connection); do { int sent;
/* our digest is still only over the payload.
* TRIM does not carry any payload. */ if (digest_size)
drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
err = __send_command(peer_device->connection, device->vnr, sock, P_DATA, sizeof(*p) + digest_size, NULL, req->i.size); if (!err) { /* For protocol A, we have to memcpy the payload into * socket buffers, as we may complete right away * as soon as we handed it over to tcp, at which point the data * pages may become invalid. * * For data-integrity enabled, we copy it as well, so we can be * sure that even if the bio pages may still be modified, it * won't change the data on the wire, thus if the digest checks * out ok after sending on this side, but does not fit on the * receiving side, we sure have detected corruption elsewhere.
*/ if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
err = _drbd_send_bio(peer_device, req->master_bio); else
err = _drbd_send_zc_bio(peer_device, req->master_bio);
/* double check digest, sometimes buffers have been modified in flight. */ if (digest_size > 0 && digest_size <= 64) { /* 64 byte, 512 bit, is the largest digest size
* currently supported in kernel crypto. */ unsignedchar digest[64];
drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest); if (memcmp(p + 1, digest, digest_size)) {
drbd_warn(device, "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
(unsignedlonglong)req->i.sector, req->i.size);
}
} /* else if (digest_size > 64) { ... Be noisy about digest too large ...
} */
}
out:
mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
return err;
}
/* answer packet, used to send data back for read requests: * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY) * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
*/ int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd, struct drbd_peer_request *peer_req)
{ struct drbd_device *device = peer_device->device; struct drbd_socket *sock; struct p_data *p; int err; int digest_size;
sock = &peer_device->connection->data;
p = drbd_prepare_command(peer_device, sock);
Packets sent via the data socket "sock" and packets sent via the meta data socket "msock"
sock msock -----------------+-------------------------+------------------------------ timeout conf.timeout / 2 conf.timeout / 2 timeout action send a ping via msock Abort communication and close all sockets
*/
/* * you must have down()ed the appropriate [m]sock_mutex elsewhere!
*/ int drbd_send(struct drbd_connection *connection, struct socket *sock, void *buf, size_t size, unsigned msg_flags)
{ struct kvec iov = {.iov_base = buf, .iov_len = size}; struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL}; int rv, sent = 0;
mutex_lock(&drbd_main_mutex);
spin_lock_irqsave(&device->resource->req_lock, flags); /* to have a stable device->state.role
* and no race with updating open_cnt */
if (device->state.role != R_PRIMARY) { if (mode & BLK_OPEN_WRITE)
rv = -EROFS; elseif (!drbd_allow_oos)
rv = -EMEDIUMTYPE;
}
if (!rv)
device->open_cnt++;
spin_unlock_irqrestore(&device->resource->req_lock, flags);
mutex_unlock(&drbd_main_mutex);
if (device->bitmap) /* should no longer be there. */
drbd_bm_cleanup(device);
__free_page(device->md_io.page);
put_disk(device->vdisk);
kfree(device->rs_plan_s);
/* not for_each_connection(connection, resource): * those may have been cleaned up and disassociated already.
*/
for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
kref_put(&peer_device->connection->kref, drbd_destroy_connection);
kfree(peer_device);
} if (device->submit.wq)
destroy_workqueue(device->submit.wq);
kfree(device);
kref_put(&resource->kref, drbd_destroy_resource);
}
/* One global retry thread, if we need to push back some bio and have it * reinserted through our make request function.
*/ staticstruct retry_worker { struct workqueue_struct *wq; struct work_struct worker;
if (!expected)
drbd_err(device, "req=%p completion_ref=%d rq_state=%x\n",
req, atomic_read(&req->completion_ref),
req->rq_state);
/* We still need to put one kref associated with the * "completion_ref" going zero in the code path that queued it * here. The request object may still be referenced by a * frozen local req->private_bio, in case we force-detached.
*/
kref_put(&req->kref, drbd_req_destroy);
/* A single suspended or otherwise blocking device may stall * all others as well. Fortunately, this code path is to * recover from a situation that "should not happen": * concurrent writes in multi-primary setup. * In a "normal" lifecycle, this workqueue is supposed to be * destroyed without ever doing anything. * If it turns out to be an issue anyways, we can do per * resource (replication group) or per device (minor) retry * workqueues instead.
*/
/* We are not just doing submit_bio_noacct(),
* as we want to keep the start_time information. */
inc_ap_bio(device);
__drbd_make_request(device, bio);
}
}
/* Drop the extra reference that would otherwise * have been dropped by complete_master_bio.
* do_retry() needs to grab a new one. */
dec_ap_bio(req->device);
/* first remove proc, * drbdsetup uses its presence to detect * whether DRBD is loaded. * If we would get stuck in proc removal, * but have netlink already deregistered, * some drbdsetup commands may wait forever * for an answer.
*/ if (drbd_proc)
remove_proc_entry("drbd", NULL);
staticint init_submitter(struct drbd_device *device)
{ /* opencoded create_singlethread_workqueue(),
* to be able to say "drbd%d", ..., minor */
device->submit.wq =
alloc_ordered_workqueue("drbd%u_submit", WQ_MEM_RECLAIM, device->minor); if (!device->submit.wq) return -ENOMEM;
enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsignedint minor)
{ struct drbd_resource *resource = adm_ctx->resource; struct drbd_connection *connection, *n; struct drbd_device *device; struct drbd_peer_device *peer_device, *tmp_peer_device; struct gendisk *disk; int id; int vnr = adm_ctx->volume; enum drbd_ret_code err = ERR_NOMEM; struct queue_limits lim = { /* * Setting the max_hw_sectors to an odd value of 8kibyte here. * This triggers a max_bio_size message upon first attach or * connect.
*/
.max_hw_sectors = DRBD_MAX_BIO_SIZE_SAFE >> 8,
.features = BLK_FEAT_WRITE_CACHE | BLK_FEAT_FUA |
BLK_FEAT_ROTATIONAL |
BLK_FEAT_STABLE_WRITES,
};
device = minor_to_device(minor); if (device) return ERR_MINOR_OR_VOLUME_EXISTS;
/* GFP_KERNEL, we are outside of all write-out paths */
device = kzalloc(sizeof(struct drbd_device), GFP_KERNEL); if (!device) return ERR_NOMEM;
kref_init(&device->kref);
device->md_io.page = alloc_page(GFP_KERNEL); if (!device->md_io.page) goto out_no_io_page;
if (drbd_bm_init(device)) goto out_no_bitmap;
device->read_requests = RB_ROOT;
device->write_requests = RB_ROOT;
id = idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_KERNEL); if (id < 0) { if (id == -ENOSPC)
err = ERR_MINOR_OR_VOLUME_EXISTS; goto out_no_minor_idr;
}
kref_get(&device->kref);
id = idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_KERNEL); if (id < 0) { if (id == -ENOSPC)
err = ERR_MINOR_OR_VOLUME_EXISTS; goto out_idr_remove_minor;
}
kref_get(&device->kref);
fail:
drbd_cleanup(); if (err == -ENOMEM)
pr_err("ran out of memory\n"); else
pr_err("initialization failure\n"); return err;
}
/*
 * Detach the socket from @ds and release it, if there is one.
 *
 * ds->socket is read and set to NULL while holding ds->mutex.  A socket
 * found there is shut down in both directions and released only after
 * synchronize_rcu() has returned.
 */
static void drbd_free_one_sock(struct drbd_socket *ds)
{
	struct socket *s;

	mutex_lock(&ds->mutex);
	s = ds->socket;
	ds->socket = NULL;
	mutex_unlock(&ds->mutex);

	if (!s)
		return;

	/* so debugfs does not need to mutex_lock() */
	synchronize_rcu();
	kernel_sock_shutdown(s, SHUT_RDWR);
	sock_release(s);
}
void drbd_free_sock(struct drbd_connection *connection)
{ if (connection->data.socket)
drbd_free_one_sock(&connection->data); if (connection->meta.socket)
drbd_free_one_sock(&connection->meta);
}
/* meta data management */
void conn_md_sync(struct drbd_connection *connection)
{ struct drbd_peer_device *peer_device; int vnr;
if (drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE)) { /* this was a try anyways ... */
drbd_err(device, "meta data update failed!\n");
drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
}
}
/** * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set * @device: DRBD device.
*/ void drbd_md_sync(struct drbd_device *device)
{ struct meta_data_on_disk *buffer;
/* Don't accidentally change the DRBD meta data layout. */
BUILD_BUG_ON(UI_SIZE != 4);
BUILD_BUG_ON(sizeof(struct meta_data_on_disk) != 4096);
timer_delete(&device->md_sync_timer); /* timer may be rearmed by drbd_md_mark_dirty() now. */ if (!test_and_clear_bit(MD_DIRTY, &device->flags)) return;
/* We use here D_FAILED and not D_ATTACHING because we try to write
* metadata even if we detach due to a disk failure! */ if (!get_ldev_if_state(device, D_FAILED)) return;
buffer = drbd_md_get_buffer(device, __func__); if (!buffer) goto out;
drbd_md_write(device, buffer);
/* Update device->ldev->md.la_size_sect,
* since we updated it on metadata. */
device->ldev->md.la_size_sect = get_capacity(device->vdisk);
/* both not set: default to old fixed size activity log */ if (al_stripes == 0 && al_stripe_size_4k == 0) {
al_stripes = 1;
al_stripe_size_4k = MD_32kB_SECT/8;
}
/* some paranoia plausibility checks */
/* we need both values to be set */ if (al_stripes == 0 || al_stripe_size_4k == 0) goto err;
al_size_4k = (u64)al_stripes * al_stripe_size_4k;
/* Upper limit of activity log area, to avoid potential overflow * problems in al_tr_number_to_on_disk_sector(). As right now, more * than 72 * 4k blocks total only increases the amount of history,
* limiting this arbitrarily to 16 GB is not a real limitation ;-) */ if (al_size_4k > (16 * 1024 * 1024/4)) goto err;
/* Lower limit: we need at least 8 transaction slots (32kB)
* to not break existing setups */ if (al_size_4k < MD_32kB_SECT/8) goto err;
/* The on-disk size of the activity log, calculated from offsets, and * the size of the activity log calculated from the stripe settings, * should match. * Though we could relax this a bit: it is ok, if the striped activity log * fits in the available on-disk activity log size. * Right now, that would break how resize is implemented. * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware
* of possible unused padding space in the on disk layout. */ if (in_core->al_offset < 0) { if (in_core->bm_offset > in_core->al_offset) goto err;
on_disk_al_sect = -in_core->al_offset;
on_disk_bm_sect = in_core->al_offset - in_core->bm_offset;
} else { if (in_core->al_offset != MD_4kB_SECT) goto err; if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4kB_SECT) goto err;
/* old fixed size meta data is exactly that: fixed. */ if (in_core->meta_dev_idx >= 0) { if (in_core->md_size_sect != MD_128MB_SECT
|| in_core->al_offset != MD_4kB_SECT
|| in_core->bm_offset != MD_4kB_SECT + MD_32kB_SECT
|| in_core->al_stripes != 1
|| in_core->al_stripe_size_4k != MD_32kB_SECT/8) goto err;
}
if (capacity < in_core->md_size_sect) goto err; if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev)) goto err;
/* should be aligned, and at least 32k */ if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT)) goto err;
/* should fit (for now: exactly) into the available on-disk space;
* overflow prevention is in check_activity_log_stripe_size() above. */ if (on_disk_al_sect != in_core->al_size_4k * MD_4kB_SECT) goto err;
/* again, should be aligned */ if (in_core->bm_offset & 7) goto err;
/* FIXME check for device grow with flex external meta data? */
/* can the available bitmap space cover the last agreed device size? */ if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512) goto err;
/** * drbd_md_read() - Reads in the meta data super block * @device: DRBD device. * @bdev: Device from which the meta data should be read in. * * Return NO_ERROR on success, and an enum drbd_ret_code in case * something goes wrong. * * Called exactly once during drbd_adm_attach(), while still being D_DISKLESS, * even before @bdev is assigned to @device->ldev.
*/ int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
{ struct meta_data_on_disk *buffer;
u32 magic, flags; int i, rv = NO_ERROR;
if (device->state.disk != D_DISKLESS) return ERR_DISK_CONFIGURED;
buffer = drbd_md_get_buffer(device, __func__); if (!buffer) return ERR_NOMEM;
/* First, figure out where our meta data superblock is located,
* and read it. */
bdev->md.meta_dev_idx = bdev->disk_conf->meta_dev_idx;
bdev->md.md_offset = drbd_md_ss(bdev); /* Even for (flexible or indexed) external meta data, * initially restrict us to the 4k superblock for now.
* Affects the paranoia out-of-range access check in drbd_md_sync_page_io(). */
bdev->md.md_size_sect = 8;
if (drbd_md_sync_page_io(device, bdev, bdev->md.md_offset,
REQ_OP_READ)) { /* NOTE: can't do normal error processing here as this is
called BEFORE disk is attached */
drbd_err(device, "Error while reading metadata.\n");
rv = ERR_IO_MD_DISK; goto err;
}
magic = be32_to_cpu(buffer->magic);
flags = be32_to_cpu(buffer->flags); if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
(magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) { /* btw: that's Activity Log clean, not "all" clean. */
drbd_err(device, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
rv = ERR_MD_UNCLEAN; goto err;
}
rv = ERR_MD_INVALID; if (magic != DRBD_MD_MAGIC_08) { if (magic == DRBD_MD_MAGIC_07)
drbd_err(device, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n"); else
drbd_err(device, "Meta data magic not found. Did you \"drbdadm create-md\"?\n"); goto err;
}
/**
 * drbd_md_mark_dirty() - Mark meta data super block as dirty
 * @device:	DRBD device.
 *
 * Call this function if you change anything that should be written to
 * the meta-data super block.  This function sets MD_DIRTY, and starts a
 * timer that ensures that within five seconds you have to call
 * drbd_md_sync().
 *
 * The timer is only touched by the call that actually sets MD_DIRTY; if
 * the bit was set already, nothing else happens.
 */
void drbd_md_mark_dirty(struct drbd_device *device)
{
	/* Bit was already set: leave the timer as it is. */
	if (test_and_set_bit(MD_DIRTY, &device->flags))
		return;

	mod_timer(&device->md_sync_timer, jiffies + 5*HZ);
}
/*
 * drbd_uuid_move_history() - shift the history UUIDs one slot towards the end
 * @device:	DRBD device.
 *
 * Every entry of device->ldev->md.uuid[] in the range
 * UI_HISTORY_START .. UI_HISTORY_END - 1 is copied to the next higher index;
 * the previous value at UI_HISTORY_END is overwritten.  The slot at
 * UI_HISTORY_START keeps its old value and is left for the caller to set.
 *
 * The copy runs from the highest index downwards.  Source and destination
 * ranges overlap, so an upwards copy would propagate the first entry into
 * every later slot as soon as the range holds more than two entries.  For a
 * two-entry range both directions give the same result.
 */
void drbd_uuid_move_history(struct drbd_device *device) __must_hold(local)
{
	int i;

	for (i = UI_HISTORY_END; i > UI_HISTORY_START; i--)
		device->ldev->md.uuid[i] = device->ldev->md.uuid[i - 1];
}
void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
{ if (idx == UI_CURRENT) { if (device->state.role == R_PRIMARY)
val |= 1; else
val &= ~((u64)1);
/** * drbd_uuid_new_current() - Creates a new current UUID * @device: DRBD device. * * Creates a new current UUID, and rotates the old current UUID into * the bitmap slot. Causes an incremental resync upon next connect.
*/ void drbd_uuid_new_current(struct drbd_device *device) __must_hold(local)
{
u64 val; unsignedlonglong bm_uuid;
if (val == 0) {
drbd_uuid_move_history(device);
device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
device->ldev->md.uuid[UI_BITMAP] = 0;
} else { unsignedlonglong bm_uuid = device->ldev->md.uuid[UI_BITMAP]; if (bm_uuid)
drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
device->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
}
spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
drbd_md_mark_dirty(device);
}
/** * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io() * @device: DRBD device. * @peer_device: Peer DRBD device. * * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
*/ int drbd_bmio_set_n_write(struct drbd_device *device, struct drbd_peer_device *peer_device) __must_hold(local)
if (!rv) {
drbd_md_clear_flag(device, MDF_FULL_SYNC);
drbd_md_sync(device);
}
return rv;
}
/** * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io() * @device: DRBD device. * @peer_device: Peer DRBD device. * * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
*/ int drbd_bmio_clear_n_write(struct drbd_device *device, struct drbd_peer_device *peer_device) __must_hold(local)
/**
 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
 * @device:	DRBD device.
 * @io_fn:	IO callback to be called when bitmap IO is possible
 * @done:	callback to be called after the bitmap IO was performed
 * @why:	Descriptive text of the reason for doing the IO
 * @flags:	Bitmap flags
 * @peer_device: Peer DRBD device.
 *
 * While IO on the bitmap happens we freeze application IO thus we ensure
 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
 * called from worker context. It MUST NOT be used while a previous such
 * work is still pending!
 *
 * Its worker function encloses the call of io_fn() by get_ldev() and
 * put_ldev().
 *
 * NOTE(review): in the body as visible here, @io_fn and @done are never
 * referenced, @why is only used in the error message and @flags only in the
 * comparison below; nothing stores them in device->bm_io_work before the
 * work item is queued.  Confirm that the bm_io_work setup was not lost.
*/ void drbd_queue_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *, struct drbd_peer_device *), void (*done)(struct drbd_device *, int), char *why, enum bm_flag flags, struct drbd_peer_device *peer_device)
{
/* Preconditions, checked via D_ASSERT: the caller is the worker task of
 * peer_device's connection, neither BITMAP_IO_QUEUED nor BITMAP_IO is set,
 * and the bm_io_work item is not linked into any list. */
D_ASSERT(device, current == peer_device->connection->worker.task);
D_ASSERT(device, !test_bit(BITMAP_IO_QUEUED, &device->flags));
D_ASSERT(device, !test_bit(BITMAP_IO, &device->flags));
/* A non-NULL bm_io_work.why is reported as a still pending earlier request;
 * it is only logged, execution continues. */
D_ASSERT(device, list_empty(&device->bm_io_work.w.list)); if (device->bm_io_work.why)
drbd_err(device, "FIXME going to queue '%s' but '%s' still pending?\n",
why, device->bm_io_work.why);
/* BITMAP_IO is set, and the work item conditionally queued, with req_lock
 * held and interrupts disabled. */
spin_lock_irq(&device->resource->req_lock);
set_bit(BITMAP_IO, &device->flags); /* don't wait for pending application IO if the caller indicates that
* application IO does not conflict anyways. */ if (flags == BM_LOCKED_CHANGE_ALLOWED || atomic_read(&device->ap_bio_cnt) == 0) { if (!test_and_set_bit(BITMAP_IO_QUEUED, &device->flags))
/* test_and_set_bit() lets only the caller that flips BITMAP_IO_QUEUED
 * put the work item on the connection's sender_work queue. */
drbd_queue_work(&peer_device->connection->sender_work,
&device->bm_io_work.w);
}
/* If application IO is still pending (ap_bio_cnt != 0) and flags is not
 * exactly BM_LOCKED_CHANGE_ALLOWED, nothing is queued here; presumably the
 * work gets queued elsewhere once ap_bio_cnt drops to zero -- not visible
 * in this function, confirm. */
spin_unlock_irq(&device->resource->req_lock);
}
/** * drbd_bitmap_io() - Does an IO operation on the whole bitmap * @device: DRBD device. * @io_fn: IO callback to be called when bitmap IO is possible * @why: Descriptive text of the reason for doing the IO * @flags: Bitmap flags * @peer_device: Peer DRBD device. * * freezes application IO while that the actual IO operations runs. This * functions MAY NOT be called from worker context.
*/ int drbd_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *, struct drbd_peer_device *), char *why, enum bm_flag flags, struct drbd_peer_device *peer_device)
{ /* Only suspend io, if some operation is supposed to be locked out */ constbool do_suspend_io = flags & (BM_DONT_CLEAR|BM_DONT_SET|BM_DONT_TEST); int rv;
D_ASSERT(device, current != first_peer_device(device)->connection->worker.task);
/*
 * cmdname() - Map a packet code to a printable name
 * @cmd:	packet code to look up
 *
 * Three codes are too large to serve as table indexes (0xfffX) and are
 * translated explicitly.  Every other code is looked up in cmdnames[];
 * codes at or beyond the end of the table yield "Unknown".
 *
 * NOTE(review): no initializers are visible in cmdnames[] in this copy of
 * the file, only the comment about obsoleted flags -- confirm the table
 * entries were not lost.
 */
const char *cmdname(enum drbd_packet cmd)
{
	/* THINK may need to become several global tables
	 * when we want to support more than
	 * one PRO_VERSION */
	static const char *cmdnames[] = {
		/* enum drbd_packet, but not commands - obsoleted flags:
		 *	P_MAY_IGNORE
		 *	P_MAX_OPT_CMD
		 */
	};

	/* too big for the array: 0xfffX */
	switch (cmd) {
	case P_INITIAL_META:
		return "InitialMeta";
	case P_INITIAL_DATA:
		return "InitialData";
	case P_CONNECTION_FEATURES:
		return "ConnectionFeatures";
	default:
		break;
	}

	return cmd < ARRAY_SIZE(cmdnames) ? cmdnames[cmd] : "Unknown";
}
/** * drbd_wait_misc - wait for a request to make progress * @device: device associated with the request * @i: the struct drbd_interval embedded in struct drbd_request or * struct drbd_peer_request
*/ int drbd_wait_misc(struct drbd_device *device, struct drbd_interval *i)
{ struct net_conf *nc;
DEFINE_WAIT(wait); long timeout;
#ifdef CONFIG_DRBD_FAULT_INJECTION /* Fault insertion support including random number generator shamelessly
* stolen from kernel/rcutorture.c */ struct fault_random_state { unsignedlong state; unsignedlong count;
};
#define FAULT_RANDOM_MULT 39916801 /* prime */ #define FAULT_RANDOM_ADD 479001701 /* prime */ #define FAULT_RANDOM_REFRESH 10000
/* * Crude but fast random-number generator. Uses a linear congruential * generator, with occasional help from get_random_bytes().
*/ staticunsignedlong
_drbd_fault_random(struct fault_random_state *rsp)
{ long refresh;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.