/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */ for (i = 0; i < number; i++) {
tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY); if (!tmp) goto fail;
set_page_private(tmp, (unsignedlong)page);
page = tmp;
} return page;
fail:
page_chain_for_each_safe(page, tmp) {
set_page_private(page, 0);
mempool_free(page, &drbd_buffer_page_pool);
} return NULL;
}
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD device.
 * @number: number of pages requested
 * @retry: whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsignedint number, bool retry)
{ struct drbd_device *device = peer_device->device; struct page *page; struct net_conf *nc; unsignedint mxb;
/* NOTE(review): in this excerpt, mxb is read below but never assigned, nc is
 * declared but never used, and the @retry parameter is never consulted -- the
 * usual rcu_read_lock()/rcu_dereference(net_conf)/max_buffers sequence appears
 * to have been dropped; verify against the complete source. */
/* over the soft limit: back off briefly to let writeback make progress,
 * rather than failing the allocation outright */
if (atomic_read(&device->pp_in_use) >= mxb)
schedule_timeout_interruptible(HZ / 10);
page = __drbd_alloc_pages(number);
/* account pages as "in use" only when the allocation actually succeeded */
if (page)
atomic_add(number, &device->pp_in_use); return page;
}
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system.
 * @page may be NULL (no-op); otherwise it is the head of a chain linked
 * via page->private. */
static void drbd_free_pages(struct drbd_device *device, struct page *page)
{
	struct page *tmp;
	int i = 0;

	if (page == NULL)
		return;

	/* Walk the chain; the iterator saves the next link in tmp, so we may
	 * clear page->private and release the current page as we go. */
	page_chain_for_each_safe(page, tmp) {
		set_page_private(page, 0);
		if (page_count(page) == 1)
			/* we hold the only reference: return it to our mempool */
			mempool_free(page, &drbd_buffer_page_pool);
		else
			put_page(page);
		i++;
	}

	/* Drop the freed pages from the in-use accounting; going negative
	 * indicates an accounting bug elsewhere. */
	i = atomic_sub_return(i, &device->pp_in_use);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
}
/* You need to hold the req_lock: _drbd_wait_ee_list_empty()
You must not have the req_lock: drbd_free_peer_req() drbd_alloc_peer_req() drbd_free_peer_reqs() drbd_ee_fix_bhs() drbd_finish_peer_reqs() drbd_clear_done_ee() drbd_wait_ee_list_empty()
*/
if (nr_pages) {
page = drbd_alloc_pages(peer_device, nr_pages,
gfpflags_allow_blocking(gfp_mask)); if (!page) goto fail; if (!mempool_is_saturated(&drbd_buffer_page_pool))
peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
}
memset(peer_req, 0, sizeof(*peer_req));
INIT_LIST_HEAD(&peer_req->w.list);
drbd_clear_interval(&peer_req->i);
peer_req->i.size = request_size;
peer_req->i.sector = sector;
peer_req->submit_jif = jiffies;
peer_req->peer_device = peer_device;
peer_req->pages = page; /* * The block_id is opaque to the receiver. It is not endianness * converted, and sent back to the sender unchanged.
*/
peer_req->block_id = id;
/* * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
*/ staticint drbd_finish_peer_reqs(struct drbd_device *device)
{
LIST_HEAD(work_list); struct drbd_peer_request *peer_req, *t; int err = 0;
/* possible callbacks here: * e_end_block, and e_end_resync_block, e_send_superseded. * all ignore the last argument.
*/
list_for_each_entry_safe(peer_req, t, &work_list, w.list) { int err2;
/* list_del not necessary, next/prev members not touched */
err2 = peer_req->w.cb(&peer_req->w, !!err); if (!err)
err = err2;
drbd_free_peer_req(device, peer_req);
}
wake_up(&device->ee_wait);
/* avoids spin_lock/unlock
* and calling prepare_to_wait in the fast path */ while (!list_empty(head)) {
prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
spin_unlock_irq(&device->resource->req_lock);
io_schedule();
finish_wait(&device->ee_wait, &wait);
spin_lock_irq(&device->resource->req_lock);
}
}
/* quoting tcp(7): * On individual connections, the socket buffer size must be set prior to the * listen(2) or connect(2) calls in order to have it take effect. * This is our wrapper to do so.
*/ staticvoid drbd_setbufsize(struct socket *sock, unsignedint snd, unsignedint rcv)
{ /* open coded SO_SNDBUF, SO_RCVBUF */ if (snd) {
sock->sk->sk_sndbuf = snd;
sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
} if (rcv) {
sock->sk->sk_rcvbuf = rcv;
sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
}
}
/* explicitly bind to the configured IP as source IP * for the outgoing connections. * This is needed for multihomed hosts and to be * able to use lo: interfaces for drbd. * Make sure to use 0 as port number, so linux selects * a free one dynamically.
*/
what = "bind before connect";
err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len); if (err < 0) goto out;
/* connect may fail, peer not yet available.
* stay C_WF_CONNECTION, don't go Disconnecting! */
disconnect_on_error = 0;
what = "connect";
err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
out: if (err < 0) { if (sock) {
sock_release(sock);
sock = NULL;
} switch (-err) { /* timeout, busy, signal pending */ case ETIMEDOUT: case EAGAIN: case EINPROGRESS: case EINTR: case ERESTARTSYS: /* peer not (yet) available, network problem */ case ECONNREFUSED: case ENETUNREACH: case EHOSTDOWN: case EHOSTUNREACH:
disconnect_on_error = 0; break; default:
drbd_err(connection, "%s failed, err = %d\n", what, err);
} if (disconnect_on_error)
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
}
/** * drbd_socket_okay() - Free the socket if its connection is not okay * @sock: pointer to the pointer to the socket.
*/ staticbool drbd_socket_okay(struct socket **sock)
{ int rr; char tb[4];
ok = drbd_socket_okay(sock1);
ok = drbd_socket_okay(sock2) && ok;
return ok;
}
/* Gets called if a connection is established, or if a new minor gets created
 * in a connection.
 * Sends the initial per-peer-device handshake packets (sync parameters,
 * sizes, uuids, current state) in order, stopping at the first failure,
 * then resets per-device connection state and arms the request timer.
 * Returns 0 on success, or the first error from the drbd_send_* helpers. */ int drbd_connected(struct drbd_peer_device *peer_device)
{ struct drbd_device *device = peer_device->device; int err;
/* each step only runs if all previous sends succeeded */
err = drbd_send_sync_param(peer_device); if (!err)
err = drbd_send_sizes(peer_device, 0, 0); if (!err)
err = drbd_send_uuids(peer_device); if (!err)
err = drbd_send_current_state(peer_device);
/* fresh connection: clear stale flags and reset in-flight accounting */
clear_bit(USE_DEGR_WFC_T, &device->flags);
clear_bit(RESIZE_PENDING, &device->flags);
atomic_set(&device->ap_in_flight, 0);
mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */ return err;
}
/*
 * conn_connect() - establish both DRBD sockets and perform the handshake.
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */ staticint conn_connect(struct drbd_connection *connection)
{ struct drbd_socket sock, msock; struct drbd_peer_device *peer_device; struct net_conf *nc; int vnr, timeout, h; bool discard_my_data, ok; enum drbd_state_rv rv; struct accept_wait_data ad = {
.connection = connection,
.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
};
/* NOTE(review): this excerpt is missing large parts of the connect state
 * machine -- socket creation/handshake, and the assignments of h, timeout,
 * discard_my_data, sock/msock and ad.s_listen that the code below relies on.
 * Verify any conclusions against the complete source. */
/* NOT YET ...
 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
 * first set it to the P_CONNECTION_FEATURES timeout,
 * which we set to 4x the configured ping_timeout. */
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
/* feature/protocol negotiation; -EOPNOTSUPP means incompatible peer */
if (drbd_send_protocol(connection) == -EOPNOTSUPP) return -1;
/* Prevent a race between resync-handshake and
 * being promoted to Primary.
 *
 * Grab and release the state mutex, so we know that any current
 * drbd_set_role() is finished, and any incoming drbd_set_role
 * will see the STATE_SENT flag, and wait for it to be cleared.
 */
idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
mutex_lock(peer_device->device->state_mutex);
/* avoid a race with conn_request_state( C_DISCONNECTING ) */
spin_lock_irq(&connection->resource->req_lock);
set_bit(STATE_SENT, &connection->flags);
spin_unlock_irq(&connection->resource->req_lock);
drbd_thread_start(&connection->ack_receiver); /* opencoded create_singlethread_workqueue(),
 * to be able to use format string arguments */
connection->ack_sender =
alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name); if (!connection->ack_sender) {
drbd_err(connection, "Failed to create workqueue ack_sender\n"); return 0;
}
mutex_lock(&connection->resource->conf_update); /* The discard_my_data flag is a single-shot modifier to the next
 * connection attempt, the handshake of which is now well underway.
 * No need for rcu style copying of the whole struct
 * just to clear a single value. */
connection->net_conf->discard_my_data = 0;
mutex_unlock(&connection->resource->conf_update);
/* NOTE(review): h is returned here but never assigned in the visible code */
return h;
out_release_sockets: if (ad.s_listen)
sock_release(ad.s_listen); if (sock.socket)
sock_release(sock.socket); if (msock.socket)
sock_release(msock.socket); return -1;
}
err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT); if (err != size) { /* If we have nothing in the receive buffer now, to reduce * application latency, try to drain the backend queues as * quickly as possible, and let remote TCP know what we have
* received so far. */ if (err == -EAGAIN) {
tcp_sock_set_quickack(connection->data.socket->sk, 2);
drbd_unplug_all_devices(connection);
} if (err > 0) {
buffer += err;
size -= err;
}
err = drbd_recv_all_warn(connection, buffer, size); if (err) return err;
}
return err;
} /* This is blkdev_issue_flush, but asynchronous. * We want to submit to all component volumes in parallel, * then wait for all completions.
*/ struct issue_flush_context {
atomic_t pending; int error; struct completion done;
}; struct one_flush_context { struct drbd_device *device; struct issue_flush_context *ctx;
};
if (bio->bi_status) {
ctx->error = blk_status_to_errno(bio->bi_status);
drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
}
kfree(octx);
bio_put(bio);
if (!octx) {
drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n"); /* FIXME: what else can I do now? disconnecting or detaching * really does not help to improve the state of the world, either.
*/
bio_put(bio);
if (!get_ldev(device)) continue;
kref_get(&device->kref);
rcu_read_unlock();
submit_one_flush(device, &ctx);
rcu_read_lock();
}
rcu_read_unlock();
/* Do we want to add a timeout,
* if disk-timeout is set? */ if (!atomic_dec_and_test(&ctx.pending))
wait_for_completion(&ctx.done);
if (ctx.error) { /* would rather check on EOPNOTSUPP, but that is not reliable. * don't try again for ANY return value != 0
* if (rv == -EOPNOTSUPP) */ /* Any error is already reported by bio_endio callback. */
drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
}
}
}
/** * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. * @connection: DRBD connection. * @epoch: Epoch object. * @ev: Epoch event.
*/ staticenum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection, struct drbd_epoch *epoch, enum epoch_event ev)
{ int epoch_size; struct drbd_epoch *next_epoch; enum finish_epoch rv = FE_STILL_LIVE;
spin_lock(&connection->epoch_lock); do {
next_epoch = NULL;
epoch_size = atomic_read(&epoch->epoch_size);
switch (ev & ~EV_CLEANUP) { case EV_PUT:
atomic_dec(&epoch->active); break; case EV_GOT_BARRIER_NR:
set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); break; case EV_BECAME_LAST: /* nothing to do*/ break;
}
if (epoch_size != 0 &&
atomic_read(&epoch->active) == 0 &&
(test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) { if (!(ev & EV_CLEANUP)) {
spin_unlock(&connection->epoch_lock);
drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
spin_lock(&connection->epoch_lock);
} #if 0 /* FIXME: dec unacked on connection, once we have
* something to count pending connection packets in. */ if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
dec_unacked(epoch->connection); #endif
if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
wo = WO_DRAIN_IO; if (wo == WO_DRAIN_IO && !dc->disk_drain)
wo = WO_NONE;
return wo;
}
/*
 * drbd_bump_write_ordering() - Fall back to an other write ordering method
 * @resource: DRBD resource.
 * @bdev: backing device that was just attached, or NULL.
 * @wo: Write ordering method to try.
 *
 * Clamps the requested method against what every attached backing device
 * allows, stores the result in resource->write_ordering, and logs a change.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_NONE] = "none",
		[WO_DRAIN_IO] = "drain",
		[WO_BDEV_FLUSH] = "flush",
	};

	pwo = resource->write_ordering;
	/* WO_BDEV_FLUSH is an explicit (re-)upgrade request; any other
	 * request may only ever lower the current method. */
	if (wo != WO_BDEV_FLUSH)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			/* @bdev is already accounted for via this device;
			 * don't check it a second time below */
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	/* a just-attached bdev that is not yet reachable via the idr */
	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}
/*
 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
 * will directly go to fallback mode, submitting normal writes, and
 * never even try to UNMAP.
 *
 * And dm-thin does not do this (yet), mostly because in general it has
 * to assume that "skip_block_zeroing" is set. See also:
 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
 *
 * We *may* ignore the discard-zeroes-data setting, if so configured.
 *
 * Assumption is that this "discard_zeroes_data=0" is only because the backend
 * may ignore partial unaligned discards.
 *
 * LVM/DM thin as of at least
 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
 *   Library version: 1.02.93-RHEL7 (2015-01-28)
 *   Driver version:  4.29.0
 * still behaves this way.
 *
 * For unaligned (wrt. alignment and granularity) or too small discards,
 * we zero-out the initial (and/or) trailing unaligned partial chunks,
 * but discard all the aligned full chunks.
 *
 * At least for LVM/DM thin, with skip_block_zeroing=false,
 * the result is effectively "discard_zeroes_data=1".
 */ /* flags: EE_TRIM|EE_ZEROOUT */ int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsignedint nr_sectors, int flags)
{ struct block_device *bdev = device->ldev->backing_bdev;
sector_t tmp, nr; unsignedint max_discard_sectors, granularity; int alignment; int err = 0;
/* explicit zero-out requested, or discard not requested: skip discard */
if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM)) goto zero_out;
/* Zero-sector (unknown) and one-sector granularities are the same. */
granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
/* NOTE(review): max_discard_sectors is read in the loop below but never
 * assigned in this excerpt -- its computation (clamped from
 * bdev_max_discard_sectors() and rounded to granularity) appears to have
 * been dropped; verify against the complete source. */
/* head not aligned: zero out the leading partial chunk first */
tmp = start; if (sector_div(tmp, granularity) != alignment) { if (nr_sectors < 2*granularity) goto zero_out; /* start + gran - (start + gran - align) % gran */
tmp = start + granularity - alignment;
tmp = start + granularity - sector_div(tmp, granularity);
nr = tmp - start; /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
 * layers are below us, some may have smaller granularity */
err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
nr_sectors -= nr;
start = tmp;
/* discard the aligned middle in max_discard_sectors sized batches */
} while (nr_sectors >= max_discard_sectors) {
err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
GFP_NOIO);
nr_sectors -= max_discard_sectors;
start += max_discard_sectors;
} if (nr_sectors) { /* max_discard_sectors is unsigned int (and a multiple of
 * granularity, we made sure of that above already);
 * nr is < max_discard_sectors;
 * I don't need sector_div here, even though nr is sector_t */
nr = nr_sectors;
nr -= (unsignedint)nr % granularity; if (nr) {
err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
nr_sectors -= nr;
start += nr;
}
}
/* zero whatever is left (trailing partial chunk, or the whole range
 * when discard was not attempted) */
zero_out: if (nr_sectors) {
err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
} return err != 0;
}
staticvoid drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
{ /* If the backend cannot discard, or does not guarantee * read-back zeroes in discarded ranges, we fall back to * zero-out. Unless configuration specifically requested
* otherwise. */ if (!can_do_reliable_discards(device))
peer_req->flags |= EE_ZEROOUT;
/** * drbd_submit_peer_request() * @peer_req: peer request * * May spread the pages to multiple bios, * depending on bio_add_page restrictions. * * Returns 0 if all bios have been submitted, * -ENOMEM if we could not allocate enough bios, * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a * single page to an empty bio (which should never happen and likely indicates * that the lower level IO stack is in some way broken). This has been observed * on certain Xen deployments.
*/ /* TODO allocate from our own bio_set. */ int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
{ struct drbd_device *device = peer_req->peer_device->device; struct bio *bios = NULL; struct bio *bio; struct page *page = peer_req->pages;
sector_t sector = peer_req->i.sector; unsignedint data_size = peer_req->i.size; unsignedint n_bios = 0; unsignedint nr_pages = PFN_UP(data_size);
/* TRIM/DISCARD: for now, always use the helper function * blkdev_issue_zeroout(..., discard=true). * It's synchronous, but it does the right thing wrt. bio splitting. * Correctness first, performance later. Next step is to code an * asynchronous variant of the same.
*/ if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) { /* wait for all pending IO completions, before we start
* zeroing things out. */
conn_wait_active_ee_empty(peer_req->peer_device->connection); /* add it to the active list now,
* so we can find it to present it in debugfs */
peer_req->submit_jif = jiffies;
peer_req->flags |= EE_SUBMITTED;
/* If this was a resync request from receive_rs_deallocated(),
* it is already on the sync_ee list */ if (list_empty(&peer_req->w.list)) {
spin_lock_irq(&device->resource->req_lock);
list_add_tail(&peer_req->w.list, &device->active_ee);
spin_unlock_irq(&device->resource->req_lock);
}
/* In most cases, we will only need one bio. But in case the lower * level restrictions happen to be different at this offset on this * side than those of the sending peer, we may need to submit the * request in more than one bio. * * Plain bio_alloc is good enough here, this is no DRBD internally * generated bio, but a bio allocated on behalf of the peer.
*/
next_bio: /* _DISCARD, _WRITE_ZEROES handled above. * REQ_OP_FLUSH (empty flush) not expected, * should have been mapped to a "drbd protocol barrier". * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
*/ if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
peer_req_op(peer_req) == REQ_OP_READ)) {
drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf); return -EINVAL;
}
bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO); /* > peer_req->i.sector, unless this is the first bio */
bio->bi_iter.bi_sector = sector;
bio->bi_private = peer_req;
bio->bi_end_io = drbd_peer_request_endio;
/* FIXME these are unacked on connection, * not a specific (peer)device.
*/
connection->current_epoch->barrier_nr = p->barrier;
connection->current_epoch->connection = connection;
rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
/* P_BARRIER_ACK may imply that the corresponding extent is dropped from * the activity log, which means it would not be resynced in case the * R_PRIMARY crashes now. * Therefore we must send the barrier_ack after the barrier request was
* completed. */ switch (connection->resource->write_ordering) { case WO_NONE: if (rv == FE_RECYCLED) return 0;
/* receiver context, in the writeout path of the other node.
* avoid potential distributed deadlock */
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); if (epoch) break; else
drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
fallthrough;
case WO_BDEV_FLUSH: case WO_DRAIN_IO:
conn_wait_active_ee_empty(connection);
drbd_flush(connection);
if (atomic_read(&connection->current_epoch->epoch_size)) {
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); if (epoch) break;
}
/* used from receive_RSDataReply (recv_resync_read) * and from receive_Data. * data_size: actual payload ("data in") * for normal writes that is bi_size. * for discards, that is zero. * for write same, it is logical_block_size. * both trim and write same have the bi_size ("data len to be affected") * as extra argument in the packet header.
*/ staticstruct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, struct packet_info *pi) __must_hold(local)
{ struct drbd_device *device = peer_device->device; const sector_t capacity = get_capacity(device->vdisk); struct drbd_peer_request *peer_req; struct page *page; int digest_size, err; unsignedint data_size = pi->size, ds; void *dig_in = peer_device->connection->int_dig_in; void *dig_vv = peer_device->connection->int_dig_vv; unsignedlong *data; struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL; struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
digest_size = 0; if (!trim && peer_device->connection->peer_integrity_tfm) {
digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm); /* * FIXME: Receive the incoming digest into the receive buffer * here, together with its struct p_data?
*/
err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); if (err) return NULL;
data_size -= digest_size;
}
/* assume request_size == data_size, but special case trim. */
ds = data_size; if (trim) { if (!expect(peer_device, data_size == 0)) return NULL;
ds = be32_to_cpu(trim->size);
} elseif (zeroes) { if (!expect(peer_device, data_size == 0)) return NULL;
ds = be32_to_cpu(zeroes->size);
}
if (!expect(peer_device, IS_ALIGNED(ds, 512))) return NULL; if (trim || zeroes) { if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9))) return NULL;
} elseif (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE)) return NULL;
/* even though we trust out peer,
* we sometimes have to double check. */ if (sector + (ds>>9) > capacity) {
drbd_err(device, "request from peer beyond end of local disk: " "capacity: %llus < sector: %llus + size: %u\n",
(unsignedlonglong)capacity,
(unsignedlonglong)sector, ds); return NULL;
}
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */
peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO); if (!peer_req) return NULL;
/* drbd_drain_block() just takes a data block * out of the socket input buffer, and discards it.
*/ staticint drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{ struct page *page; int err = 0; void *data;
if (!data_size) return 0;
page = drbd_alloc_pages(peer_device, 1, 1);
data = kmap(page); while (data_size) { unsignedint len = min_t(int, data_size, PAGE_SIZE);
/* optimistically update recv_cnt. if receiving fails below,
* we disconnect anyways, and counters will be reset. */
peer_device->device->recv_cnt += data_size>>9;
bio = req->master_bio;
D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
atomic_add(pi->size >> 9, &device->rs_sect_ev); if (drbd_submit_peer_request(peer_req) == 0) return 0;
/* don't care for the reason here */
drbd_err(device, "submit failed, triggering re-connect\n");
spin_lock_irq(&device->resource->req_lock);
list_del(&peer_req->w.list);
spin_unlock_irq(&device->resource->req_lock);
err = recv_dless_read(peer_device, req, sector, pi->size); if (!err)
req_mod(req, DATA_RECEIVED, peer_device); /* else: nothing. handled from drbd_disconnect... * I don't think we may complete this just yet
* in case we are "on-disconnect: freeze" */
if (get_ldev(device)) { /* data is submitted to disk within recv_resync_read. * corresponding put_ldev done below on error,
* or in drbd_peer_request_endio. */
err = recv_resync_read(peer_device, sector, pi);
} else { if (drbd_ratelimit())
drbd_err(device, "Can not write resync data to local disk.\n");
drbd_for_each_overlap(i, &device->write_requests, sector, size) { if (!i->local) continue;
req = container_of(i, struct drbd_request, i); if (req->rq_state & RQ_LOCAL_PENDING ||
!(req->rq_state & RQ_POSTPONED)) continue; /* as it is RQ_POSTPONED, this will cause it to
* be queued on the retry workqueue. */
__req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
}
}
/* * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
*/ staticint e_end_block(struct drbd_work *w, int cancel)
{ struct drbd_peer_request *peer_req =
container_of(w, struct drbd_peer_request, w); struct drbd_peer_device *peer_device = peer_req->peer_device; struct drbd_device *device = peer_device->device;
sector_t sector = peer_req->i.sector; int err = 0, pcmd;
if (peer_req->flags & EE_SEND_WRITE_ACK) { if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
pcmd = (device->state.conn >= C_SYNC_SOURCE &&
device->state.conn <= C_PAUSED_SYNC_T &&
peer_req->flags & EE_MAY_SET_IN_SYNC) ?
P_RS_WRITE_ACK : P_WRITE_ACK;
err = drbd_send_ack(peer_device, pcmd, peer_req); if (pcmd == P_RS_WRITE_ACK)
drbd_set_in_sync(peer_device, sector, peer_req->i.size);
} else {
err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); /* we expect it to be marked out of sync anyways...
* maybe assert this? */
}
dec_unacked(device);
}
/* we delete from the conflict detection hash _after_ we sent out the
* P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */ if (peer_req->flags & EE_IN_INTERVAL_TREE) {
spin_lock_irq(&device->resource->req_lock);
D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
drbd_remove_epoch_entry_interval(device, peer_req); if (peer_req->flags & EE_RESTART_REQUESTS)
restart_conflicting_writes(device, sector, peer_req->i.size);
spin_unlock_irq(&device->resource->req_lock);
} else
D_ASSERT(device, drbd_interval_empty(&peer_req->i));
staticbool seq_greater(u32 a, u32 b)
{ /* * We assume 32-bit wrap-around here. * For 24-bit wrap-around, we would have to shift: * a <<= 8; b <<= 8;
*/ return (s32)a - (s32)b > 0;
}
/* Return whichever of @a and @b is the newer sequence number,
 * in wrap-around aware order (see seq_greater). */
static u32 seq_max(u32 a, u32 b)
{
	if (seq_greater(a, b))
		return a;
	return b;
}
if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
spin_lock(&device->peer_seq_lock);
newest_peer_seq = seq_max(device->peer_seq, peer_seq);
device->peer_seq = newest_peer_seq;
spin_unlock(&device->peer_seq_lock); /* wake up only if we actually changed device->peer_seq */ if (peer_seq == newest_peer_seq)
wake_up(&device->seq_wait);
}
}
/* Called from receive_Data. * Synchronize packets on sock with packets on msock. * * This is here so even when a P_DATA packet traveling via sock overtook an Ack * packet traveling on msock, they are still processed in the order they have * been sent. * * Note: we don't care for Ack packets overtaking P_DATA packets. * * In case packet_seq is larger than device->peer_seq number, there are * outstanding packets on the msock. We wait for them to arrive. * In case we are the logically next packet, we update device->peer_seq * ourselves. Correctly handles 32bit wrap around. * * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have * 1<<9 == 512 seconds aka ages for the 32bit wrap around... * * returns 0 if we may process the packet,
* -ERESTARTSYS if we were interrupted (by disconnect signal). */ staticint wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{ struct drbd_device *device = peer_device->device;
DEFINE_WAIT(wait); long timeout; int ret = 0, tp;
if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) return 0;
spin_lock(&device->peer_seq_lock); for (;;) { if (!seq_greater(peer_seq - 1, device->peer_seq)) {
device->peer_seq = seq_max(device->peer_seq, peer_seq); break;
}
if (signal_pending(current)) {
ret = -ERESTARTSYS; break;
}
/* * Inserting the peer request into the write_requests tree will prevent * new conflicting local requests from being added.
*/
drbd_insert_interval(&device->write_requests, &peer_req->i);
repeat:
drbd_for_each_overlap(i, &device->write_requests, sector, size) { if (i == &peer_req->i) continue; if (i->completed) continue;
if (!i->local) { /* * Our peer has sent a conflicting remote request; this * should not happen in a two-node setup. Wait for the * earlier peer request to complete.
*/
err = drbd_wait_misc(device, i); if (err) goto out; goto repeat;
}
The information on this website has been compiled carefully and to the best
of our knowledge. However, no guarantee is given as to the completeness,
correctness, or quality of the information provided.
Note:
The colored syntax highlighting and the measurement are still experimental.