Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/drivers/block/drbd/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 107 kB image not shown  

Quelle  drbd_main.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0-only
/*
   drbd.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
   from Logicworks, Inc. for making SDP replication support possible.


 */


#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/module.h>
#include <linux/jiffies.h>
#include <linux/drbd.h>
#include <linux/uaccess.h>
#include <asm/types.h>
#include <net/sock.h>
#include <linux/ctype.h>
#include <linux/mutex.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/proc_fs.h>
#include <linux/init.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/reboot.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/workqueue.h>
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/sched/signal.h>

#include <linux/drbd_limits.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
#include "drbd_vli.h"
#include "drbd_debugfs.h"

static DEFINE_MUTEX(drbd_main_mutex);
static int drbd_open(struct gendisk *disk, blk_mode_t mode);
static void drbd_release(struct gendisk *gd);
static void md_sync_timer_fn(struct timer_list *t);
static int w_bitmap_io(struct drbd_work *w, int unused);

MODULE_AUTHOR("Philipp Reisner , "
       "Lars Ellenberg ");
MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
MODULE_VERSION(REL_VERSION);
MODULE_LICENSE("GPL");
MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
   __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);

#include <linux/moduleparam.h>
/* thanks to these macros, if compiled into the kernel (not-module),
 * these become boot parameters (e.g., drbd.minor_count) */


#ifdef CONFIG_DRBD_FAULT_INJECTION
int drbd_enable_faults;
int drbd_fault_rate;
static int drbd_fault_count;
static int drbd_fault_devs;
/* bitmap of enabled faults */
module_param_named(enable_faults, drbd_enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param_named(fault_rate, drbd_fault_rate, int, 0664);
/* count of faults inserted */
module_param_named(fault_count, drbd_fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param_named(fault_devs, drbd_fault_devs, int, 0644);
#endif

/* module parameters we can keep static */
static bool drbd_allow_oos; /* allow_open_on_secondary */
static bool drbd_disable_sendpage;
MODULE_PARM_DESC(allow_oos, "DONT USE!");
module_param_named(allow_oos, drbd_allow_oos, bool, 0);
module_param_named(disable_sendpage, drbd_disable_sendpage, bool, 0644);

/* module parameters we share */
int drbd_proc_details; /* Detail level in proc drbd*/
module_param_named(proc_details, drbd_proc_details, int, 0644);
/* module parameters shared with defaults */
unsigned int drbd_minor_count = DRBD_MINOR_COUNT_DEF;
/* Module parameter for setting the user mode helper program
 * to run. Default is /sbin/drbdadm */

char drbd_usermode_helper[80] = "/sbin/drbdadm";
module_param_named(minor_count, drbd_minor_count, uint, 0444);
module_param_string(usermode_helper, drbd_usermode_helper, sizeof(drbd_usermode_helper), 0644);

/* in 2.6.x, our device mapping and config info contains our virtual gendisks
 * as member "struct gendisk *vdisk;"
 */

struct idr drbd_devices;
struct list_head drbd_resources;
struct mutex resources_mutex;

struct kmem_cache *drbd_request_cache;
struct kmem_cache *drbd_ee_cache; /* peer requests */
struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
mempool_t drbd_request_mempool;
mempool_t drbd_ee_mempool;
mempool_t drbd_md_io_page_pool;
mempool_t drbd_buffer_page_pool;
struct bio_set drbd_md_io_bio_set;
struct bio_set drbd_io_bio_set;

DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);

static const struct block_device_operations drbd_ops = {
 .owner  = THIS_MODULE,
 .submit_bio = drbd_submit_bio,
 .open  = drbd_open,
 .release = drbd_release,
};

#ifdef __CHECKER__
/* When checking with sparse, and this is an inline function, sparse will
   give tons of false positives. When this is a real functions sparse works.
 */

int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
{
 int io_allowed;

 atomic_inc(&device->local_cnt);
 io_allowed = (device->state.disk >= mins);
 if (!io_allowed) {
  if (atomic_dec_and_test(&device->local_cnt))
   wake_up(&device->misc_wait);
 }
 return io_allowed;
}

#endif

/**
 * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
 * @connection: DRBD connection.
 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
 * @set_size: Expected number of requests before that barrier.
 *
 * In case the passed barrier_nr or set_size does not match the oldest
 * epoch of not yet barrier-acked requests, this function will cause a
 * termination of the connection.
 */

void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
  unsigned int set_size)
{
 struct drbd_request *r;
 struct drbd_request *req = NULL, *tmp = NULL;
 int expect_epoch = 0;
 int expect_size = 0;

 spin_lock_irq(&connection->resource->req_lock);

 /* find oldest not yet barrier-acked write request,
 * count writes in its epoch. */

 list_for_each_entry(r, &connection->transfer_log, tl_requests) {
  const unsigned s = r->rq_state;
  if (!req) {
   if (!(s & RQ_WRITE))
    continue;
   if (!(s & RQ_NET_MASK))
    continue;
   if (s & RQ_NET_DONE)
    continue;
   req = r;
   expect_epoch = req->epoch;
   expect_size ++;
  } else {
   if (r->epoch != expect_epoch)
    break;
   if (!(s & RQ_WRITE))
    continue;
   /* if (s & RQ_DONE): not expected */
   /* if (!(s & RQ_NET_MASK)): not expected */
   expect_size++;
  }
 }

 /* first some paranoia code */
 if (req == NULL) {
  drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
    barrier_nr);
  goto bail;
 }
 if (expect_epoch != barrier_nr) {
  drbd_err(connection, "BAD! BarrierAck #%u received, expected #%u!\n",
    barrier_nr, expect_epoch);
  goto bail;
 }

 if (expect_size != set_size) {
  drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
    barrier_nr, set_size, expect_size);
  goto bail;
 }

 /* Clean up list of requests processed during current epoch. */
 /* this extra list walk restart is paranoia,
 * to catch requests being barrier-acked "unexpectedly".
 * It usually should find the same req again, or some READ preceding it. */

 list_for_each_entry(req, &connection->transfer_log, tl_requests)
  if (req->epoch == expect_epoch) {
   tmp = req;
   break;
  }
 req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
 list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) {
  struct drbd_peer_device *peer_device;
  if (req->epoch != expect_epoch)
   break;
  peer_device = conn_peer_device(connection, req->device->vnr);
  _req_mod(req, BARRIER_ACKED, peer_device);
 }
 spin_unlock_irq(&connection->resource->req_lock);

 return;

bail:
 spin_unlock_irq(&connection->resource->req_lock);
 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}


/**
 * _tl_restart() - Walks the transfer log, and applies an action to all requests
 * @connection: DRBD connection to operate on.
 * @what:       The action/event to perform with all request objects
 *
 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
 * RESTART_FROZEN_DISK_IO.
 */

/* must hold resource->req_lock */
void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
{
 struct drbd_peer_device *peer_device;
 struct drbd_request *req, *r;

 list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
  peer_device = conn_peer_device(connection, req->device->vnr);
  _req_mod(req, what, peer_device);
 }
}

void tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
{
 spin_lock_irq(&connection->resource->req_lock);
 _tl_restart(connection, what);
 spin_unlock_irq(&connection->resource->req_lock);
}

/**
 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 * @connection: DRBD connection.
 *
 * This is called after the connection to the peer was lost. The storage covered
 * by the requests on the transfer gets marked as our of sync. Called from the
 * receiver thread and the worker thread.
 */

void tl_clear(struct drbd_connection *connection)
{
 tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
}

/**
 * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL
 * @device: DRBD device.
 */

void tl_abort_disk_io(struct drbd_device *device)
{
 struct drbd_connection *connection = first_peer_device(device)->connection;
 struct drbd_request *req, *r;

 spin_lock_irq(&connection->resource->req_lock);
 list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
  if (!(req->rq_state & RQ_LOCAL_PENDING))
   continue;
  if (req->device != device)
   continue;
  _req_mod(req, ABORT_DISK_IO, NULL);
 }
 spin_unlock_irq(&connection->resource->req_lock);
}

static int drbd_thread_setup(void *arg)
{
 struct drbd_thread *thi = (struct drbd_thread *) arg;
 struct drbd_resource *resource = thi->resource;
 unsigned long flags;
 int retval;

 snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
   thi->name[0],
   resource->name);

 allow_kernel_signal(DRBD_SIGKILL);
 allow_kernel_signal(SIGXCPU);
restart:
 retval = thi->function(thi);

 spin_lock_irqsave(&thi->t_lock, flags);

 /* if the receiver has been "EXITING", the last thing it did
 * was set the conn state to "StandAlone",
 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
 * and receiver thread will be "started".
 * drbd_thread_start needs to set "RESTARTING" in that case.
 * t_state check and assignment needs to be within the same spinlock,
 * so either thread_start sees EXITING, and can remap to RESTARTING,
 * or thread_start see NONE, and can proceed as normal.
 */


 if (thi->t_state == RESTARTING) {
  drbd_info(resource, "Restarting %s thread\n", thi->name);
  thi->t_state = RUNNING;
  spin_unlock_irqrestore(&thi->t_lock, flags);
  goto restart;
 }

 thi->task = NULL;
 thi->t_state = NONE;
 smp_mb();
 complete_all(&thi->stop);
 spin_unlock_irqrestore(&thi->t_lock, flags);

 drbd_info(resource, "Terminating %s\n", current->comm);

 /* Release mod reference taken when thread was started */

 if (thi->connection)
  kref_put(&thi->connection->kref, drbd_destroy_connection);
 kref_put(&resource->kref, drbd_destroy_resource);
 module_put(THIS_MODULE);
 return retval;
}

static void drbd_thread_init(struct drbd_resource *resource, struct drbd_thread *thi,
        int (*func) (struct drbd_thread *), const char *name)
{
 spin_lock_init(&thi->t_lock);
 thi->task    = NULL;
 thi->t_state = NONE;
 thi->function = func;
 thi->resource = resource;
 thi->connection = NULL;
 thi->name = name;
}

int drbd_thread_start(struct drbd_thread *thi)
{
 struct drbd_resource *resource = thi->resource;
 struct task_struct *nt;
 unsigned long flags;

 /* is used from state engine doing drbd_thread_stop_nowait,
 * while holding the req lock irqsave */

 spin_lock_irqsave(&thi->t_lock, flags);

 switch (thi->t_state) {
 case NONE:
  drbd_info(resource, "Starting %s thread (from %s [%d])\n",
    thi->name, current->comm, current->pid);

  /* Get ref on module for thread - this is released when thread exits */
  if (!try_module_get(THIS_MODULE)) {
   drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
   spin_unlock_irqrestore(&thi->t_lock, flags);
   return false;
  }

  kref_get(&resource->kref);
  if (thi->connection)
   kref_get(&thi->connection->kref);

  init_completion(&thi->stop);
  thi->reset_cpu_mask = 1;
  thi->t_state = RUNNING;
  spin_unlock_irqrestore(&thi->t_lock, flags);
  flush_signals(current); /* otherw. may get -ERESTARTNOINTR */

  nt = kthread_create(drbd_thread_setup, (void *) thi,
        "drbd_%c_%s", thi->name[0], thi->resource->name);

  if (IS_ERR(nt)) {
   drbd_err(resource, "Couldn't start thread\n");

   if (thi->connection)
    kref_put(&thi->connection->kref, drbd_destroy_connection);
   kref_put(&resource->kref, drbd_destroy_resource);
   module_put(THIS_MODULE);
   return false;
  }
  spin_lock_irqsave(&thi->t_lock, flags);
  thi->task = nt;
  thi->t_state = RUNNING;
  spin_unlock_irqrestore(&thi->t_lock, flags);
  wake_up_process(nt);
  break;
 case EXITING:
  thi->t_state = RESTARTING;
  drbd_info(resource, "Restarting %s thread (from %s [%d])\n",
    thi->name, current->comm, current->pid);
  fallthrough;
 case RUNNING:
 case RESTARTING:
 default:
  spin_unlock_irqrestore(&thi->t_lock, flags);
  break;
 }

 return true;
}


void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
{
 unsigned long flags;

 enum drbd_thread_state ns = restart ? RESTARTING : EXITING;

 /* may be called from state engine, holding the req lock irqsave */
 spin_lock_irqsave(&thi->t_lock, flags);

 if (thi->t_state == NONE) {
  spin_unlock_irqrestore(&thi->t_lock, flags);
  if (restart)
   drbd_thread_start(thi);
  return;
 }

 if (thi->t_state != ns) {
  if (thi->task == NULL) {
   spin_unlock_irqrestore(&thi->t_lock, flags);
   return;
  }

  thi->t_state = ns;
  smp_mb();
  init_completion(&thi->stop);
  if (thi->task != current)
   send_sig(DRBD_SIGKILL, thi->task, 1);
 }

 spin_unlock_irqrestore(&thi->t_lock, flags);

 if (wait)
  wait_for_completion(&thi->stop);
}

#ifdef CONFIG_SMP
/*
 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
 *
 * Forces all threads of a resource onto the same CPU. This is beneficial for
 * DRBD's performance. May be overwritten by user's configuration.
 */

static void drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
{
 unsigned int *resources_per_cpu, min_index = ~0;

 resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
        GFP_KERNEL);
 if (resources_per_cpu) {
  struct drbd_resource *resource;
  unsigned int cpu, min = ~0;

  rcu_read_lock();
  for_each_resource_rcu(resource, &drbd_resources) {
   for_each_cpu(cpu, resource->cpu_mask)
    resources_per_cpu[cpu]++;
  }
  rcu_read_unlock();
  for_each_online_cpu(cpu) {
   if (resources_per_cpu[cpu] < min) {
    min = resources_per_cpu[cpu];
    min_index = cpu;
   }
  }
  kfree(resources_per_cpu);
 }
 if (min_index == ~0) {
  cpumask_setall(*cpu_mask);
  return;
 }
 cpumask_set_cpu(min_index, *cpu_mask);
}

/**
 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
 * @thi: drbd_thread object
 *
 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
 * prematurely.
 */

void drbd_thread_current_set_cpu(struct drbd_thread *thi)
{
 struct drbd_resource *resource = thi->resource;
 struct task_struct *p = current;

 if (!thi->reset_cpu_mask)
  return;
 thi->reset_cpu_mask = 0;
 set_cpus_allowed_ptr(p, resource->cpu_mask);
}
#else
#define drbd_calc_cpu_mask(A) ({})
#endif

/*
 * drbd_header_size  -  size of a packet header
 *
 * The header size is a multiple of 8, so any payload following the header is
 * word aligned on 64-bit architectures.  (The bitmap send and receive code
 * relies on this.)
 */

unsigned int drbd_header_size(struct drbd_connection *connection)
{
 if (connection->agreed_pro_version >= 100) {
  BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
  return sizeof(struct p_header100);
 } else {
  BUILD_BUG_ON(sizeof(struct p_header80) !=
        sizeof(struct p_header95));
  BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
  return sizeof(struct p_header80);
 }
}

static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
{
 h->magic   = cpu_to_be32(DRBD_MAGIC);
 h->command = cpu_to_be16(cmd);
 h->length  = cpu_to_be16(size);
 return sizeof(struct p_header80);
}

static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
{
 h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
 h->command = cpu_to_be16(cmd);
 h->length = cpu_to_be32(size);
 return sizeof(struct p_header95);
}

static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
          int size, int vnr)
{
 h->magic = cpu_to_be32(DRBD_MAGIC_100);
 h->volume = cpu_to_be16(vnr);
 h->command = cpu_to_be16(cmd);
 h->length = cpu_to_be32(size);
 h->pad = 0;
 return sizeof(struct p_header100);
}

static unsigned int prepare_header(struct drbd_connection *connection, int vnr,
       void *buffer, enum drbd_packet cmd, int size)
{
 if (connection->agreed_pro_version >= 100)
  return prepare_header100(buffer, cmd, size, vnr);
 else if (connection->agreed_pro_version >= 95 &&
   size > DRBD_MAX_SIZE_H80_PACKET)
  return prepare_header95(buffer, cmd, size);
 else
  return prepare_header80(buffer, cmd, size);
}

static void *__conn_prepare_command(struct drbd_connection *connection,
        struct drbd_socket *sock)
{
 if (!sock->socket)
  return NULL;
 return sock->sbuf + drbd_header_size(connection);
}

void *conn_prepare_command(struct drbd_connection *connection, struct drbd_socket *sock)
{
 void *p;

 mutex_lock(&sock->mutex);
 p = __conn_prepare_command(connection, sock);
 if (!p)
  mutex_unlock(&sock->mutex);

 return p;
}

void *drbd_prepare_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock)
{
 return conn_prepare_command(peer_device->connection, sock);
}

static int __send_command(struct drbd_connection *connection, int vnr,
     struct drbd_socket *sock, enum drbd_packet cmd,
     unsigned int header_size, void *data,
     unsigned int size)
{
 int msg_flags;
 int err;

 /*
 * Called with @data == NULL and the size of the data blocks in @size
 * for commands that send data blocks.  For those commands, omit the
 * MSG_MORE flag: this will increase the likelihood that data blocks
 * which are page aligned on the sender will end up page aligned on the
 * receiver.
 */

 msg_flags = data ? MSG_MORE : 0;

 header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
          header_size + size);
 err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
       msg_flags);
 if (data && !err)
  err = drbd_send_all(connection, sock->socket, data, size, 0);
 /* DRBD protocol "pings" are latency critical.
 * This is supposed to trigger tcp_push_pending_frames() */

 if (!err && (cmd == P_PING || cmd == P_PING_ACK))
  tcp_sock_set_nodelay(sock->socket->sk);

 return err;
}

static int __conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
          enum drbd_packet cmd, unsigned int header_size,
          void *data, unsigned int size)
{
 return __send_command(connection, 0, sock, cmd, header_size, data, size);
}

int conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
        enum drbd_packet cmd, unsigned int header_size,
        void *data, unsigned int size)
{
 int err;

 err = __conn_send_command(connection, sock, cmd, header_size, data, size);
 mutex_unlock(&sock->mutex);
 return err;
}

int drbd_send_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock,
        enum drbd_packet cmd, unsigned int header_size,
        void *data, unsigned int size)
{
 int err;

 err = __send_command(peer_device->connection, peer_device->device->vnr,
        sock, cmd, header_size, data, size);
 mutex_unlock(&sock->mutex);
 return err;
}

int drbd_send_ping(struct drbd_connection *connection)
{
 struct drbd_socket *sock;

 sock = &connection->meta;
 if (!conn_prepare_command(connection, sock))
  return -EIO;
 return conn_send_command(connection, sock, P_PING, 0, NULL, 0);
}

int drbd_send_ping_ack(struct drbd_connection *connection)
{
 struct drbd_socket *sock;

 sock = &connection->meta;
 if (!conn_prepare_command(connection, sock))
  return -EIO;
 return conn_send_command(connection, sock, P_PING_ACK, 0, NULL, 0);
}

int drbd_send_sync_param(struct drbd_peer_device *peer_device)
{
 struct drbd_socket *sock;
 struct p_rs_param_95 *p;
 int size;
 const int apv = peer_device->connection->agreed_pro_version;
 enum drbd_packet cmd;
 struct net_conf *nc;
 struct disk_conf *dc;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;

 rcu_read_lock();
 nc = rcu_dereference(peer_device->connection->net_conf);

 size = apv <= 87 ? sizeof(struct p_rs_param)
  : apv == 88 ? sizeof(struct p_rs_param)
   + strlen(nc->verify_alg) + 1
  : apv <= 94 ? sizeof(struct p_rs_param_89)
  : /* apv >= 95 */ sizeof(struct p_rs_param_95);

 cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;

 /* initialize verify_alg and csums_alg */
 BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
 memset(&p->algs, 0, sizeof(p->algs));

 if (get_ldev(peer_device->device)) {
  dc = rcu_dereference(peer_device->device->ldev->disk_conf);
  p->resync_rate = cpu_to_be32(dc->resync_rate);
  p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
  p->c_delay_target = cpu_to_be32(dc->c_delay_target);
  p->c_fill_target = cpu_to_be32(dc->c_fill_target);
  p->c_max_rate = cpu_to_be32(dc->c_max_rate);
  put_ldev(peer_device->device);
 } else {
  p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
  p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
  p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
  p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
  p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
 }

 if (apv >= 88)
  strcpy(p->verify_alg, nc->verify_alg);
 if (apv >= 89)
  strcpy(p->csums_alg, nc->csums_alg);
 rcu_read_unlock();

 return drbd_send_command(peer_device, sock, cmd, size, NULL, 0);
}

int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cmd)
{
 struct drbd_socket *sock;
 struct p_protocol *p;
 struct net_conf *nc;
 int size, cf;

 sock = &connection->data;
 p = __conn_prepare_command(connection, sock);
 if (!p)
  return -EIO;

 rcu_read_lock();
 nc = rcu_dereference(connection->net_conf);

 if (nc->tentative && connection->agreed_pro_version < 92) {
  rcu_read_unlock();
  drbd_err(connection, "--dry-run is not supported by peer");
  return -EOPNOTSUPP;
 }

 size = sizeof(*p);
 if (connection->agreed_pro_version >= 87)
  size += strlen(nc->integrity_alg) + 1;

 p->protocol      = cpu_to_be32(nc->wire_protocol);
 p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
 p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
 p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
 p->two_primaries = cpu_to_be32(nc->two_primaries);
 cf = 0;
 if (nc->discard_my_data)
  cf |= CF_DISCARD_MY_DATA;
 if (nc->tentative)
  cf |= CF_DRY_RUN;
 p->conn_flags    = cpu_to_be32(cf);

 if (connection->agreed_pro_version >= 87)
  strcpy(p->integrity_alg, nc->integrity_alg);
 rcu_read_unlock();

 return __conn_send_command(connection, sock, cmd, size, NULL, 0);
}

int drbd_send_protocol(struct drbd_connection *connection)
{
 int err;

 mutex_lock(&connection->data.mutex);
 err = __drbd_send_protocol(connection, P_PROTOCOL);
 mutex_unlock(&connection->data.mutex);

 return err;
}

static int _drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock;
 struct p_uuids *p;
 int i;

 if (!get_ldev_if_state(device, D_NEGOTIATING))
  return 0;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p) {
  put_ldev(device);
  return -EIO;
 }
 spin_lock_irq(&device->ldev->md.uuid_lock);
 for (i = UI_CURRENT; i < UI_SIZE; i++)
  p->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
 spin_unlock_irq(&device->ldev->md.uuid_lock);

 device->comm_bm_set = drbd_bm_total_weight(device);
 p->uuid[UI_SIZE] = cpu_to_be64(device->comm_bm_set);
 rcu_read_lock();
 uuid_flags |= rcu_dereference(peer_device->connection->net_conf)->discard_my_data ? 1 : 0;
 rcu_read_unlock();
 uuid_flags |= test_bit(CRASHED_PRIMARY, &device->flags) ? 2 : 0;
 uuid_flags |= device->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
 p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);

 put_ldev(device);
 return drbd_send_command(peer_device, sock, P_UUIDS, sizeof(*p), NULL, 0);
}

int drbd_send_uuids(struct drbd_peer_device *peer_device)
{
 return _drbd_send_uuids(peer_device, 0);
}

int drbd_send_uuids_skip_initial_sync(struct drbd_peer_device *peer_device)
{
 return _drbd_send_uuids(peer_device, 8);
}

void drbd_print_uuids(struct drbd_device *device, const char *text)
{
 if (get_ldev_if_state(device, D_NEGOTIATING)) {
  u64 *uuid = device->ldev->md.uuid;
  drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX\n",
       text,
       (unsigned long long)uuid[UI_CURRENT],
       (unsigned long long)uuid[UI_BITMAP],
       (unsigned long long)uuid[UI_HISTORY_START],
       (unsigned long long)uuid[UI_HISTORY_END]);
  put_ldev(device);
 } else {
  drbd_info(device, "%s effective data uuid: %016llX\n",
    text,
    (unsigned long long)device->ed_uuid);
 }
}

void drbd_gen_and_send_sync_uuid(struct drbd_peer_device *peer_device)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock;
 struct p_rs_uuid *p;
 u64 uuid;

 D_ASSERT(device, device->state.disk == D_UP_TO_DATE);

 uuid = device->ldev->md.uuid[UI_BITMAP];
 if (uuid && uuid != UUID_JUST_CREATED)
  uuid = uuid + UUID_NEW_BM_OFFSET;
 else
  get_random_bytes(&uuid, sizeof(u64));
 drbd_uuid_set(device, UI_BITMAP, uuid);
 drbd_print_uuids(device, "updated sync UUID");
 drbd_md_sync(device);

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (p) {
  p->uuid = cpu_to_be64(uuid);
  drbd_send_command(peer_device, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
 }
}

int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock;
 struct p_sizes *p;
 sector_t d_size, u_size;
 int q_order_type;
 unsigned int max_bio_size;
 unsigned int packet_size;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;

 packet_size = sizeof(*p);
 if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
  packet_size += sizeof(p->qlim[0]);

 memset(p, 0, packet_size);
 if (get_ldev_if_state(device, D_NEGOTIATING)) {
  struct block_device *bdev = device->ldev->backing_bdev;
  struct request_queue *q = bdev_get_queue(bdev);

  d_size = drbd_get_max_capacity(device->ldev);
  rcu_read_lock();
  u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
  rcu_read_unlock();
  q_order_type = drbd_queue_order_type(device);
  max_bio_size = queue_max_hw_sectors(q) << 9;
  max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
  p->qlim->physical_block_size =
   cpu_to_be32(bdev_physical_block_size(bdev));
  p->qlim->logical_block_size =
   cpu_to_be32(bdev_logical_block_size(bdev));
  p->qlim->alignment_offset =
   cpu_to_be32(bdev_alignment_offset(bdev));
  p->qlim->io_min = cpu_to_be32(bdev_io_min(bdev));
  p->qlim->io_opt = cpu_to_be32(bdev_io_opt(bdev));
  p->qlim->discard_enabled = !!bdev_max_discard_sectors(bdev);
  put_ldev(device);
 } else {
  struct request_queue *q = device->rq_queue;

  p->qlim->physical_block_size =
   cpu_to_be32(queue_physical_block_size(q));
  p->qlim->logical_block_size =
   cpu_to_be32(queue_logical_block_size(q));
  p->qlim->alignment_offset = 0;
  p->qlim->io_min = cpu_to_be32(queue_io_min(q));
  p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
  p->qlim->discard_enabled = 0;

  d_size = 0;
  u_size = 0;
  q_order_type = QUEUE_ORDERED_NONE;
  max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
 }

 if (peer_device->connection->agreed_pro_version <= 94)
  max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
 else if (peer_device->connection->agreed_pro_version < 100)
  max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);

 p->d_size = cpu_to_be64(d_size);
 p->u_size = cpu_to_be64(u_size);
 if (trigger_reply)
  p->c_size = 0;
 else
  p->c_size = cpu_to_be64(get_capacity(device->vdisk));
 p->max_bio_size = cpu_to_be32(max_bio_size);
 p->queue_order_type = cpu_to_be16(q_order_type);
 p->dds_flags = cpu_to_be16(flags);

 return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
}

/**
 * drbd_send_current_state() - Sends the drbd state to the peer
 * @peer_device: DRBD peer device.
 */

int drbd_send_current_state(struct drbd_peer_device *peer_device)
{
 struct drbd_socket *sock;
 struct p_state *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->state = cpu_to_be32(peer_device->device->state.i); /* Within the send mutex */
 return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
}

/**
 * drbd_send_state() - After a state change, sends the new state to the peer
 * @peer_device:      DRBD peer device.
 * @state:     the state to send, not necessarily the current state.
 *
 * Each state change queues an "after_state_ch" work, which will eventually
 * send the resulting new state to the peer. If more state changes happen
 * between queuing and processing of the after_state_ch work, we still
 * want to send each intermediary state in the order it occurred.
 */

int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
{
 struct drbd_socket *sock;
 struct p_state *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->state = cpu_to_be32(state.i); /* Within the send mutex */
 return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
}

int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
{
 struct drbd_socket *sock;
 struct p_req_state *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->mask = cpu_to_be32(mask.i);
 p->val = cpu_to_be32(val.i);
 return drbd_send_command(peer_device, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
}

int conn_send_state_req(struct drbd_connection *connection, union drbd_state mask, union drbd_state val)
{
 enum drbd_packet cmd;
 struct drbd_socket *sock;
 struct p_req_state *p;

 cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
 sock = &connection->data;
 p = conn_prepare_command(connection, sock);
 if (!p)
  return -EIO;
 p->mask = cpu_to_be32(mask.i);
 p->val = cpu_to_be32(val.i);
 return conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
}

void drbd_send_sr_reply(struct drbd_peer_device *peer_device, enum drbd_state_rv retcode)
{
 struct drbd_socket *sock;
 struct p_req_state_reply *p;

 sock = &peer_device->connection->meta;
 p = drbd_prepare_command(peer_device, sock);
 if (p) {
  p->retcode = cpu_to_be32(retcode);
  drbd_send_command(peer_device, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
 }
}

void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode)
{
 struct drbd_socket *sock;
 struct p_req_state_reply *p;
 enum drbd_packet cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;

 sock = &connection->meta;
 p = conn_prepare_command(connection, sock);
 if (p) {
  p->retcode = cpu_to_be32(retcode);
  conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
 }
}

static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
{
 BUG_ON(code & ~0xf);
 p->encoding = (p->encoding & ~0xf) | code;
}

static void dcbp_set_start(struct p_compressed_bm *p, int set)
{
 p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
}

static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
{
 BUG_ON(n & ~0x7);
 p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
}

static int fill_bitmap_rle_bits(struct drbd_device *device,
    struct p_compressed_bm *p,
    unsigned int size,
    struct bm_xfer_ctx *c)
{
 struct bitstream bs;
 unsigned long plain_bits;
 unsigned long tmp;
 unsigned long rl;
 unsigned len;
 unsigned toggle;
 int bits, use_rle;

 /* may we use this feature? */
 rcu_read_lock();
 use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
 rcu_read_unlock();
 if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90)
  return 0;

 if (c->bit_offset >= c->bm_bits)
  return 0; /* nothing to do. */

 /* use at most thus many bytes */
 bitstream_init(&bs, p->code, size, 0);
 memset(p->code, 0, size);
 /* plain bits covered in this code string */
 plain_bits = 0;

 /* p->encoding & 0x80 stores whether the first run length is set.
 * bit offset is implicit.
 * start with toggle == 2 to be able to tell the first iteration */

 toggle = 2;

 /* see how much plain bits we can stuff into one packet
 * using RLE and VLI. */

 do {
  tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
        : _drbd_bm_find_next(device, c->bit_offset);
  if (tmp == -1UL)
   tmp = c->bm_bits;
  rl = tmp - c->bit_offset;

  if (toggle == 2) { /* first iteration */
   if (rl == 0) {
    /* the first checked bit was set,
 * store start value, */

    dcbp_set_start(p, 1);
    /* but skip encoding of zero run length */
    toggle = !toggle;
    continue;
   }
   dcbp_set_start(p, 0);
  }

  /* paranoia: catch zero runlength.
 * can only happen if bitmap is modified while we scan it. */

  if (rl == 0) {
   drbd_err(device, "unexpected zero runlength while encoding bitmap "
       "t:%u bo:%lu\n", toggle, c->bit_offset);
   return -1;
  }

  bits = vli_encode_bits(&bs, rl);
  if (bits == -ENOBUFS) /* buffer full */
   break;
  if (bits <= 0) {
   drbd_err(device, "error while encoding bitmap: %d\n", bits);
   return 0;
  }

  toggle = !toggle;
  plain_bits += rl;
  c->bit_offset = tmp;
 } while (c->bit_offset < c->bm_bits);

 len = bs.cur.b - p->code + !!bs.cur.bit;

 if (plain_bits < (len << 3)) {
  /* incompressible with this method.
 * we need to rewind both word and bit position. */

  c->bit_offset -= plain_bits;
  bm_xfer_ctx_bit_to_word_offset(c);
  c->bit_offset = c->word_offset * BITS_PER_LONG;
  return 0;
 }

 /* RLE + VLI was able to compress it just fine.
 * update c->word_offset. */

 bm_xfer_ctx_bit_to_word_offset(c);

 /* store pad_bits */
 dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);

 return len;
}

/*
 * send_bitmap_rle_or_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */

static int
send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ctx *c)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock = &peer_device->connection->data;
 unsigned int header_size = drbd_header_size(peer_device->connection);
 struct p_compressed_bm *p = sock->sbuf + header_size;
 int len, err;

 len = fill_bitmap_rle_bits(device, p,
   DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
 if (len < 0)
  return -EIO;

 if (len) {
  dcbp_set_code(p, RLE_VLI_Bits);
  err = __send_command(peer_device->connection, device->vnr, sock,
         P_COMPRESSED_BITMAP, sizeof(*p) + len,
         NULL, 0);
  c->packets[0]++;
  c->bytes[0] += header_size + sizeof(*p) + len;

  if (c->bit_offset >= c->bm_bits)
   len = 0; /* DONE */
 } else {
  /* was not compressible.
 * send a buffer full of plain text bits instead. */

  unsigned int data_size;
  unsigned long num_words;
  unsigned long *p = sock->sbuf + header_size;

  data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
  num_words = min_t(size_t, data_size / sizeof(*p),
      c->bm_words - c->word_offset);
  len = num_words * sizeof(*p);
  if (len)
   drbd_bm_get_lel(device, c->word_offset, num_words, p);
  err = __send_command(peer_device->connection, device->vnr, sock, P_BITMAP,
         len, NULL, 0);
  c->word_offset += num_words;
  c->bit_offset = c->word_offset * BITS_PER_LONG;

  c->packets[1]++;
  c->bytes[1] += header_size + len;

  if (c->bit_offset > c->bm_bits)
   c->bit_offset = c->bm_bits;
 }
 if (!err) {
  if (len == 0) {
   INFO_bm_xfer_stats(peer_device, "send", c);
   return 0;
  } else
   return 1;
 }
 return -EIO;
}

/* See the comment at receive_bitmap() */
static int _drbd_send_bitmap(struct drbd_device *device,
       struct drbd_peer_device *peer_device)
{
 struct bm_xfer_ctx c;
 int err;

 if (!expect(device, device->bitmap))
  return false;

 if (get_ldev(device)) {
  if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
   drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
   drbd_bm_set_all(device);
   if (drbd_bm_write(device, peer_device)) {
    /* write_bm did fail! Leave full sync flag set in Meta P_DATA
 * but otherwise process as per normal - need to tell other
 * side that a full resync is required! */

    drbd_err(device, "Failed to write bitmap to disk!\n");
   } else {
    drbd_md_clear_flag(device, MDF_FULL_SYNC);
    drbd_md_sync(device);
   }
  }
  put_ldev(device);
 }

 c = (struct bm_xfer_ctx) {
  .bm_bits = drbd_bm_bits(device),
  .bm_words = drbd_bm_words(device),
 };

 do {
  err = send_bitmap_rle_or_plain(peer_device, &c);
 } while (err > 0);

 return err == 0;
}

int drbd_send_bitmap(struct drbd_device *device, struct drbd_peer_device *peer_device)
{
 struct drbd_socket *sock = &peer_device->connection->data;
 int err = -1;

 mutex_lock(&sock->mutex);
 if (sock->socket)
  err = !_drbd_send_bitmap(device, peer_device);
 mutex_unlock(&sock->mutex);
 return err;
}

void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u32 set_size)
{
 struct drbd_socket *sock;
 struct p_barrier_ack *p;

 if (connection->cstate < C_WF_REPORT_PARAMS)
  return;

 sock = &connection->meta;
 p = conn_prepare_command(connection, sock);
 if (!p)
  return;
 p->barrier = barrier_nr;
 p->set_size = cpu_to_be32(set_size);
 conn_send_command(connection, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
}

/**
 * _drbd_send_ack() - Sends an ack packet
 * @peer_device: DRBD peer device.
 * @cmd: Packet command code.
 * @sector: sector, needs to be in big endian byte order
 * @blksize: size in byte, needs to be in big endian byte order
 * @block_id: Id, big endian byte order
 */

static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
     u64 sector, u32 blksize, u64 block_id)
{
 struct drbd_socket *sock;
 struct p_block_ack *p;

 if (peer_device->device->state.conn < C_CONNECTED)
  return -EIO;

 sock = &peer_device->connection->meta;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = sector;
 p->block_id = block_id;
 p->blksize = blksize;
 p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->device->packet_seq));
 return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
}

/* dp->sector and dp->block_id already/still in network byte order,
 * data_size is payload size according to dp->head,
 * and may need to be corrected for digest size. */

void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
        struct p_data *dp, int data_size)
{
 if (peer_device->connection->peer_integrity_tfm)
  data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
 _drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
         dp->block_id);
}

void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
        struct p_block_req *rp)
{
 _drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id);
}

/**
 * drbd_send_ack() - Sends an ack packet
 * @peer_device: DRBD peer device
 * @cmd: packet command code
 * @peer_req: peer request
 */

int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
    struct drbd_peer_request *peer_req)
{
 return _drbd_send_ack(peer_device, cmd,
         cpu_to_be64(peer_req->i.sector),
         cpu_to_be32(peer_req->i.size),
         peer_req->block_id);
}

/* This function misuses the block_id field to signal if the blocks
 * are is sync or not. */

int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
       sector_t sector, int blksize, u64 block_id)
{
 return _drbd_send_ack(peer_device, cmd,
         cpu_to_be64(sector),
         cpu_to_be32(blksize),
         cpu_to_be64(block_id));
}

int drbd_send_rs_deallocated(struct drbd_peer_device *peer_device,
        struct drbd_peer_request *peer_req)
{
 struct drbd_socket *sock;
 struct p_block_desc *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(peer_req->i.sector);
 p->blksize = cpu_to_be32(peer_req->i.size);
 p->pad = 0;
 return drbd_send_command(peer_device, sock, P_RS_DEALLOCATED, sizeof(*p), NULL, 0);
}

int drbd_send_drequest(struct drbd_peer_device *peer_device, int cmd,
         sector_t sector, int size, u64 block_id)
{
 struct drbd_socket *sock;
 struct p_block_req *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(sector);
 p->block_id = block_id;
 p->blksize = cpu_to_be32(size);
 return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
}

int drbd_send_drequest_csum(struct drbd_peer_device *peer_device, sector_t sector, int size,
       void *digest, int digest_size, enum drbd_packet cmd)
{
 struct drbd_socket *sock;
 struct p_block_req *p;

 /* FIXME: Put the digest into the preallocated socket buffer.  */

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(sector);
 p->block_id = ID_SYNCER /* unused */;
 p->blksize = cpu_to_be32(size);
 return drbd_send_command(peer_device, sock, cmd, sizeof(*p), digest, digest_size);
}

int drbd_send_ov_request(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
 struct drbd_socket *sock;
 struct p_block_req *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(sector);
 p->block_id = ID_SYNCER /* unused */;
 p->blksize = cpu_to_be32(size);
 return drbd_send_command(peer_device, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
}

/* called on sndtimeo
 * returns false if we should retry,
 * true if we think connection is dead
 */

static int we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
{
 int drop_it;
 /* long elapsed = (long)(jiffies - device->last_received); */

 drop_it =   connection->meta.socket == sock
  || !connection->ack_receiver.task
  || get_t_state(&connection->ack_receiver) != RUNNING
  || connection->cstate < C_WF_REPORT_PARAMS;

 if (drop_it)
  return true;

 drop_it = !--connection->ko_count;
 if (!drop_it) {
  drbd_err(connection, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
    current->comm, current->pid, connection->ko_count);
  request_ping(connection);
 }

 return drop_it; /* && (device->state == R_PRIMARY) */;
}

static void drbd_update_congested(struct drbd_connection *connection)
{
 struct sock *sk = connection->data.socket->sk;
 if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
  set_bit(NET_CONGESTED, &connection->flags);
}

/* The idea of sendpage seems to be to put some kind of reference
 * to the page into the skb, and to hand it over to the NIC. In
 * this process get_page() gets called.
 *
 * As soon as the page was really sent over the network put_page()
 * gets called by some part of the network layer. [ NIC driver? ]
 *
 * [ get_page() / put_page() increment/decrement the count. If count
 *   reaches 0 the page will be freed. ]
 *
 * This works nicely with pages from FSs.
 * But this means that in protocol A we might signal IO completion too early!
 *
 * In order not to corrupt data during a resync we must make sure
 * that we do not reuse our own buffer pages (EEs) to early, therefore
 * we have the net_ee list.
 *
 * XFS seems to have problems, still, it submits pages with page_count == 0!
 * As a workaround, we disable sendpage on pages
 * with page_count == 0 or PageSlab.
 */

static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
         int offset, size_t size, unsigned msg_flags)
{
 struct socket *socket;
 void *addr;
 int err;

 socket = peer_device->connection->data.socket;
 addr = kmap(page) + offset;
 err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
 kunmap(page);
 if (!err)
  peer_device->device->send_cnt += size >> 9;
 return err;
}

static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
      int offset, size_t size, unsigned msg_flags)
{
 struct socket *socket = peer_device->connection->data.socket;
 struct msghdr msg = { .msg_flags = msg_flags, };
 struct bio_vec bvec;
 int len = size;
 int err = -EIO;

 /* e.g. XFS meta- & log-data is in slab pages, which have a
 * page_count of 0 and/or have PageSlab() set.
 * we cannot use send_page for those, as that does get_page();
 * put_page(); and would cause either a VM_BUG directly, or
 * __page_cache_release a page that would actually still be referenced
 * by someone, leading to some obscure delayed Oops somewhere else. */

 if (!drbd_disable_sendpage && sendpages_ok(page, len, offset))
  msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;

 drbd_update_congested(peer_device->connection);
 do {
  int sent;

  bvec_set_page(&bvec, page, len, offset);
  iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);

  sent = sock_sendmsg(socket, &msg);
  if (sent <= 0) {
   if (sent == -EAGAIN) {
    if (we_should_drop_the_connection(peer_device->connection, socket))
     break;
    continue;
   }
   drbd_warn(peer_device->device, "%s: size=%d len=%d sent=%d\n",
        __func__, (int)size, len, sent);
   if (sent < 0)
    err = sent;
   break;
  }
  len    -= sent;
  offset += sent;
 } while (len > 0 /* THINK && device->cstate >= C_CONNECTED*/);
 clear_bit(NET_CONGESTED, &peer_device->connection->flags);

 if (len == 0) {
  err = 0;
  peer_device->device->send_cnt += size >> 9;
 }
 return err;
}

static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
{
 struct bio_vec bvec;
 struct bvec_iter iter;

 /* hint all but last page with MSG_MORE */
 bio_for_each_segment(bvec, bio, iter) {
  int err;

  err = _drbd_no_send_page(peer_device, bvec.bv_page,
      bvec.bv_offset, bvec.bv_len,
      bio_iter_last(bvec, iter)
      ? 0 : MSG_MORE);
  if (err)
   return err;
 }
 return 0;
}

static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
{
 struct bio_vec bvec;
 struct bvec_iter iter;

 /* hint all but last page with MSG_MORE */
 bio_for_each_segment(bvec, bio, iter) {
  int err;

  err = _drbd_send_page(peer_device, bvec.bv_page,
          bvec.bv_offset, bvec.bv_len,
          bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
  if (err)
   return err;
 }
 return 0;
}

static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
       struct drbd_peer_request *peer_req)
{
 bool use_sendpage = !(peer_req->flags & EE_RELEASE_TO_MEMPOOL);
 struct page *page = peer_req->pages;
 unsigned len = peer_req->i.size;
 int err;

 /* hint all but last page with MSG_MORE */
 page_chain_for_each(page) {
  unsigned l = min_t(unsigned, len, PAGE_SIZE);

  if (likely(use_sendpage))
   err = _drbd_send_page(peer_device, page, 0, l,
           page_chain_next(page) ? MSG_MORE : 0);
  else
   err = _drbd_no_send_page(peer_device, page, 0, l,
       page_chain_next(page) ? MSG_MORE : 0);

  if (err)
   return err;
  len -= l;
 }
 return 0;
}

static u32 bio_flags_to_wire(struct drbd_connection *connection,
        struct bio *bio)
{
 if (connection->agreed_pro_version >= 95)
  return  (bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0) |
   (bio->bi_opf & REQ_FUA ? DP_FUA : 0) |
   (bio->bi_opf & REQ_PREFLUSH ? DP_FLUSH : 0) |
   (bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0) |
   (bio_op(bio) == REQ_OP_WRITE_ZEROES ?
     ((connection->agreed_features & DRBD_FF_WZEROES) ?
      (DP_ZEROES |(!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
      : DP_DISCARD)
   : 0);
 else
  return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
}

/* Used to send write or TRIM aka REQ_OP_DISCARD requests
 * R_PRIMARY -> Peer (P_DATA, P_TRIM)
 */

int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock;
 struct p_data *p;
 void *digest_out;
 unsigned int dp_flags = 0;
 int digest_size;
 int err;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 digest_size = peer_device->connection->integrity_tfm ?
        crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;

 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(req->i.sector);
 p->block_id = (unsigned long)req;
 p->seq_num = cpu_to_be32(atomic_inc_return(&device->packet_seq));
 dp_flags = bio_flags_to_wire(peer_device->connection, req->master_bio);
 if (device->state.conn >= C_SYNC_SOURCE &&
     device->state.conn <= C_PAUSED_SYNC_T)
  dp_flags |= DP_MAY_SET_IN_SYNC;
 if (peer_device->connection->agreed_pro_version >= 100) {
  if (req->rq_state & RQ_EXP_RECEIVE_ACK)
   dp_flags |= DP_SEND_RECEIVE_ACK;
  /* During resync, request an explicit write ack,
 * even in protocol != C */

  if (req->rq_state & RQ_EXP_WRITE_ACK
  || (dp_flags & DP_MAY_SET_IN_SYNC))
   dp_flags |= DP_SEND_WRITE_ACK;
 }
 p->dp_flags = cpu_to_be32(dp_flags);

 if (dp_flags & (DP_DISCARD|DP_ZEROES)) {
  enum drbd_packet cmd = (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM;
  struct p_trim *t = (struct p_trim*)p;
  t->size = cpu_to_be32(req->i.size);
  err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*t), NULL, 0);
  goto out;
 }
 digest_out = p + 1;

 /* our digest is still only over the payload.
 * TRIM does not carry any payload. */

 if (digest_size)
  drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
 err = __send_command(peer_device->connection, device->vnr, sock, P_DATA,
        sizeof(*p) + digest_size, NULL, req->i.size);
 if (!err) {
  /* For protocol A, we have to memcpy the payload into
 * socket buffers, as we may complete right away
 * as soon as we handed it over to tcp, at which point the data
 * pages may become invalid.
 *
 * For data-integrity enabled, we copy it as well, so we can be
 * sure that even if the bio pages may still be modified, it
 * won't change the data on the wire, thus if the digest checks
 * out ok after sending on this side, but does not fit on the
 * receiving side, we sure have detected corruption elsewhere.
 */

  if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
   err = _drbd_send_bio(peer_device, req->master_bio);
  else
   err = _drbd_send_zc_bio(peer_device, req->master_bio);

  /* double check digest, sometimes buffers have been modified in flight. */
  if (digest_size > 0 && digest_size <= 64) {
   /* 64 byte, 512 bit, is the largest digest size
 * currently supported in kernel crypto. */

   unsigned char digest[64];
   drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest);
   if (memcmp(p + 1, digest, digest_size)) {
    drbd_warn(device,
     "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
     (unsigned long long)req->i.sector, req->i.size);
   }
  } /* else if (digest_size > 64) {
     ... Be noisy about digest too large ...
} */

 }
out:
 mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */

 return err;
}

/* answer packet, used to send data back for read requests:
 *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
 *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
 */

int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
      struct drbd_peer_request *peer_req)
{
 struct drbd_device *device = peer_device->device;
 struct drbd_socket *sock;
 struct p_data *p;
 int err;
 int digest_size;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);

 digest_size = peer_device->connection->integrity_tfm ?
        crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;

 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(peer_req->i.sector);
 p->block_id = peer_req->block_id;
 p->seq_num = 0;  /* unused */
 p->dp_flags = 0;
 if (digest_size)
  drbd_csum_ee(peer_device->connection->integrity_tfm, peer_req, p + 1);
 err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*p) + digest_size, NULL, peer_req->i.size);
 if (!err)
  err = _drbd_send_zc_ee(peer_device, peer_req);
 mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */

 return err;
}

int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
 struct drbd_socket *sock;
 struct p_block_desc *p;

 sock = &peer_device->connection->data;
 p = drbd_prepare_command(peer_device, sock);
 if (!p)
  return -EIO;
 p->sector = cpu_to_be64(req->i.sector);
 p->blksize = cpu_to_be32(req->i.size);
 return drbd_send_command(peer_device, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
}

/*
  drbd_send distinguishes two cases:

  Packets sent via the data socket "sock"
  and packets sent via the meta data socket "msock"

    sock                      msock
  -----------------+-------------------------+------------------------------
  timeout           conf.timeout / 2          conf.timeout / 2
  timeout action    send a ping via msock     Abort communication
      and close all sockets
*/


/*
 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
 */

int drbd_send(struct drbd_connection *connection, struct socket *sock,
       void *buf, size_t size, unsigned msg_flags)
{
 struct kvec iov = {.iov_base = buf, .iov_len = size};
 struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL};
 int rv, sent = 0;

 if (!sock)
  return -EBADR;

 /* THINK  if (signal_pending) return ... ? */

 iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, size);

 if (sock == connection->data.socket) {
  rcu_read_lock();
  connection->ko_count = rcu_dereference(connection->net_conf)->ko_count;
  rcu_read_unlock();
  drbd_update_congested(connection);
 }
 do {
  rv = sock_sendmsg(sock, &msg);
  if (rv == -EAGAIN) {
   if (we_should_drop_the_connection(connection, sock))
    break;
   else
    continue;
  }
  if (rv == -EINTR) {
   flush_signals(current);
   rv = 0;
  }
  if (rv < 0)
   break;
  sent += rv;
 } while (sent < size);

 if (sock == connection->data.socket)
  clear_bit(NET_CONGESTED, &connection->flags);

 if (rv <= 0) {
  if (rv != -EAGAIN) {
   drbd_err(connection, "%s_sendmsg returned %d\n",
     sock == connection->meta.socket ? "msock" : "sock",
     rv);
   conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
  } else
   conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
 }

 return sent;
}

/*
 * drbd_send_all  -  Send an entire buffer
 *
 * Returns 0 upon success and a negative error value otherwise.
 */

int drbd_send_all(struct drbd_connection *connection, struct socket *sock, void *buffer,
    size_t size, unsigned msg_flags)
{
 int err;

 err = drbd_send(connection, sock, buffer, size, msg_flags);
 if (err < 0)
  return err;
 if (err != size)
  return -EIO;
 return 0;
}

static int drbd_open(struct gendisk *disk, blk_mode_t mode)
{
 struct drbd_device *device = disk->private_data;
 unsigned long flags;
 int rv = 0;

 mutex_lock(&drbd_main_mutex);
 spin_lock_irqsave(&device->resource->req_lock, flags);
 /* to have a stable device->state.role
 * and no race with updating open_cnt */


 if (device->state.role != R_PRIMARY) {
  if (mode & BLK_OPEN_WRITE)
   rv = -EROFS;
  else if (!drbd_allow_oos)
   rv = -EMEDIUMTYPE;
 }

 if (!rv)
  device->open_cnt++;
 spin_unlock_irqrestore(&device->resource->req_lock, flags);
 mutex_unlock(&drbd_main_mutex);

 return rv;
}

static void drbd_release(struct gendisk *gd)
{
 struct drbd_device *device = gd->private_data;

 mutex_lock(&drbd_main_mutex);
 device->open_cnt--;
 mutex_unlock(&drbd_main_mutex);
}

/* need to hold resource->req_lock */
void drbd_queue_unplug(struct drbd_device *device)
{
 if (device->state.pdsk >= D_INCONSISTENT && device->state.conn >= C_CONNECTED) {
  D_ASSERT(device, device->state.role == R_PRIMARY);
  if (test_and_clear_bit(UNPLUG_REMOTE, &device->flags)) {
   drbd_queue_work_if_unqueued(
    &first_peer_device(device)->connection->sender_work,
    &device->unplug_work);
  }
 }
}

static void drbd_set_defaults(struct drbd_device *device)
{
 /* Beware! The actual layout differs
 * between big endian and little endian */

 device->state = (union drbd_dev_state) {
  { .role = R_SECONDARY,
    .peer = R_UNKNOWN,
    .conn = C_STANDALONE,
    .disk = D_DISKLESS,
    .pdsk = D_UNKNOWN,
  } };
}

void drbd_init_set_defaults(struct drbd_device *device)
{
 /* the memset(,0,) did most of this.
 * note: only assignments, no allocation in here */


 drbd_set_defaults(device);

 atomic_set(&device->ap_bio_cnt, 0);
 atomic_set(&device->ap_actlog_cnt, 0);
 atomic_set(&device->ap_pending_cnt, 0);
 atomic_set(&device->rs_pending_cnt, 0);
 atomic_set(&device->unacked_cnt, 0);
 atomic_set(&device->local_cnt, 0);
 atomic_set(&device->pp_in_use_by_net, 0);
 atomic_set(&device->rs_sect_in, 0);
 atomic_set(&device->rs_sect_ev, 0);
 atomic_set(&device->ap_in_flight, 0);
 atomic_set(&device->md_io.in_use, 0);

 mutex_init(&device->own_state_mutex);
 device->state_mutex = &device->own_state_mutex;

 spin_lock_init(&device->al_lock);
 spin_lock_init(&device->peer_seq_lock);

 INIT_LIST_HEAD(&device->active_ee);
 INIT_LIST_HEAD(&device->sync_ee);
 INIT_LIST_HEAD(&device->done_ee);
 INIT_LIST_HEAD(&device->read_ee);
 INIT_LIST_HEAD(&device->resync_reads);
 INIT_LIST_HEAD(&device->resync_work.list);
 INIT_LIST_HEAD(&device->unplug_work.list);
 INIT_LIST_HEAD(&device->bm_io_work.w.list);
 INIT_LIST_HEAD(&device->pending_master_completion[0]);
 INIT_LIST_HEAD(&device->pending_master_completion[1]);
 INIT_LIST_HEAD(&device->pending_completion[0]);
 INIT_LIST_HEAD(&device->pending_completion[1]);

 device->resync_work.cb  = w_resync_timer;
 device->unplug_work.cb  = w_send_write_hint;
 device->bm_io_work.w.cb = w_bitmap_io;

 timer_setup(&device->resync_timer, resync_timer_fn, 0);
 timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
 timer_setup(&device->start_resync_timer, start_resync_timer_fn, 0);
 timer_setup(&device->request_timer, request_timer_fn, 0);

 init_waitqueue_head(&device->misc_wait);
 init_waitqueue_head(&device->state_wait);
 init_waitqueue_head(&device->ee_wait);
 init_waitqueue_head(&device->al_wait);
 init_waitqueue_head(&device->seq_wait);

 device->resync_wenr = LC_FREE;
 device->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
 device->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
}

void drbd_set_my_capacity(struct drbd_device *device, sector_t size)
{
 char ppb[10];

 set_capacity_and_notify(device->vdisk, size);

 drbd_info(device, "size = %s (%llu KB)\n",
  ppsize(ppb, size>>1), (unsigned long long)size>>1);
}

void drbd_device_cleanup(struct drbd_device *device)
{
 int i;
 if (first_peer_device(device)->connection->receiver.t_state != NONE)
  drbd_err(device, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
    first_peer_device(device)->connection->receiver.t_state);

 device->al_writ_cnt  =
 device->bm_writ_cnt  =
 device->read_cnt     =
 device->recv_cnt     =
 device->send_cnt     =
 device->writ_cnt     =
 device->p_size       =
 device->rs_start     =
 device->rs_total     =
 device->rs_failed    = 0;
 device->rs_last_events = 0;
 device->rs_last_sect_ev = 0;
 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
  device->rs_mark_left[i] = 0;
  device->rs_mark_time[i] = 0;
 }
 D_ASSERT(device, first_peer_device(device)->connection->net_conf == NULL);

 set_capacity_and_notify(device->vdisk, 0);
 if (device->bitmap) {
  /* maybe never allocated. */
  drbd_bm_resize(device, 0, 1);
  drbd_bm_cleanup(device);
 }

 drbd_backing_dev_free(device, device->ldev);
 device->ldev = NULL;

 clear_bit(AL_SUSPENDED, &device->flags);

 D_ASSERT(device, list_empty(&device->active_ee));
 D_ASSERT(device, list_empty(&device->sync_ee));
 D_ASSERT(device, list_empty(&device->done_ee));
 D_ASSERT(device, list_empty(&device->read_ee));
 D_ASSERT(device, list_empty(&device->resync_reads));
 D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
 D_ASSERT(device, list_empty(&device->resync_work.list));
 D_ASSERT(device, list_empty(&device->unplug_work.list));

 drbd_set_defaults(device);
}


static void drbd_destroy_mempools(void)
{
 /* D_ASSERT(device, atomic_read(&drbd_pp_vacant)==0); */

 bioset_exit(&drbd_io_bio_set);
 bioset_exit(&drbd_md_io_bio_set);
 mempool_exit(&drbd_buffer_page_pool);
 mempool_exit(&drbd_md_io_page_pool);
 mempool_exit(&drbd_ee_mempool);
 mempool_exit(&drbd_request_mempool);
 kmem_cache_destroy(drbd_ee_cache);
 kmem_cache_destroy(drbd_request_cache);
 kmem_cache_destroy(drbd_bm_ext_cache);
 kmem_cache_destroy(drbd_al_ext_cache);

 drbd_ee_cache        = NULL;
 drbd_request_cache   = NULL;
 drbd_bm_ext_cache    = NULL;
 drbd_al_ext_cache    = NULL;

 return;
}

static int drbd_create_mempools(void)
{
 const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count;
 int ret;

 /* caches */
 drbd_request_cache = kmem_cache_create(
  "drbd_req"sizeof(struct drbd_request), 0, 0, NULL);
 if (drbd_request_cache == NULL)
  goto Enomem;

 drbd_ee_cache = kmem_cache_create(
  "drbd_ee"sizeof(struct drbd_peer_request), 0, 0, NULL);
 if (drbd_ee_cache == NULL)
  goto Enomem;

 drbd_bm_ext_cache = kmem_cache_create(
  "drbd_bm"sizeof(struct bm_extent), 0, 0, NULL);
 if (drbd_bm_ext_cache == NULL)
  goto Enomem;

 drbd_al_ext_cache = kmem_cache_create(
  "drbd_al"sizeof(struct lc_element), 0, 0, NULL);
 if (drbd_al_ext_cache == NULL)
  goto Enomem;

 /* mempools */
 ret = bioset_init(&drbd_io_bio_set, BIO_POOL_SIZE, 0, 0);
 if (ret)
  goto Enomem;

 ret = bioset_init(&drbd_md_io_bio_set, DRBD_MIN_POOL_PAGES, 0,
     BIOSET_NEED_BVECS);
 if (ret)
  goto Enomem;

 ret = mempool_init_page_pool(&drbd_md_io_page_pool, DRBD_MIN_POOL_PAGES, 0);
 if (ret)
  goto Enomem;

 ret = mempool_init_page_pool(&drbd_buffer_page_pool, number, 0);
 if (ret)
  goto Enomem;

 ret = mempool_init_slab_pool(&drbd_request_mempool, number,
         drbd_request_cache);
 if (ret)
  goto Enomem;

 ret = mempool_init_slab_pool(&drbd_ee_mempool, number, drbd_ee_cache);
 if (ret)
  goto Enomem;

 return 0;

Enomem:
 drbd_destroy_mempools(); /* in case we allocated some */
 return -ENOMEM;
}

static void drbd_release_all_peer_reqs(struct drbd_device *device)
{
 int rr;

 rr = drbd_free_peer_reqs(device, &device->active_ee);
 if (rr)
  drbd_err(device, "%d EEs in active list found!\n", rr);

 rr = drbd_free_peer_reqs(device, &device->sync_ee);
 if (rr)
  drbd_err(device, "%d EEs in sync list found!\n", rr);

 rr = drbd_free_peer_reqs(device, &device->read_ee);
 if (rr)
  drbd_err(device, "%d EEs in read list found!\n", rr);

 rr = drbd_free_peer_reqs(device, &device->done_ee);
 if (rr)
  drbd_err(device, "%d EEs in done list found!\n", rr);
}

/* caution. no locking. */
void drbd_destroy_device(struct kref *kref)
{
 struct drbd_device *device = container_of(kref, struct drbd_device, kref);
 struct drbd_resource *resource = device->resource;
 struct drbd_peer_device *peer_device, *tmp_peer_device;

 timer_shutdown_sync(&device->request_timer);

 /* paranoia asserts */
 D_ASSERT(device, device->open_cnt == 0);
 /* end paranoia asserts */

 /* cleanup stuff that may have been allocated during
 * device (re-)configuration or state changes */


 drbd_backing_dev_free(device, device->ldev);
 device->ldev = NULL;

 drbd_release_all_peer_reqs(device);

 lc_destroy(device->act_log);
 lc_destroy(device->resync);

 kfree(device->p_uuid);
 /* device->p_uuid = NULL; */

 if (device->bitmap) /* should no longer be there. */
  drbd_bm_cleanup(device);
 __free_page(device->md_io.page);
 put_disk(device->vdisk);
 kfree(device->rs_plan_s);

 /* not for_each_connection(connection, resource):
 * those may have been cleaned up and disassociated already.
 */

 for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
  kref_put(&peer_device->connection->kref, drbd_destroy_connection);
  kfree(peer_device);
 }
 if (device->submit.wq)
  destroy_workqueue(device->submit.wq);
 kfree(device);
 kref_put(&resource->kref, drbd_destroy_resource);
}

/* One global retry thread, if we need to push back some bio and have it
 * reinserted through our make request function.
 */

static struct retry_worker {
 struct workqueue_struct *wq;
 struct work_struct worker;

 spinlock_t lock;
 struct list_head writes;
} retry;

static void do_retry(struct work_struct *ws)
{
 struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
 LIST_HEAD(writes);
 struct drbd_request *req, *tmp;

 spin_lock_irq(&retry->lock);
 list_splice_init(&retry->writes, &writes);
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=91 H=97 G=93

¤ Dauer der Verarbeitung: 0.30 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.