Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Linux/drivers/block/drbd/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 143 kB image not shown  

Quelle  drbd_nl.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0-only
/*
   drbd_nl.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.


 */


#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/in.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/slab.h>
#include <linux/blkpg.h>
#include <linux/cpumask.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_state_change.h"
#include <linux/unaligned.h>
#include <linux/drbd_limits.h>
#include <linux/kthread.h>

#include <net/genetlink.h>

/* .doit */
// int drbd_adm_create_resource(struct sk_buff *skb, struct genl_info *info);
// int drbd_adm_delete_resource(struct sk_buff *skb, struct genl_info *info);

int drbd_adm_new_minor(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_del_minor(struct sk_buff *skb, struct genl_info *info);

int drbd_adm_new_resource(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_del_resource(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_down(struct sk_buff *skb, struct genl_info *info);

int drbd_adm_set_role(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_detach(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_connect(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_net_opts(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_resize(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_start_ov(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_new_c_uuid(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_disconnect(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_pause_sync(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_resume_sync(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_suspend_io(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_resume_io(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_outdate(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_resource_opts(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_get_status(struct sk_buff *skb, struct genl_info *info);
int drbd_adm_get_timeout_type(struct sk_buff *skb, struct genl_info *info);
/* .dumpit */
int drbd_adm_get_status_all(struct sk_buff *skb, struct netlink_callback *cb);
int drbd_adm_dump_resources(struct sk_buff *skb, struct netlink_callback *cb);
int drbd_adm_dump_devices(struct sk_buff *skb, struct netlink_callback *cb);
int drbd_adm_dump_devices_done(struct netlink_callback *cb);
int drbd_adm_dump_connections(struct sk_buff *skb, struct netlink_callback *cb);
int drbd_adm_dump_connections_done(struct netlink_callback *cb);
int drbd_adm_dump_peer_devices(struct sk_buff *skb, struct netlink_callback *cb);
int drbd_adm_dump_peer_devices_done(struct netlink_callback *cb);
int drbd_adm_get_initial_state(struct sk_buff *skb, struct netlink_callback *cb);

#include <linux/drbd_genl_api.h>
#include "drbd_nla.h"
#include <linux/genl_magic_func.h>

static atomic_t drbd_genl_seq = ATOMIC_INIT(2); /* two. */
static atomic_t notify_genl_seq = ATOMIC_INIT(2); /* two. */

DEFINE_MUTEX(notification_mutex);

/* used bdev_open_by_path, to claim our meta data device(s) */
static char *drbd_m_holder = "Hands off! this is DRBD's meta data device.";

static void drbd_adm_send_reply(struct sk_buff *skb, struct genl_info *info)
{
 genlmsg_end(skb, genlmsg_data(nlmsg_data(nlmsg_hdr(skb))));
 if (genlmsg_reply(skb, info))
  pr_err("error sending genl reply\n");
}

/* Used on a fresh "drbd_adm_prepare"d reply_skb, this cannot fail: The only
 * reason it could fail was no space in skb, and there are 4k available. */

static int drbd_msg_put_info(struct sk_buff *skb, const char *info)
{
 struct nlattr *nla;
 int err = -EMSGSIZE;

 if (!info || !info[0])
  return 0;

 nla = nla_nest_start_noflag(skb, DRBD_NLA_CFG_REPLY);
 if (!nla)
  return err;

 err = nla_put_string(skb, T_info_text, info);
 if (err) {
  nla_nest_cancel(skb, nla);
  return err;
 } else
  nla_nest_end(skb, nla);
 return 0;
}

__printf(2, 3)
static int drbd_msg_sprintf_info(struct sk_buff *skb, const char *fmt, ...)
{
 va_list args;
 struct nlattr *nla, *txt;
 int err = -EMSGSIZE;
 int len;

 nla = nla_nest_start_noflag(skb, DRBD_NLA_CFG_REPLY);
 if (!nla)
  return err;

 txt = nla_reserve(skb, T_info_text, 256);
 if (!txt) {
  nla_nest_cancel(skb, nla);
  return err;
 }
 va_start(args, fmt);
 len = vscnprintf(nla_data(txt), 256, fmt, args);
 va_end(args);

 /* maybe: retry with larger reserve, if truncated */
 txt->nla_len = nla_attr_size(len+1);
 nlmsg_trim(skb, (char*)txt + NLA_ALIGN(txt->nla_len));
 nla_nest_end(skb, nla);

 return 0;
}

/* This would be a good candidate for a "pre_doit" hook,
 * and per-family private info->pointers.
 * But we need to stay compatible with older kernels.
 * If it returns successfully, adm_ctx members are valid.
 *
 * At this point, we still rely on the global genl_lock().
 * If we want to avoid that, and allow "genl_family.parallel_ops", we may need
 * to add additional synchronization against object destruction/modification.
 */

#define DRBD_ADM_NEED_MINOR 1
#define DRBD_ADM_NEED_RESOURCE 2
#define DRBD_ADM_NEED_CONNECTION 4
static int drbd_adm_prepare(struct drbd_config_context *adm_ctx,
 struct sk_buff *skb, struct genl_info *info, unsigned flags)
{
 struct drbd_genlmsghdr *d_in = genl_info_userhdr(info);
 const u8 cmd = info->genlhdr->cmd;
 int err;

 memset(adm_ctx, 0, sizeof(*adm_ctx));

 /* genl_rcv_msg only checks for CAP_NET_ADMIN on "GENL_ADMIN_PERM" :( */
 if (cmd != DRBD_ADM_GET_STATUS && !capable(CAP_NET_ADMIN))
        return -EPERM;

 adm_ctx->reply_skb = genlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
 if (!adm_ctx->reply_skb) {
  err = -ENOMEM;
  goto fail;
 }

 adm_ctx->reply_dh = genlmsg_put_reply(adm_ctx->reply_skb,
     info, &drbd_genl_family, 0, cmd);
 /* put of a few bytes into a fresh skb of >= 4k will always succeed.
 * but anyways */

 if (!adm_ctx->reply_dh) {
  err = -ENOMEM;
  goto fail;
 }

 adm_ctx->reply_dh->minor = d_in->minor;
 adm_ctx->reply_dh->ret_code = NO_ERROR;

 adm_ctx->volume = VOLUME_UNSPECIFIED;
 if (info->attrs[DRBD_NLA_CFG_CONTEXT]) {
  struct nlattr *nla;
  /* parse and validate only */
  err = drbd_cfg_context_from_attrs(NULL, info);
  if (err)
   goto fail;

  /* It was present, and valid,
 * copy it over to the reply skb. */

  err = nla_put_nohdr(adm_ctx->reply_skb,
    info->attrs[DRBD_NLA_CFG_CONTEXT]->nla_len,
    info->attrs[DRBD_NLA_CFG_CONTEXT]);
  if (err)
   goto fail;

  /* and assign stuff to the adm_ctx */
  nla = nested_attr_tb[__nla_type(T_ctx_volume)];
  if (nla)
   adm_ctx->volume = nla_get_u32(nla);
  nla = nested_attr_tb[__nla_type(T_ctx_resource_name)];
  if (nla)
   adm_ctx->resource_name = nla_data(nla);
  adm_ctx->my_addr = nested_attr_tb[__nla_type(T_ctx_my_addr)];
  adm_ctx->peer_addr = nested_attr_tb[__nla_type(T_ctx_peer_addr)];
  if ((adm_ctx->my_addr &&
       nla_len(adm_ctx->my_addr) > sizeof(adm_ctx->connection->my_addr)) ||
      (adm_ctx->peer_addr &&
       nla_len(adm_ctx->peer_addr) > sizeof(adm_ctx->connection->peer_addr))) {
   err = -EINVAL;
   goto fail;
  }
 }

 adm_ctx->minor = d_in->minor;
 adm_ctx->device = minor_to_device(d_in->minor);

 /* We are protected by the global genl_lock().
 * But we may explicitly drop it/retake it in drbd_adm_set_role(),
 * so make sure this object stays around. */

 if (adm_ctx->device)
  kref_get(&adm_ctx->device->kref);

 if (adm_ctx->resource_name) {
  adm_ctx->resource = drbd_find_resource(adm_ctx->resource_name);
 }

 if (!adm_ctx->device && (flags & DRBD_ADM_NEED_MINOR)) {
  drbd_msg_put_info(adm_ctx->reply_skb, "unknown minor");
  return ERR_MINOR_INVALID;
 }
 if (!adm_ctx->resource && (flags & DRBD_ADM_NEED_RESOURCE)) {
  drbd_msg_put_info(adm_ctx->reply_skb, "unknown resource");
  if (adm_ctx->resource_name)
   return ERR_RES_NOT_KNOWN;
  return ERR_INVALID_REQUEST;
 }

 if (flags & DRBD_ADM_NEED_CONNECTION) {
  if (adm_ctx->resource) {
   drbd_msg_put_info(adm_ctx->reply_skb, "no resource name expected");
   return ERR_INVALID_REQUEST;
  }
  if (adm_ctx->device) {
   drbd_msg_put_info(adm_ctx->reply_skb, "no minor number expected");
   return ERR_INVALID_REQUEST;
  }
  if (adm_ctx->my_addr && adm_ctx->peer_addr)
   adm_ctx->connection = conn_get_by_addrs(nla_data(adm_ctx->my_addr),
         nla_len(adm_ctx->my_addr),
         nla_data(adm_ctx->peer_addr),
         nla_len(adm_ctx->peer_addr));
  if (!adm_ctx->connection) {
   drbd_msg_put_info(adm_ctx->reply_skb, "unknown connection");
   return ERR_INVALID_REQUEST;
  }
 }

 /* some more paranoia, if the request was over-determined */
 if (adm_ctx->device && adm_ctx->resource &&
     adm_ctx->device->resource != adm_ctx->resource) {
  pr_warn("request: minor=%u, resource=%s; but that minor belongs to resource %s\n",
   adm_ctx->minor, adm_ctx->resource->name,
   adm_ctx->device->resource->name);
  drbd_msg_put_info(adm_ctx->reply_skb, "minor exists in different resource");
  return ERR_INVALID_REQUEST;
 }
 if (adm_ctx->device &&
     adm_ctx->volume != VOLUME_UNSPECIFIED &&
     adm_ctx->volume != adm_ctx->device->vnr) {
  pr_warn("request: minor=%u, volume=%u; but that minor is volume %u in %s\n",
   adm_ctx->minor, adm_ctx->volume,
   adm_ctx->device->vnr, adm_ctx->device->resource->name);
  drbd_msg_put_info(adm_ctx->reply_skb, "minor exists as different volume");
  return ERR_INVALID_REQUEST;
 }

 /* still, provide adm_ctx->resource always, if possible. */
 if (!adm_ctx->resource) {
  adm_ctx->resource = adm_ctx->device ? adm_ctx->device->resource
   : adm_ctx->connection ? adm_ctx->connection->resource : NULL;
  if (adm_ctx->resource)
   kref_get(&adm_ctx->resource->kref);
 }

 return NO_ERROR;

fail:
 nlmsg_free(adm_ctx->reply_skb);
 adm_ctx->reply_skb = NULL;
 return err;
}

static int drbd_adm_finish(struct drbd_config_context *adm_ctx,
 struct genl_info *info, int retcode)
{
 if (adm_ctx->device) {
  kref_put(&adm_ctx->device->kref, drbd_destroy_device);
  adm_ctx->device = NULL;
 }
 if (adm_ctx->connection) {
  kref_put(&adm_ctx->connection->kref, &drbd_destroy_connection);
  adm_ctx->connection = NULL;
 }
 if (adm_ctx->resource) {
  kref_put(&adm_ctx->resource->kref, drbd_destroy_resource);
  adm_ctx->resource = NULL;
 }

 if (!adm_ctx->reply_skb)
  return -ENOMEM;

 adm_ctx->reply_dh->ret_code = retcode;
 drbd_adm_send_reply(adm_ctx->reply_skb, info);
 return 0;
}

static void setup_khelper_env(struct drbd_connection *connection, char **envp)
{
 char *afs;

 /* FIXME: A future version will not allow this case. */
 if (connection->my_addr_len == 0 || connection->peer_addr_len == 0)
  return;

 switch (((struct sockaddr *)&connection->peer_addr)->sa_family) {
 case AF_INET6:
  afs = "ipv6";
  snprintf(envp[4], 60, "DRBD_PEER_ADDRESS=%pI6",
    &((struct sockaddr_in6 *)&connection->peer_addr)->sin6_addr);
  break;
 case AF_INET:
  afs = "ipv4";
  snprintf(envp[4], 60, "DRBD_PEER_ADDRESS=%pI4",
    &((struct sockaddr_in *)&connection->peer_addr)->sin_addr);
  break;
 default:
  afs = "ssocks";
  snprintf(envp[4], 60, "DRBD_PEER_ADDRESS=%pI4",
    &((struct sockaddr_in *)&connection->peer_addr)->sin_addr);
 }
 snprintf(envp[3], 20, "DRBD_PEER_AF=%s", afs);
}

int drbd_khelper(struct drbd_device *device, char *cmd)
{
 char *envp[] = { "HOME=/",
   "TERM=linux",
   "PATH=/sbin:/usr/sbin:/bin:/usr/bin",
    (char[20]) { }, /* address family */
    (char[60]) { }, /* address */
   NULL };
 char mb[14];
 char *argv[] = {drbd_usermode_helper, cmd, mb, NULL };
 struct drbd_connection *connection = first_peer_device(device)->connection;
 struct sib_info sib;
 int ret;

 if (current == connection->worker.task)
  set_bit(CALLBACK_PENDING, &connection->flags);

 snprintf(mb, 14, "minor-%d", device_to_minor(device));
 setup_khelper_env(connection, envp);

 /* The helper may take some time.
 * write out any unsynced meta data changes now */

 drbd_md_sync(device);

 drbd_info(device, "helper command: %s %s %s\n", drbd_usermode_helper, cmd, mb);
 sib.sib_reason = SIB_HELPER_PRE;
 sib.helper_name = cmd;
 drbd_bcast_event(device, &sib);
 notify_helper(NOTIFY_CALL, device, connection, cmd, 0);
 ret = call_usermodehelper(drbd_usermode_helper, argv, envp, UMH_WAIT_PROC);
 if (ret)
  drbd_warn(device, "helper command: %s %s %s exit code %u (0x%x)\n",
    drbd_usermode_helper, cmd, mb,
    (ret >> 8) & 0xff, ret);
 else
  drbd_info(device, "helper command: %s %s %s exit code %u (0x%x)\n",
    drbd_usermode_helper, cmd, mb,
    (ret >> 8) & 0xff, ret);
 sib.sib_reason = SIB_HELPER_POST;
 sib.helper_exit_code = ret;
 drbd_bcast_event(device, &sib);
 notify_helper(NOTIFY_RESPONSE, device, connection, cmd, ret);

 if (current == connection->worker.task)
  clear_bit(CALLBACK_PENDING, &connection->flags);

 if (ret < 0) /* Ignore any ERRNOs we got. */
  ret = 0;

 return ret;
}

enum drbd_peer_state conn_khelper(struct drbd_connection *connection, char *cmd)
{
 char *envp[] = { "HOME=/",
   "TERM=linux",
   "PATH=/sbin:/usr/sbin:/bin:/usr/bin",
    (char[20]) { }, /* address family */
    (char[60]) { }, /* address */
   NULL };
 char *resource_name = connection->resource->name;
 char *argv[] = {drbd_usermode_helper, cmd, resource_name, NULL };
 int ret;

 setup_khelper_env(connection, envp);
 conn_md_sync(connection);

 drbd_info(connection, "helper command: %s %s %s\n", drbd_usermode_helper, cmd, resource_name);
 /* TODO: conn_bcast_event() ?? */
 notify_helper(NOTIFY_CALL, NULL, connection, cmd, 0);

 ret = call_usermodehelper(drbd_usermode_helper, argv, envp, UMH_WAIT_PROC);
 if (ret)
  drbd_warn(connection, "helper command: %s %s %s exit code %u (0x%x)\n",
     drbd_usermode_helper, cmd, resource_name,
     (ret >> 8) & 0xff, ret);
 else
  drbd_info(connection, "helper command: %s %s %s exit code %u (0x%x)\n",
     drbd_usermode_helper, cmd, resource_name,
     (ret >> 8) & 0xff, ret);
 /* TODO: conn_bcast_event() ?? */
 notify_helper(NOTIFY_RESPONSE, NULL, connection, cmd, ret);

 if (ret < 0) /* Ignore any ERRNOs we got. */
  ret = 0;

 return ret;
}

static enum drbd_fencing_p highest_fencing_policy(struct drbd_connection *connection)
{
 enum drbd_fencing_p fp = FP_NOT_AVAIL;
 struct drbd_peer_device *peer_device;
 int vnr;

 rcu_read_lock();
 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
  struct drbd_device *device = peer_device->device;
  if (get_ldev_if_state(device, D_CONSISTENT)) {
   struct disk_conf *disk_conf =
    rcu_dereference(peer_device->device->ldev->disk_conf);
   fp = max_t(enum drbd_fencing_p, fp, disk_conf->fencing);
   put_ldev(device);
  }
 }
 rcu_read_unlock();

 return fp;
}

static bool resource_is_supended(struct drbd_resource *resource)
{
 return resource->susp || resource->susp_fen || resource->susp_nod;
}

bool conn_try_outdate_peer(struct drbd_connection *connection)
{
 struct drbd_resource * const resource = connection->resource;
 unsigned int connect_cnt;
 union drbd_state mask = { };
 union drbd_state val = { };
 enum drbd_fencing_p fp;
 char *ex_to_string;
 int r;

 spin_lock_irq(&resource->req_lock);
 if (connection->cstate >= C_WF_REPORT_PARAMS) {
  drbd_err(connection, "Expected cstate < C_WF_REPORT_PARAMS\n");
  spin_unlock_irq(&resource->req_lock);
  return false;
 }

 connect_cnt = connection->connect_cnt;
 spin_unlock_irq(&resource->req_lock);

 fp = highest_fencing_policy(connection);
 switch (fp) {
 case FP_NOT_AVAIL:
  drbd_warn(connection, "Not fencing peer, I'm not even Consistent myself.\n");
  spin_lock_irq(&resource->req_lock);
  if (connection->cstate < C_WF_REPORT_PARAMS) {
   _conn_request_state(connection,
         (union drbd_state) { { .susp_fen = 1 } },
         (union drbd_state) { { .susp_fen = 0 } },
         CS_VERBOSE | CS_HARD | CS_DC_SUSP);
   /* We are no longer suspended due to the fencing policy.
 * We may still be suspended due to the on-no-data-accessible policy.
 * If that was OND_IO_ERROR, fail pending requests. */

   if (!resource_is_supended(resource))
    _tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
  }
  /* Else: in case we raced with a connection handshake,
 * let the handshake figure out if we maybe can RESEND,
 * and do not resume/fail pending requests here.
 * Worst case is we stay suspended for now, which may be
 * resolved by either re-establishing the replication link, or
 * the next link failure, or eventually the administrator.  */

  spin_unlock_irq(&resource->req_lock);
  return false;

 case FP_DONT_CARE:
  return true;
 default: ;
 }

 r = conn_khelper(connection, "fence-peer");

 switch ((r>>8) & 0xff) {
 case P_INCONSISTENT: /* peer is inconsistent */
  ex_to_string = "peer is inconsistent or worse";
  mask.pdsk = D_MASK;
  val.pdsk = D_INCONSISTENT;
  break;
 case P_OUTDATED: /* peer got outdated, or was already outdated */
  ex_to_string = "peer was fenced";
  mask.pdsk = D_MASK;
  val.pdsk = D_OUTDATED;
  break;
 case P_DOWN: /* peer was down */
  if (conn_highest_disk(connection) == D_UP_TO_DATE) {
   /* we will(have) create(d) a new UUID anyways... */
   ex_to_string = "peer is unreachable, assumed to be dead";
   mask.pdsk = D_MASK;
   val.pdsk = D_OUTDATED;
  } else {
   ex_to_string = "peer unreachable, doing nothing since disk != UpToDate";
  }
  break;
 case P_PRIMARY: /* Peer is primary, voluntarily outdate myself.
 * This is useful when an unconnected R_SECONDARY is asked to
 * become R_PRIMARY, but finds the other peer being active. */

  ex_to_string = "peer is active";
  drbd_warn(connection, "Peer is primary, outdating myself.\n");
  mask.disk = D_MASK;
  val.disk = D_OUTDATED;
  break;
 case P_FENCING:
  /* THINK: do we need to handle this
 * like case 4, or more like case 5? */

  if (fp != FP_STONITH)
   drbd_err(connection, "fence-peer() = 7 && fencing != Stonith !!!\n");
  ex_to_string = "peer was stonithed";
  mask.pdsk = D_MASK;
  val.pdsk = D_OUTDATED;
  break;
 default:
  /* The script is broken ... */
  drbd_err(connection, "fence-peer helper broken, returned %d\n", (r>>8)&0xff);
  return false/* Eventually leave IO frozen */
 }

 drbd_info(connection, "fence-peer helper returned %d (%s)\n",
    (r>>8) & 0xff, ex_to_string);

 /* Not using
   conn_request_state(connection, mask, val, CS_VERBOSE);
   here, because we might were able to re-establish the connection in the
   meantime. */

 spin_lock_irq(&resource->req_lock);
 if (connection->cstate < C_WF_REPORT_PARAMS && !test_bit(STATE_SENT, &connection->flags)) {
  if (connection->connect_cnt != connect_cnt)
   /* In case the connection was established and droped
   while the fence-peer handler was running, ignore it */

   drbd_info(connection, "Ignoring fence-peer exit code\n");
  else
   _conn_request_state(connection, mask, val, CS_VERBOSE);
 }
 spin_unlock_irq(&resource->req_lock);

 return conn_highest_pdsk(connection) <= D_OUTDATED;
}

static int _try_outdate_peer_async(void *data)
{
 struct drbd_connection *connection = (struct drbd_connection *)data;

 conn_try_outdate_peer(connection);

 kref_put(&connection->kref, drbd_destroy_connection);
 return 0;
}

void conn_try_outdate_peer_async(struct drbd_connection *connection)
{
 struct task_struct *opa;

 kref_get(&connection->kref);
 /* We may have just sent a signal to this thread
 * to get it out of some blocking network function.
 * Clear signals; otherwise kthread_run(), which internally uses
 * wait_on_completion_killable(), will mistake our pending signal
 * for a new fatal signal and fail. */

 flush_signals(current);
 opa = kthread_run(_try_outdate_peer_async, connection, "drbd_async_h");
 if (IS_ERR(opa)) {
  drbd_err(connection, "out of mem, failed to invoke fence-peer helper\n");
  kref_put(&connection->kref, drbd_destroy_connection);
 }
}

enum drbd_state_rv
drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int force)
{
 struct drbd_peer_device *const peer_device = first_peer_device(device);
 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 const int max_tries = 4;
 enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
 struct net_conf *nc;
 int try = 0;
 int forced = 0;
 union drbd_state mask, val;

 if (new_role == R_PRIMARY) {
  struct drbd_connection *connection;

  /* Detect dead peers as soon as possible.  */

  rcu_read_lock();
  for_each_connection(connection, device->resource)
   request_ping(connection);
  rcu_read_unlock();
 }

 mutex_lock(device->state_mutex);

 mask.i = 0; mask.role = R_MASK;
 val.i  = 0; val.role  = new_role;

 while (try++ < max_tries) {
  rv = _drbd_request_state_holding_state_mutex(device, mask, val, CS_WAIT_COMPLETE);

  /* in case we first succeeded to outdate,
 * but now suddenly could establish a connection */

  if (rv == SS_CW_FAILED_BY_PEER && mask.pdsk != 0) {
   val.pdsk = 0;
   mask.pdsk = 0;
   continue;
  }

  if (rv == SS_NO_UP_TO_DATE_DISK && force &&
      (device->state.disk < D_UP_TO_DATE &&
       device->state.disk >= D_INCONSISTENT)) {
   mask.disk = D_MASK;
   val.disk  = D_UP_TO_DATE;
   forced = 1;
   continue;
  }

  if (rv == SS_NO_UP_TO_DATE_DISK &&
      device->state.disk == D_CONSISTENT && mask.pdsk == 0) {
   D_ASSERT(device, device->state.pdsk == D_UNKNOWN);

   if (conn_try_outdate_peer(connection)) {
    val.disk = D_UP_TO_DATE;
    mask.disk = D_MASK;
   }
   continue;
  }

  if (rv == SS_NOTHING_TO_DO)
   goto out;
  if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) {
   if (!conn_try_outdate_peer(connection) && force) {
    drbd_warn(device, "Forced into split brain situation!\n");
    mask.pdsk = D_MASK;
    val.pdsk  = D_OUTDATED;

   }
   continue;
  }
  if (rv == SS_TWO_PRIMARIES) {
   /* Maybe the peer is detected as dead very soon...
   retry at most once more in this case. */

   if (try < max_tries) {
    int timeo;
    try = max_tries - 1;
    rcu_read_lock();
    nc = rcu_dereference(connection->net_conf);
    timeo = nc ? (nc->ping_timeo + 1) * HZ / 10 : 1;
    rcu_read_unlock();
    schedule_timeout_interruptible(timeo);
   }
   continue;
  }
  if (rv < SS_SUCCESS) {
   rv = _drbd_request_state(device, mask, val,
      CS_VERBOSE + CS_WAIT_COMPLETE);
   if (rv < SS_SUCCESS)
    goto out;
  }
  break;
 }

 if (rv < SS_SUCCESS)
  goto out;

 if (forced)
  drbd_warn(device, "Forced to consider local data as UpToDate!\n");

 /* Wait until nothing is on the fly :) */
 wait_event(device->misc_wait, atomic_read(&device->ap_pending_cnt) == 0);

 /* FIXME also wait for all pending P_BARRIER_ACK? */

 if (new_role == R_SECONDARY) {
  if (get_ldev(device)) {
   device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;
   put_ldev(device);
  }
 } else {
  mutex_lock(&device->resource->conf_update);
  nc = connection->net_conf;
  if (nc)
   nc->discard_my_data = 0; /* without copy; single bit op is atomic */
  mutex_unlock(&device->resource->conf_update);

  if (get_ldev(device)) {
   if (((device->state.conn < C_CONNECTED ||
          device->state.pdsk <= D_FAILED)
         && device->ldev->md.uuid[UI_BITMAP] == 0) || forced)
    drbd_uuid_new_current(device);

   device->ldev->md.uuid[UI_CURRENT] |=  (u64)1;
   put_ldev(device);
  }
 }

 /* writeout of activity log covered areas of the bitmap
 * to stable storage done in after state change already */


 if (device->state.conn >= C_WF_REPORT_PARAMS) {
  /* if this was forced, we should consider sync */
  if (forced)
   drbd_send_uuids(peer_device);
  drbd_send_current_state(peer_device);
 }

 drbd_md_sync(device);
 set_disk_ro(device->vdisk, new_role == R_SECONDARY);
 kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
out:
 mutex_unlock(device->state_mutex);
 return rv;
}

static const char *from_attrs_err_to_txt(int err)
{
 return err == -ENOMSG ? "required attribute missing" :
  err == -EOPNOTSUPP ? "unknown mandatory attribute" :
  err == -EEXIST ? "can not change invariant setting" :
  "invalid attribute value";
}

int drbd_adm_set_role(struct sk_buff *skb, struct genl_info *info)
{
 struct drbd_config_context adm_ctx;
 struct set_role_parms parms;
 int err;
 enum drbd_ret_code retcode;
 enum drbd_state_rv rv;

 retcode = drbd_adm_prepare(&adm_ctx, skb, info, DRBD_ADM_NEED_MINOR);
 if (!adm_ctx.reply_skb)
  return retcode;
 if (retcode != NO_ERROR)
  goto out;

 memset(&parms, 0, sizeof(parms));
 if (info->attrs[DRBD_NLA_SET_ROLE_PARMS]) {
  err = set_role_parms_from_attrs(&parms, info);
  if (err) {
   retcode = ERR_MANDATORY_TAG;
   drbd_msg_put_info(adm_ctx.reply_skb, from_attrs_err_to_txt(err));
   goto out;
  }
 }
 genl_unlock();
 mutex_lock(&adm_ctx.resource->adm_mutex);

 if (info->genlhdr->cmd == DRBD_ADM_PRIMARY)
  rv = drbd_set_role(adm_ctx.device, R_PRIMARY, parms.assume_uptodate);
 else
  rv = drbd_set_role(adm_ctx.device, R_SECONDARY, 0);

 mutex_unlock(&adm_ctx.resource->adm_mutex);
 genl_lock();
 drbd_adm_finish(&adm_ctx, info, rv);
 return 0;
out:
 drbd_adm_finish(&adm_ctx, info, retcode);
 return 0;
}

/* Initializes the md.*_offset members, so we are able to find
 * the on disk meta data.
 *
 * We currently have two possible layouts:
 * external:
 *   |----------- md_size_sect ------------------|
 *   [ 4k superblock ][ activity log ][  Bitmap  ]
 *   | al_offset == 8 |
 *   | bm_offset = al_offset + X      |
 *  ==> bitmap sectors = md_size_sect - bm_offset
 *
 * internal:
 *            |----------- md_size_sect ------------------|
 * [data.....][  Bitmap  ][ activity log ][ 4k superblock ]
 *                        | al_offset < 0 |
 *            | bm_offset = al_offset - Y |
 *  ==> bitmap sectors = Y = al_offset - bm_offset
 *
 *  Activity log size used to be fixed 32kB,
 *  but is about to become configurable.
 */

static void drbd_md_set_sector_offsets(struct drbd_device *device,
           struct drbd_backing_dev *bdev)
{
 sector_t md_size_sect = 0;
 unsigned int al_size_sect = bdev->md.al_size_4k * 8;

 bdev->md.md_offset = drbd_md_ss(bdev);

 switch (bdev->md.meta_dev_idx) {
 default:
  /* v07 style fixed size indexed meta data */
  bdev->md.md_size_sect = MD_128MB_SECT;
  bdev->md.al_offset = MD_4kB_SECT;
  bdev->md.bm_offset = MD_4kB_SECT + al_size_sect;
  break;
 case DRBD_MD_INDEX_FLEX_EXT:
  /* just occupy the full device; unit: sectors */
  bdev->md.md_size_sect = drbd_get_capacity(bdev->md_bdev);
  bdev->md.al_offset = MD_4kB_SECT;
  bdev->md.bm_offset = MD_4kB_SECT + al_size_sect;
  break;
 case DRBD_MD_INDEX_INTERNAL:
 case DRBD_MD_INDEX_FLEX_INT:
  /* al size is still fixed */
  bdev->md.al_offset = -al_size_sect;
  /* we need (slightly less than) ~ this much bitmap sectors: */
  md_size_sect = drbd_get_capacity(bdev->backing_bdev);
  md_size_sect = ALIGN(md_size_sect, BM_SECT_PER_EXT);
  md_size_sect = BM_SECT_TO_EXT(md_size_sect);
  md_size_sect = ALIGN(md_size_sect, 8);

  /* plus the "drbd meta data super block",
 * and the activity log; */

  md_size_sect += MD_4kB_SECT + al_size_sect;

  bdev->md.md_size_sect = md_size_sect;
  /* bitmap offset is adjusted by 'super' block size */
  bdev->md.bm_offset   = -md_size_sect + MD_4kB_SECT;
  break;
 }
}

/* input size is expected to be in KB */
char *ppsize(char *buf, unsigned long long size)
{
 /* Needs 9 bytes at max including trailing NUL:
 * -1ULL ==> "16384 EB" */

 static char units[] = { 'K''M''G''T''P''E' };
 int base = 0;
 while (size >= 10000 && base < sizeof(units)-1) {
  /* shift + round */
  size = (size >> 10) + !!(size & (1<<9));
  base++;
 }
 sprintf(buf, "%u %cB", (unsigned)size, units[base]);

 return buf;
}

/* there is still a theoretical deadlock when called from receiver
 * on an D_INCONSISTENT R_PRIMARY:
 *  remote READ does inc_ap_bio, receiver would need to receive answer
 *  packet from remote to dec_ap_bio again.
 *  receiver receive_sizes(), comes here,
 *  waits for ap_bio_cnt == 0. -> deadlock.
 * but this cannot happen, actually, because:
 *  R_PRIMARY D_INCONSISTENT, and peer's disk is unreachable
 *  (not connected, or bad/no disk on peer):
 *  see drbd_fail_request_early, ap_bio_cnt is zero.
 *  R_PRIMARY D_INCONSISTENT, and C_SYNC_TARGET:
 *  peer may not initiate a resize.
 */

/* Note these are not to be confused with
 * drbd_adm_suspend_io/drbd_adm_resume_io,
 * which are (sub) state changes triggered by admin (drbdsetup),
 * and can be long lived.
 * This changes an device->flag, is triggered by drbd internals,
 * and should be short-lived. */

/* It needs to be a counter, since multiple threads might
   independently suspend and resume IO. */

void drbd_suspend_io(struct drbd_device *device)
{
 atomic_inc(&device->suspend_cnt);
 if (drbd_suspended(device))
  return;
 wait_event(device->misc_wait, !atomic_read(&device->ap_bio_cnt));
}

void drbd_resume_io(struct drbd_device *device)
{
 if (atomic_dec_and_test(&device->suspend_cnt))
  wake_up(&device->misc_wait);
}

/*
 * drbd_determine_dev_size() -  Sets the right device size obeying all constraints
 * @device: DRBD device.
 *
 * Returns 0 on success, negative return values indicate errors.
 * You should call drbd_md_sync() after calling this function.
 */

enum determine_dev_size
drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct resize_parms *rs) __must_hold(local)
{
 struct md_offsets_and_sizes {
  u64 last_agreed_sect;
  u64 md_offset;
  s32 al_offset;
  s32 bm_offset;
  u32 md_size_sect;

  u32 al_stripes;
  u32 al_stripe_size_4k;
 } prev;
 sector_t u_size, size;
 struct drbd_md *md = &device->ldev->md;
 void *buffer;

 int md_moved, la_size_changed;
 enum determine_dev_size rv = DS_UNCHANGED;

 /* We may change the on-disk offsets of our meta data below.  Lock out
 * anything that may cause meta data IO, to avoid acting on incomplete
 * layout changes or scribbling over meta data that is in the process
 * of being moved.
 *
 * Move is not exactly correct, btw, currently we have all our meta
 * data in core memory, to "move" it we just write it all out, there
 * are no reads. */

 drbd_suspend_io(device);
 buffer = drbd_md_get_buffer(device, __func__); /* Lock meta-data IO */
 if (!buffer) {
  drbd_resume_io(device);
  return DS_ERROR;
 }

 /* remember current offset and sizes */
 prev.last_agreed_sect = md->la_size_sect;
 prev.md_offset = md->md_offset;
 prev.al_offset = md->al_offset;
 prev.bm_offset = md->bm_offset;
 prev.md_size_sect = md->md_size_sect;
 prev.al_stripes = md->al_stripes;
 prev.al_stripe_size_4k = md->al_stripe_size_4k;

 if (rs) {
  /* rs is non NULL if we should change the AL layout only */
  md->al_stripes = rs->al_stripes;
  md->al_stripe_size_4k = rs->al_stripe_size / 4;
  md->al_size_4k = (u64)rs->al_stripes * rs->al_stripe_size / 4;
 }

 drbd_md_set_sector_offsets(device, device->ldev);

 rcu_read_lock();
 u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
 rcu_read_unlock();
 size = drbd_new_dev_size(device, device->ldev, u_size, flags & DDSF_FORCED);

 if (size < prev.last_agreed_sect) {
  if (rs && u_size == 0) {
   /* Remove "rs &&" later. This check should always be active, but
   right now the receiver expects the permissive behavior */

   drbd_warn(device, "Implicit shrink not allowed. "
     "Use --size=%llus for explicit shrink.\n",
     (unsigned long long)size);
   rv = DS_ERROR_SHRINK;
  }
  if (u_size > size)
   rv = DS_ERROR_SPACE_MD;
  if (rv != DS_UNCHANGED)
   goto err_out;
 }

 if (get_capacity(device->vdisk) != size ||
     drbd_bm_capacity(device) != size) {
  int err;
  err = drbd_bm_resize(device, size, !(flags & DDSF_NO_RESYNC));
  if (unlikely(err)) {
   /* currently there is only one error: ENOMEM! */
   size = drbd_bm_capacity(device);
   if (size == 0) {
    drbd_err(device, "OUT OF MEMORY! "
        "Could not allocate bitmap!\n");
   } else {
    drbd_err(device, "BM resizing failed. "
        "Leaving size unchanged\n");
   }
   rv = DS_ERROR;
  }
  /* racy, see comments above. */
  drbd_set_my_capacity(device, size);
  md->la_size_sect = size;
 }
 if (rv <= DS_ERROR)
  goto err_out;

 la_size_changed = (prev.last_agreed_sect != md->la_size_sect);

 md_moved = prev.md_offset    != md->md_offset
  || prev.md_size_sect != md->md_size_sect;

 if (la_size_changed || md_moved || rs) {
  u32 prev_flags;

  /* We do some synchronous IO below, which may take some time.
 * Clear the timer, to avoid scary "timer expired!" messages,
 * "Superblock" is written out at least twice below, anyways. */

  timer_delete(&device->md_sync_timer);

  /* We won't change the "al-extents" setting, we just may need
 * to move the on-disk location of the activity log ringbuffer.
 * Lock for transaction is good enough, it may well be "dirty"
 * or even "starving". */

  wait_event(device->al_wait, lc_try_lock_for_transaction(device->act_log));

  /* mark current on-disk bitmap and activity log as unreliable */
  prev_flags = md->flags;
  md->flags |= MDF_FULL_SYNC | MDF_AL_DISABLED;
  drbd_md_write(device, buffer);

  drbd_al_initialize(device, buffer);

  drbd_info(device, "Writing the whole bitmap, %s\n",
    la_size_changed && md_moved ? "size changed and md moved" :
    la_size_changed ? "size changed" : "md moved");
  /* next line implicitly does drbd_suspend_io()+drbd_resume_io() */
  drbd_bitmap_io(device, md_moved ? &drbd_bm_write_all : &drbd_bm_write,
          "size changed", BM_LOCKED_MASK, NULL);

  /* on-disk bitmap and activity log is authoritative again
 * (unless there was an IO error meanwhile...) */

  md->flags = prev_flags;
  drbd_md_write(device, buffer);

  if (rs)
   drbd_info(device, "Changed AL layout to al-stripes = %d, al-stripe-size-kB = %d\n",
      md->al_stripes, md->al_stripe_size_4k * 4);
 }

 if (size > prev.last_agreed_sect)
  rv = prev.last_agreed_sect ? DS_GREW : DS_GREW_FROM_ZERO;
 if (size < prev.last_agreed_sect)
  rv = DS_SHRUNK;

 if (0) {
 err_out:
  /* restore previous offset and sizes */
  md->la_size_sect = prev.last_agreed_sect;
  md->md_offset = prev.md_offset;
  md->al_offset = prev.al_offset;
  md->bm_offset = prev.bm_offset;
  md->md_size_sect = prev.md_size_sect;
  md->al_stripes = prev.al_stripes;
  md->al_stripe_size_4k = prev.al_stripe_size_4k;
  md->al_size_4k = (u64)prev.al_stripes * prev.al_stripe_size_4k;
 }
 lc_unlock(device->act_log);
 wake_up(&device->al_wait);
 drbd_md_put_buffer(device);
 drbd_resume_io(device);

 return rv;
}

sector_t
drbd_new_dev_size(struct drbd_device *device, struct drbd_backing_dev *bdev,
    sector_t u_size, int assume_peer_has_space)
{
 sector_t p_size = device->p_size;   /* partner's disk size. */
 sector_t la_size_sect = bdev->md.la_size_sect; /* last agreed size. */
 sector_t m_size; /* my size */
 sector_t size = 0;

 m_size = drbd_get_max_capacity(bdev);

 if (device->state.conn < C_CONNECTED && assume_peer_has_space) {
  drbd_warn(device, "Resize while not connected was forced by the user!\n");
  p_size = m_size;
 }

 if (p_size && m_size) {
  size = min_t(sector_t, p_size, m_size);
 } else {
  if (la_size_sect) {
   size = la_size_sect;
   if (m_size && m_size < size)
    size = m_size;
   if (p_size && p_size < size)
    size = p_size;
  } else {
   if (m_size)
    size = m_size;
   if (p_size)
    size = p_size;
  }
 }

 if (size == 0)
  drbd_err(device, "Both nodes diskless!\n");

 if (u_size) {
  if (u_size > size)
   drbd_err(device, "Requested disk size is too big (%lu > %lu)\n",
       (unsigned long)u_size>>1, (unsigned long)size>>1);
  else
   size = u_size;
 }

 return size;
}

/*
 * drbd_check_al_size() - Ensures that the AL is of the right size
 * @device: DRBD device.
 *
 * Returns -EBUSY if current al lru is still used, -ENOMEM when allocation
 * failed, and 0 on success. You should call drbd_md_sync() after you called
 * this function.
 */

static int drbd_check_al_size(struct drbd_device *device, struct disk_conf *dc)
{
 struct lru_cache *n, *t;
 struct lc_element *e;
 unsigned int in_use;
 int i;

 if (device->act_log &&
     device->act_log->nr_elements == dc->al_extents)
  return 0;

 in_use = 0;
 t = device->act_log;
 n = lc_create("act_log", drbd_al_ext_cache, AL_UPDATES_PER_TRANSACTION,
  dc->al_extents, sizeof(struct lc_element), 0);

 if (n == NULL) {
  drbd_err(device, "Cannot allocate act_log lru!\n");
  return -ENOMEM;
 }
 spin_lock_irq(&device->al_lock);
 if (t) {
  for (i = 0; i < t->nr_elements; i++) {
   e = lc_element_by_index(t, i);
   if (e->refcnt)
    drbd_err(device, "refcnt(%d)==%d\n",
        e->lc_number, e->refcnt);
   in_use += e->refcnt;
  }
 }
 if (!in_use)
  device->act_log = n;
 spin_unlock_irq(&device->al_lock);
 if (in_use) {
  drbd_err(device, "Activity log still in use!\n");
  lc_destroy(n);
  return -EBUSY;
 } else {
  lc_destroy(t);
 }
 drbd_md_mark_dirty(device); /* we changed device->act_log->nr_elemens */
 return 0;
}

static unsigned int drbd_max_peer_bio_size(struct drbd_device *device)
{
 /*
 * We may ignore peer limits if the peer is modern enough.  From 8.3.8
 * onwards the peer can use multiple BIOs for a single peer_request.
 */

 if (device->state.conn < C_WF_REPORT_PARAMS)
  return device->peer_max_bio_size;

 if (first_peer_device(device)->connection->agreed_pro_version < 94)
  return min(device->peer_max_bio_size, DRBD_MAX_SIZE_H80_PACKET);

 /*
 * Correct old drbd (up to 8.3.7) if it believes it can do more than
 * 32KiB.
 */

 if (first_peer_device(device)->connection->agreed_pro_version == 94)
  return DRBD_MAX_SIZE_H80_PACKET;

 /*
 * drbd 8.3.8 onwards, before 8.4.0
 */

 if (first_peer_device(device)->connection->agreed_pro_version < 100)
  return DRBD_MAX_BIO_SIZE_P95;
 return DRBD_MAX_BIO_SIZE;
}

static unsigned int drbd_max_discard_sectors(struct drbd_connection *connection)
{
 /* when we introduced REQ_WRITE_SAME support, we also bumped
 * our maximum supported batch bio size used for discards. */

 if (connection->agreed_features & DRBD_FF_WSAME)
  return DRBD_MAX_BBIO_SECTORS;
 /* before, with DRBD <= 8.4.6, we only allowed up to one AL_EXTENT_SIZE. */
 return AL_EXTENT_SIZE >> 9;
}

static bool drbd_discard_supported(struct drbd_connection *connection,
  struct drbd_backing_dev *bdev)
{
 if (bdev && !bdev_max_discard_sectors(bdev->backing_bdev))
  return false;

 if (connection->cstate >= C_CONNECTED &&
     !(connection->agreed_features & DRBD_FF_TRIM)) {
  drbd_info(connection,
   "peer DRBD too old, does not support TRIM: disabling discards\n");
  return false;
 }

 return true;
}

/* This is the workaround for "bio would need to, but cannot, be split" */
static unsigned int drbd_backing_dev_max_segments(struct drbd_device *device)
{
 unsigned int max_segments;

 rcu_read_lock();
 max_segments = rcu_dereference(device->ldev->disk_conf)->max_bio_bvecs;
 rcu_read_unlock();

 if (!max_segments)
  return BLK_MAX_SEGMENTS;
 return max_segments;
}

void drbd_reconsider_queue_parameters(struct drbd_device *device,
  struct drbd_backing_dev *bdev, struct o_qlim *o)
{
 struct drbd_connection *connection =
  first_peer_device(device)->connection;
 struct request_queue * const q = device->rq_queue;
 unsigned int now = queue_max_hw_sectors(q) << 9;
 struct queue_limits lim;
 struct request_queue *b = NULL;
 unsigned int new;

 if (bdev) {
  b = bdev->backing_bdev->bd_disk->queue;

  device->local_max_bio_size =
   queue_max_hw_sectors(b) << SECTOR_SHIFT;
 }

 /*
 * We may later detach and re-attach on a disconnected Primary.  Avoid
 * decreasing the value in this case.
 *
 * We want to store what we know the peer DRBD can handle, not what the
 * peer IO backend can handle.
 */

 new = min3(DRBD_MAX_BIO_SIZE, device->local_max_bio_size,
  max(drbd_max_peer_bio_size(device), device->peer_max_bio_size));
 if (new != now) {
  if (device->state.role == R_PRIMARY && new < now)
   drbd_err(device, "ASSERT FAILED new < now; (%u < %u)\n",
     new, now);
  drbd_info(device, "max BIO size = %u\n"new);
 }

 lim = queue_limits_start_update(q);
 if (bdev) {
  blk_set_stacking_limits(&lim);
  lim.max_segments = drbd_backing_dev_max_segments(device);
 } else {
  lim.max_segments = BLK_MAX_SEGMENTS;
 }

 lim.max_hw_sectors = new >> SECTOR_SHIFT;
 lim.seg_boundary_mask = PAGE_SIZE - 1;

 /*
 * We don't care for the granularity, really.
 *
 * Stacking limits below should fix it for the local device.  Whether or
 * not it is a suitable granularity on the remote device is not our
 * problem, really. If you care, you need to use devices with similar
 * topology on all peers.
 */

 if (drbd_discard_supported(connection, bdev)) {
  lim.discard_granularity = 512;
  lim.max_hw_discard_sectors =
   drbd_max_discard_sectors(connection);
 } else {
  lim.discard_granularity = 0;
  lim.max_hw_discard_sectors = 0;
 }

 if (bdev)
  blk_stack_limits(&lim, &b->limits, 0);

 /*
 * If we can handle "zeroes" efficiently on the protocol, we want to do
 * that, even if our backend does not announce max_write_zeroes_sectors
 * itself.
 */

 if (connection->agreed_features & DRBD_FF_WZEROES)
  lim.max_write_zeroes_sectors = DRBD_MAX_BBIO_SECTORS;
 else
  lim.max_write_zeroes_sectors = 0;
 lim.max_hw_wzeroes_unmap_sectors = 0;

 if ((lim.discard_granularity >> SECTOR_SHIFT) >
     lim.max_hw_discard_sectors) {
  lim.discard_granularity = 0;
  lim.max_hw_discard_sectors = 0;
 }

 if (queue_limits_commit_update(q, &lim))
  drbd_err(device, "setting new queue limits failed\n");
}

/* Starts the worker thread */
static void conn_reconfig_start(struct drbd_connection *connection)
{
 drbd_thread_start(&connection->worker);
 drbd_flush_workqueue(&connection->sender_work);
}

/* if still unconfigured, stops worker again. */
static void conn_reconfig_done(struct drbd_connection *connection)
{
 bool stop_threads;
 spin_lock_irq(&connection->resource->req_lock);
 stop_threads = conn_all_vols_unconf(connection) &&
  connection->cstate == C_STANDALONE;
 spin_unlock_irq(&connection->resource->req_lock);
 if (stop_threads) {
  /* ack_receiver thread and ack_sender workqueue are implicitly
 * stopped by receiver in conn_disconnect() */

  drbd_thread_stop(&connection->receiver);
  drbd_thread_stop(&connection->worker);
 }
}

/* Make sure IO is suspended before calling this function(). */
static void drbd_suspend_al(struct drbd_device *device)
{
 int s = 0;

 if (!lc_try_lock(device->act_log)) {
  drbd_warn(device, "Failed to lock al in drbd_suspend_al()\n");
  return;
 }

 drbd_al_shrink(device);
 spin_lock_irq(&device->resource->req_lock);
 if (device->state.conn < C_CONNECTED)
  s = !test_and_set_bit(AL_SUSPENDED, &device->flags);
 spin_unlock_irq(&device->resource->req_lock);
 lc_unlock(device->act_log);

 if (s)
  drbd_info(device, "Suspended AL updates\n");
}


static bool should_set_defaults(struct genl_info *info)
{
 struct drbd_genlmsghdr *dh = genl_info_userhdr(info);

 return 0 != (dh->flags & DRBD_GENL_F_SET_DEFAULTS);
}

static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev)
{
 /* This is limited by 16 bit "slot" numbers,
 * and by available on-disk context storage.
 *
 * Also (u16)~0 is special (denotes a "free" extent).
 *
 * One transaction occupies one 4kB on-disk block,
 * we have n such blocks in the on disk ring buffer,
 * the "current" transaction may fail (n-1),
 * and there is 919 slot numbers context information per transaction.
 *
 * 72 transaction blocks amounts to more than 2**16 context slots,
 * so cap there first.
 */

 const unsigned int max_al_nr = DRBD_AL_EXTENTS_MAX;
 const unsigned int sufficient_on_disk =
  (max_al_nr + AL_CONTEXT_PER_TRANSACTION -1)
  /AL_CONTEXT_PER_TRANSACTION;

 unsigned int al_size_4k = bdev->md.al_size_4k;

 if (al_size_4k > sufficient_on_disk)
  return max_al_nr;

 return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION;
}

static bool write_ordering_changed(struct disk_conf *a, struct disk_conf *b)
{
 return a->disk_barrier != b->disk_barrier ||
  a->disk_flushes != b->disk_flushes ||
  a->disk_drain != b->disk_drain;
}

static void sanitize_disk_conf(struct drbd_device *device, struct disk_conf *disk_conf,
          struct drbd_backing_dev *nbc)
{
 struct block_device *bdev = nbc->backing_bdev;

 if (disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
  disk_conf->al_extents = DRBD_AL_EXTENTS_MIN;
 if (disk_conf->al_extents > drbd_al_extents_max(nbc))
  disk_conf->al_extents = drbd_al_extents_max(nbc);

 if (!bdev_max_discard_sectors(bdev)) {
  if (disk_conf->rs_discard_granularity) {
   disk_conf->rs_discard_granularity = 0; /* disable feature */
   drbd_info(device, "rs_discard_granularity feature disabled\n");
  }
 }

 if (disk_conf->rs_discard_granularity) {
  int orig_value = disk_conf->rs_discard_granularity;
  sector_t discard_size = bdev_max_discard_sectors(bdev) << 9;
  unsigned int discard_granularity = bdev_discard_granularity(bdev);
  int remainder;

  if (discard_granularity > disk_conf->rs_discard_granularity)
   disk_conf->rs_discard_granularity = discard_granularity;

  remainder = disk_conf->rs_discard_granularity %
    discard_granularity;
  disk_conf->rs_discard_granularity += remainder;

  if (disk_conf->rs_discard_granularity > discard_size)
   disk_conf->rs_discard_granularity = discard_size;

  if (disk_conf->rs_discard_granularity != orig_value)
   drbd_info(device, "rs_discard_granularity changed to %d\n",
      disk_conf->rs_discard_granularity);
 }
}

static int disk_opts_check_al_size(struct drbd_device *device, struct disk_conf *dc)
{
 int err = -EBUSY;

 if (device->act_log &&
     device->act_log->nr_elements == dc->al_extents)
  return 0;

 drbd_suspend_io(device);
 /* If IO completion is currently blocked, we would likely wait
 * "forever" for the activity log to become unused. So we don't. */

 if (atomic_read(&device->ap_bio_cnt))
  goto out;

 wait_event(device->al_wait, lc_try_lock(device->act_log));
 drbd_al_shrink(device);
 err = drbd_check_al_size(device, dc);
 lc_unlock(device->act_log);
 wake_up(&device->al_wait);
out:
 drbd_resume_io(device);
 return err;
}

int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
{
 struct drbd_config_context adm_ctx;
 enum drbd_ret_code retcode;
 struct drbd_device *device;
 struct disk_conf *new_disk_conf, *old_disk_conf;
 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
 int err;
 unsigned int fifo_size;

 retcode = drbd_adm_prepare(&adm_ctx, skb, info, DRBD_ADM_NEED_MINOR);
 if (!adm_ctx.reply_skb)
  return retcode;
 if (retcode != NO_ERROR)
  goto finish;

 device = adm_ctx.device;
 mutex_lock(&adm_ctx.resource->adm_mutex);

 /* we also need a disk
 * to change the options on */

 if (!get_ldev(device)) {
  retcode = ERR_NO_DISK;
  goto out;
 }

 new_disk_conf = kmalloc(sizeof(struct disk_conf), GFP_KERNEL);
 if (!new_disk_conf) {
  retcode = ERR_NOMEM;
  goto fail;
 }

 mutex_lock(&device->resource->conf_update);
 old_disk_conf = device->ldev->disk_conf;
 *new_disk_conf = *old_disk_conf;
 if (should_set_defaults(info))
  set_disk_conf_defaults(new_disk_conf);

 err = disk_conf_from_attrs_for_change(new_disk_conf, info);
 if (err && err != -ENOMSG) {
  retcode = ERR_MANDATORY_TAG;
  drbd_msg_put_info(adm_ctx.reply_skb, from_attrs_err_to_txt(err));
  goto fail_unlock;
 }

 if (!expect(device, new_disk_conf->resync_rate >= 1))
  new_disk_conf->resync_rate = 1;

 sanitize_disk_conf(device, new_disk_conf, device->ldev);

 if (new_disk_conf->c_plan_ahead > DRBD_C_PLAN_AHEAD_MAX)
  new_disk_conf->c_plan_ahead = DRBD_C_PLAN_AHEAD_MAX;

 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
 if (fifo_size != device->rs_plan_s->size) {
  new_plan = fifo_alloc(fifo_size);
  if (!new_plan) {
   drbd_err(device, "kmalloc of fifo_buffer failed");
   retcode = ERR_NOMEM;
   goto fail_unlock;
  }
 }

 err = disk_opts_check_al_size(device, new_disk_conf);
 if (err) {
  /* Could be just "busy". Ignore?
 * Introduce dedicated error code? */

  drbd_msg_put_info(adm_ctx.reply_skb,
   "Try again without changing current al-extents setting");
  retcode = ERR_NOMEM;
  goto fail_unlock;
 }

 lock_all_resources();
 retcode = drbd_resync_after_valid(device, new_disk_conf->resync_after);
 if (retcode == NO_ERROR) {
  rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
  drbd_resync_after_changed(device);
 }
 unlock_all_resources();

 if (retcode != NO_ERROR)
  goto fail_unlock;

 if (new_plan) {
  old_plan = device->rs_plan_s;
  rcu_assign_pointer(device->rs_plan_s, new_plan);
 }

 mutex_unlock(&device->resource->conf_update);

 if (new_disk_conf->al_updates)
  device->ldev->md.flags &= ~MDF_AL_DISABLED;
 else
  device->ldev->md.flags |= MDF_AL_DISABLED;

 if (new_disk_conf->md_flushes)
  clear_bit(MD_NO_FUA, &device->flags);
 else
  set_bit(MD_NO_FUA, &device->flags);

 if (write_ordering_changed(old_disk_conf, new_disk_conf))
  drbd_bump_write_ordering(device->resource, NULL, WO_BDEV_FLUSH);

 if (old_disk_conf->discard_zeroes_if_aligned !=
     new_disk_conf->discard_zeroes_if_aligned)
  drbd_reconsider_queue_parameters(device, device->ldev, NULL);

 drbd_md_sync(device);

 if (device->state.conn >= C_CONNECTED) {
  struct drbd_peer_device *peer_device;

  for_each_peer_device(peer_device, device)
   drbd_send_sync_param(peer_device);
 }

 kvfree_rcu_mightsleep(old_disk_conf);
 kfree(old_plan);
 mod_timer(&device->request_timer, jiffies + HZ);
 goto success;

fail_unlock:
 mutex_unlock(&device->resource->conf_update);
 fail:
 kfree(new_disk_conf);
 kfree(new_plan);
success:
 put_ldev(device);
 out:
 mutex_unlock(&adm_ctx.resource->adm_mutex);
 finish:
 drbd_adm_finish(&adm_ctx, info, retcode);
 return 0;
}

static struct file *open_backing_dev(struct drbd_device *device,
  const char *bdev_path, void *claim_ptr, bool do_bd_link)
{
 struct file *file;
 int err = 0;

 file = bdev_file_open_by_path(bdev_path, BLK_OPEN_READ | BLK_OPEN_WRITE,
          claim_ptr, NULL);
 if (IS_ERR(file)) {
  drbd_err(device, "open(\"%s\") failed with %ld\n",
    bdev_path, PTR_ERR(file));
  return file;
 }

 if (!do_bd_link)
  return file;

 err = bd_link_disk_holder(file_bdev(file), device->vdisk);
 if (err) {
  fput(file);
  drbd_err(device, "bd_link_disk_holder(\"%s\", ...) failed with %d\n",
    bdev_path, err);
  file = ERR_PTR(err);
 }
 return file;
}

static int open_backing_devices(struct drbd_device *device,
  struct disk_conf *new_disk_conf,
  struct drbd_backing_dev *nbc)
{
 struct file *file;

 file = open_backing_dev(device, new_disk_conf->backing_dev, device,
      true);
 if (IS_ERR(file))
  return ERR_OPEN_DISK;
 nbc->backing_bdev = file_bdev(file);
 nbc->backing_bdev_file = file;

 /*
 * meta_dev_idx >= 0: external fixed size, possibly multiple
 * drbd sharing one meta device.  TODO in that case, paranoia
 * check that [md_bdev, meta_dev_idx] is not yet used by some
 * other drbd minor!  (if you use drbd.conf + drbdadm, that
 * should check it for you already; but if you don't, or
 * someone fooled it, we need to double check here)
 */

 file = open_backing_dev(device, new_disk_conf->meta_dev,
  /* claim ptr: device, if claimed exclusively; shared drbd_m_holder,
 * if potentially shared with other drbd minors */

   (new_disk_conf->meta_dev_idx < 0) ? (void*)device : (void*)drbd_m_holder,
  /* avoid double bd_claim_by_disk() for the same (source,target) tuple,
 * as would happen with internal metadata. */

   (new_disk_conf->meta_dev_idx != DRBD_MD_INDEX_FLEX_INT &&
    new_disk_conf->meta_dev_idx != DRBD_MD_INDEX_INTERNAL));
 if (IS_ERR(file))
  return ERR_OPEN_MD_DISK;
 nbc->md_bdev = file_bdev(file);
 nbc->f_md_bdev = file;
 return NO_ERROR;
}

static void close_backing_dev(struct drbd_device *device,
  struct file *bdev_file, bool do_bd_unlink)
{
 if (!bdev_file)
  return;
 if (do_bd_unlink)
  bd_unlink_disk_holder(file_bdev(bdev_file), device->vdisk);
 fput(bdev_file);
}

void drbd_backing_dev_free(struct drbd_device *device, struct drbd_backing_dev *ldev)
{
 if (ldev == NULL)
  return;

 close_backing_dev(device, ldev->f_md_bdev,
     ldev->md_bdev != ldev->backing_bdev);
 close_backing_dev(device, ldev->backing_bdev_file, true);

 kfree(ldev->disk_conf);
 kfree(ldev);
}

int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
{
 struct drbd_config_context adm_ctx;
 struct drbd_device *device;
 struct drbd_peer_device *peer_device;
 struct drbd_connection *connection;
 int err;
 enum drbd_ret_code retcode;
 enum determine_dev_size dd;
 sector_t max_possible_sectors;
 sector_t min_md_device_sectors;
 struct drbd_backing_dev *nbc = NULL; /* new_backing_conf */
 struct disk_conf *new_disk_conf = NULL;
 struct lru_cache *resync_lru = NULL;
 struct fifo_buffer *new_plan = NULL;
 union drbd_state ns, os;
 enum drbd_state_rv rv;
 struct net_conf *nc;

 retcode = drbd_adm_prepare(&adm_ctx, skb, info, DRBD_ADM_NEED_MINOR);
 if (!adm_ctx.reply_skb)
  return retcode;
 if (retcode != NO_ERROR)
  goto finish;

 device = adm_ctx.device;
 mutex_lock(&adm_ctx.resource->adm_mutex);
 peer_device = first_peer_device(device);
 connection = peer_device->connection;
 conn_reconfig_start(connection);

 /* if you want to reconfigure, please tear down first */
 if (device->state.disk > D_DISKLESS) {
  retcode = ERR_DISK_CONFIGURED;
  goto fail;
 }
 /* It may just now have detached because of IO error.  Make sure
 * drbd_ldev_destroy is done already, we may end up here very fast,
 * e.g. if someone calls attach from the on-io-error handler,
 * to realize a "hot spare" feature (not that I'd recommend that) */

 wait_event(device->misc_wait, !test_bit(GOING_DISKLESS, &device->flags));

 /* make sure there is no leftover from previous force-detach attempts */
 clear_bit(FORCE_DETACH, &device->flags);
 clear_bit(WAS_IO_ERROR, &device->flags);
 clear_bit(WAS_READ_ERROR, &device->flags);

 /* and no leftover from previously aborted resync or verify, either */
 device->rs_total = 0;
 device->rs_failed = 0;
 atomic_set(&device->rs_pending_cnt, 0);

 /* allocation not in the IO path, drbdsetup context */
 nbc = kzalloc(sizeof(struct drbd_backing_dev), GFP_KERNEL);
 if (!nbc) {
  retcode = ERR_NOMEM;
  goto fail;
 }
 spin_lock_init(&nbc->md.uuid_lock);

 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
 if (!new_disk_conf) {
  retcode = ERR_NOMEM;
  goto fail;
 }
 nbc->disk_conf = new_disk_conf;

 set_disk_conf_defaults(new_disk_conf);
 err = disk_conf_from_attrs(new_disk_conf, info);
 if (err) {
  retcode = ERR_MANDATORY_TAG;
  drbd_msg_put_info(adm_ctx.reply_skb, from_attrs_err_to_txt(err));
  goto fail;
 }

 if (new_disk_conf->c_plan_ahead > DRBD_C_PLAN_AHEAD_MAX)
  new_disk_conf->c_plan_ahead = DRBD_C_PLAN_AHEAD_MAX;

 new_plan = fifo_alloc((new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ);
 if (!new_plan) {
  retcode = ERR_NOMEM;
  goto fail;
 }

 if (new_disk_conf->meta_dev_idx < DRBD_MD_INDEX_FLEX_INT) {
  retcode = ERR_MD_IDX_INVALID;
  goto fail;
 }

 rcu_read_lock();
 nc = rcu_dereference(connection->net_conf);
 if (nc) {
  if (new_disk_conf->fencing == FP_STONITH && nc->wire_protocol == DRBD_PROT_A) {
   rcu_read_unlock();
   retcode = ERR_STONITH_AND_PROT_A;
   goto fail;
  }
 }
 rcu_read_unlock();

 retcode = open_backing_devices(device, new_disk_conf, nbc);
 if (retcode != NO_ERROR)
  goto fail;

 if ((nbc->backing_bdev == nbc->md_bdev) !=
     (new_disk_conf->meta_dev_idx == DRBD_MD_INDEX_INTERNAL ||
      new_disk_conf->meta_dev_idx == DRBD_MD_INDEX_FLEX_INT)) {
  retcode = ERR_MD_IDX_INVALID;
  goto fail;
 }

 resync_lru = lc_create("resync", drbd_bm_ext_cache,
   1, 61, sizeof(struct bm_extent),
   offsetof(struct bm_extent, lce));
 if (!resync_lru) {
  retcode = ERR_NOMEM;
  goto fail;
 }

 /* Read our meta data super block early.
 * This also sets other on-disk offsets. */

 retcode = drbd_md_read(device, nbc);
 if (retcode != NO_ERROR)
  goto fail;

 sanitize_disk_conf(device, new_disk_conf, nbc);

 if (drbd_get_max_capacity(nbc) < new_disk_conf->disk_size) {
  drbd_err(device, "max capacity %llu smaller than disk size %llu\n",
   (unsigned long long) drbd_get_max_capacity(nbc),
   (unsigned long long) new_disk_conf->disk_size);
  retcode = ERR_DISK_TOO_SMALL;
  goto fail;
 }

 if (new_disk_conf->meta_dev_idx < 0) {
  max_possible_sectors = DRBD_MAX_SECTORS_FLEX;
  /* at least one MB, otherwise it does not make sense */
  min_md_device_sectors = (2<<10);
 } else {
  max_possible_sectors = DRBD_MAX_SECTORS;
  min_md_device_sectors = MD_128MB_SECT * (new_disk_conf->meta_dev_idx + 1);
 }

 if (drbd_get_capacity(nbc->md_bdev) < min_md_device_sectors) {
  retcode = ERR_MD_DISK_TOO_SMALL;
  drbd_warn(device, "refusing attach: md-device too small, "
       "at least %llu sectors needed for this meta-disk type\n",
       (unsigned long long) min_md_device_sectors);
  goto fail;
 }

 /* Make sure the new disk is big enough
 * (we may currently be R_PRIMARY with no local disk...) */

 if (drbd_get_max_capacity(nbc) < get_capacity(device->vdisk)) {
  retcode = ERR_DISK_TOO_SMALL;
  goto fail;
 }

 nbc->known_size = drbd_get_capacity(nbc->backing_bdev);

 if (nbc->known_size > max_possible_sectors) {
  drbd_warn(device, "==> truncating very big lower level device "
   "to currently maximum possible %llu sectors <==\n",
   (unsigned long long) max_possible_sectors);
  if (new_disk_conf->meta_dev_idx >= 0)
   drbd_warn(device, "==>> using internal or flexible "
          "meta data may help <<==\n");
 }

 drbd_suspend_io(device);
 /* also wait for the last barrier ack. */
 /* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
 * We need a way to either ignore barrier acks for barriers sent before a device
 * was attached, or a way to wait for all pending barrier acks to come in.
 * As barriers are counted per resource,
 * we'd need to suspend io on all devices of a resource.
 */

 wait_event(device->misc_wait, !atomic_read(&device->ap_pending_cnt) || drbd_suspended(device));
 /* and for any other previously queued work */
 drbd_flush_workqueue(&connection->sender_work);

 rv = _drbd_request_state(device, NS(disk, D_ATTACHING), CS_VERBOSE);
 retcode = (enum drbd_ret_code)rv;
 drbd_resume_io(device);
 if (rv < SS_SUCCESS)
  goto fail;

 if (!get_ldev_if_state(device, D_ATTACHING))
  goto force_diskless;

 if (!device->bitmap) {
  if (drbd_bm_init(device)) {
   retcode = ERR_NOMEM;
   goto force_diskless_dec;
  }
 }

 if (device->state.pdsk != D_UP_TO_DATE && device->ed_uuid &&
     (device->state.role == R_PRIMARY || device->state.peer == R_PRIMARY) &&
            (device->ed_uuid & ~((u64)1)) != (nbc->md.uuid[UI_CURRENT] & ~((u64)1))) {
  drbd_err(device, "Can only attach to data with current UUID=%016llX\n",
      (unsigned long long)device->ed_uuid);
  retcode = ERR_DATA_NOT_CURRENT;
  goto force_diskless_dec;
 }

 /* Since we are diskless, fix the activity log first... */
 if (drbd_check_al_size(device, new_disk_conf)) {
  retcode = ERR_NOMEM;
  goto force_diskless_dec;
 }

 /* Prevent shrinking of consistent devices ! */
 {
 unsigned long long nsz = drbd_new_dev_size(device, nbc, nbc->disk_conf->disk_size, 0);
 unsigned long long eff = nbc->md.la_size_sect;
 if (drbd_md_test_flag(nbc, MDF_CONSISTENT) && nsz < eff) {
  if (nsz == nbc->disk_conf->disk_size) {
   drbd_warn(device, "truncating a consistent device during attach (%llu < %llu)\n", nsz, eff);
  } else {
   drbd_warn(device, "refusing to truncate a consistent device (%llu < %llu)\n", nsz, eff);
   drbd_msg_sprintf_info(adm_ctx.reply_skb,
    "To-be-attached device has last effective > current size, and is consistent\n"
    "(%llu > %llu sectors). Refusing to attach.", eff, nsz);
   retcode = ERR_IMPLICIT_SHRINK;
   goto force_diskless_dec;
  }
 }
 }

 lock_all_resources();
 retcode = drbd_resync_after_valid(device, new_disk_conf->resync_after);
 if (retcode != NO_ERROR) {
  unlock_all_resources();
  goto force_diskless_dec;
 }

 /* Reset the "barriers don't work" bits here, then force meta data to
 * be written, to ensure we determine if barriers are supported. */

 if (new_disk_conf->md_flushes)
  clear_bit(MD_NO_FUA, &device->flags);
 else
  set_bit(MD_NO_FUA, &device->flags);

 /* Point of no return reached.
 * Devices and memory are no longer released by error cleanup below.
 * now device takes over responsibility, and the state engine should
 * clean it up somewhere.  */

 D_ASSERT(device, device->ldev == NULL);
 device->ldev = nbc;
 device->resync = resync_lru;
 device->rs_plan_s = new_plan;
 nbc = NULL;
 resync_lru = NULL;
 new_disk_conf = NULL;
 new_plan = NULL;

 drbd_resync_after_changed(device);
 drbd_bump_write_ordering(device->resource, device->ldev, WO_BDEV_FLUSH);
 unlock_all_resources();

 if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY))
  set_bit(CRASHED_PRIMARY, &device->flags);
 else
  clear_bit(CRASHED_PRIMARY, &device->flags);

 if (drbd_md_test_flag(device->ldev, MDF_PRIMARY_IND) &&
     !(device->state.role == R_PRIMARY && device->resource->susp_nod))
  set_bit(CRASHED_PRIMARY, &device->flags);

 device->send_cnt = 0;
 device->recv_cnt = 0;
 device->read_cnt = 0;
 device->writ_cnt = 0;

 drbd_reconsider_queue_parameters(device, device->ldev, NULL);

 /* If I am currently not R_PRIMARY,
 * but meta data primary indicator is set,
 * I just now recover from a hard crash,
 * and have been R_PRIMARY before that crash.
 *
 * Now, if I had no connection before that crash
 * (have been degraded R_PRIMARY), chances are that
 * I won't find my peer now either.
 *
 * In that case, and _only_ in that case,
 * we use the degr-wfc-timeout instead of the default,
 * so we can automatically recover from a crash of a
 * degraded but active "cluster" after a certain timeout.
 */

 clear_bit(USE_DEGR_WFC_T, &device->flags);
 if (device->state.role != R_PRIMARY &&
      drbd_md_test_flag(device->ldev, MDF_PRIMARY_IND) &&
     !drbd_md_test_flag(device->ldev, MDF_CONNECTED_IND))
  set_bit(USE_DEGR_WFC_T, &device->flags);

 dd = drbd_determine_dev_size(device, 0, NULL);
 if (dd <= DS_ERROR) {
  retcode = ERR_NOMEM_BITMAP;
  goto force_diskless_dec;
 } else if (dd == DS_GREW)
  set_bit(RESYNC_AFTER_NEG, &device->flags);

 if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC) ||
     (test_bit(CRASHED_PRIMARY, &device->flags) &&
      drbd_md_test_flag(device->ldev, MDF_AL_DISABLED))) {
  drbd_info(device, "Assuming that all blocks are out of sync "
       "(aka FullSync)\n");
  if (drbd_bitmap_io(device, &drbd_bmio_set_n_write,
   "set_n_write from attaching", BM_LOCKED_MASK,
   NULL)) {
   retcode = ERR_IO_MD_DISK;
   goto force_diskless_dec;
  }
 } else {
  if (drbd_bitmap_io(device, &drbd_bm_read,
   "read from attaching", BM_LOCKED_MASK,
   NULL)) {
   retcode = ERR_IO_MD_DISK;
   goto force_diskless_dec;
  }
 }

 if (_drbd_bm_total_weight(device) == drbd_bm_bits(device))
  drbd_suspend_al(device); /* IO is still suspended here... */

 spin_lock_irq(&device->resource->req_lock);
 os = drbd_read_state(device);
 ns = os;
 /* If MDF_CONSISTENT is not set go into inconsistent state,
   otherwise investigate MDF_WasUpToDate...
   If MDF_WAS_UP_TO_DATE is not set go into D_OUTDATED disk state,
   otherwise into D_CONSISTENT state.
*/

 if (drbd_md_test_flag(device->ldev, MDF_CONSISTENT)) {
  if (drbd_md_test_flag(device->ldev, MDF_WAS_UP_TO_DATE))
   ns.disk = D_CONSISTENT;
  else
   ns.disk = D_OUTDATED;
 } else {
  ns.disk = D_INCONSISTENT;
 }

 if (drbd_md_test_flag(device->ldev, MDF_PEER_OUT_DATED))
  ns.pdsk = D_OUTDATED;

 rcu_read_lock();
 if (ns.disk == D_CONSISTENT &&
     (ns.pdsk == D_OUTDATED || rcu_dereference(device->ldev->disk_conf)->fencing == FP_DONT_CARE))
  ns.disk = D_UP_TO_DATE;

 /* All tests on MDF_PRIMARY_IND, MDF_CONNECTED_IND,
   MDF_CONSISTENT and MDF_WAS_UP_TO_DATE must happen before
   this point, because drbd_request_state() modifies these
   flags. */


 if (rcu_dereference(device->ldev->disk_conf)->al_updates)
  device->ldev->md.flags &= ~MDF_AL_DISABLED;
 else
  device->ldev->md.flags |= MDF_AL_DISABLED;

 rcu_read_unlock();

 /* In case we are C_CONNECTED postpone any decision on the new disk
   state after the negotiation phase. */

 if (device->state.conn == C_CONNECTED) {
  device->new_state_tmp.i = ns.i;
  ns.i = os.i;
  ns.disk = D_NEGOTIATING;

  /* We expect to receive up-to-date UUIDs soon.
   To avoid a race in receive_state, free p_uuid while
   holding req_lock. I.e. atomic with the state change */

  kfree(device->p_uuid);
  device->p_uuid = NULL;
 }

 rv = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 spin_unlock_irq(&device->resource->req_lock);

 if (rv < SS_SUCCESS)
  goto force_diskless_dec;

 mod_timer(&device->request_timer, jiffies + HZ);

 if (device->state.role == R_PRIMARY)
  device->ldev->md.uuid[UI_CURRENT] |=  (u64)1;
 else
  device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;

 drbd_md_mark_dirty(device);
 drbd_md_sync(device);

 kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
 put_ldev(device);
 conn_reconfig_done(connection);
 mutex_unlock(&adm_ctx.resource->adm_mutex);
 drbd_adm_finish(&adm_ctx, info, retcode);
 return 0;

 force_diskless_dec:
 put_ldev(device);
 force_diskless:
 drbd_force_state(device, NS(disk, D_DISKLESS));
 drbd_md_sync(device);
 fail:
 conn_reconfig_done(connection);
 if (nbc) {
  close_backing_dev(device, nbc->f_md_bdev,
     nbc->md_bdev != nbc->backing_bdev);
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=92 H=92 G=91

¤ Dauer der Verarbeitung: 0.17 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.