/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/* * Increment the given counter and return its updated value. * If the counter is already 0 it will not be incremented. * If the counter is already at its maximum value returns * -EINVAL without updating it.
*/ staticint atomic_inc_return_safe(atomic_t *v)
{ unsignedint counter;
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
/*
 * block device image metadata (in-memory version)
 *
 * The fields are split into two groups: values fixed for the life of
 * an image, and values that are rewritten whenever the header is
 * refreshed.
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;	/* prefix used when formatting data object names */
	__u8 obj_order;		/* object size is (1U << obj_order) bytes */
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;	/* snapshot ids live in snapc->snaps[] */
	char *snap_names;	/* format 1 only; back-to-back NUL-terminated names */
	u64 *snap_sizes;	/* format 1 only; one size per snapshot */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	/* pool identity */
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	/* image identity; image_name may be a null pointer */
	const char	*image_id;
	const char	*image_name;

	/* snapshot identity */
	u64		snap_id;
	const char	*snap_name;

	/* reference count for this spec */
	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;	/* underlying ceph client handle */
	struct kref kref;		/* dropped via rbd_put_client() */
	struct list_head node;		/* NOTE(review): presumably links into a list of known clients -- confirm */
};
/*
 * Aggregated outcome of a group of sub-operations.
 * NOTE(review): num_pending presumably counts sub-operations still
 * outstanding -- confirm against the code that updates it.
 */
struct pending_result {
	int result;		/* first nonzero result */
	int num_pending;
};
struct rbd_img_request;
/*
 * Describes how the data buffer of a request is represented.
 * Numbering starts at 1, so 0 is not a valid type.
 */
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,	/* no data buffer attached */
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};
/* protects updating the header */ struct rw_semaphore header_rwsem;
struct rbd_mapping mapping;
struct list_head node;
/* sysfs related */ struct device dev; unsignedlong open_count; /* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 *
 * The enumerators are bit numbers (used with test_bit() and friends),
 * not masks.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
};
/* * single-major requires >= 0.75 version of userspace rbd utility.
*/ staticbool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
rbdc->client = ceph_create_client(ceph_opts, rbdc); if (IS_ERR(rbdc->client)) goto out_rbdc;
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
ret = ceph_open_session(rbdc->client); if (ret < 0) goto out_client;
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 *
 * @rbdc may be NULL, in which case this is a no-op.  The final put
 * invokes rbd_client_release().
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
/* * Get a ceph client with specific addr and configuration, if one does * not exist create it. Either way, ceph_opts is consumed by this * function.
*/ staticstruct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{ struct rbd_client *rbdc; int ret;
mutex_lock(&client_mutex);
rbdc = rbd_client_find(ceph_opts); if (rbdc) {
ceph_destroy_options(ceph_opts);
/* * Using an existing client. Make sure ->pg_pools is up to * date before we look up the pool id in do_rbd_add().
*/
ret = ceph_wait_for_latest_osdmap(rbdc->client,
rbdc->client->options->mount_timeout); if (ret) {
rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
rbd_put_client(rbdc);
rbdc = ERR_PTR(ret);
}
} else {
rbdc = rbd_client_create(ceph_opts);
}
mutex_unlock(&client_mutex);
/* The header has to start with the magic rbd header text */ if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) returnfalse;
/* The bio layer requires at least sector-sized I/O */
if (ondisk->options.order < SECTOR_SHIFT) returnfalse;
/* If we use u64 in a few spots we may be able to loosen this */
if (ondisk->options.order > 8 * sizeof (int) - 1) returnfalse;
/* * The size of a snapshot header has to fit in a size_t, and * that limits the number of snapshots.
*/
snap_count = le32_to_cpu(ondisk->snap_count);
size = SIZE_MAX - sizeof (struct ceph_snap_context); if (snap_count > size / sizeof (__le64)) returnfalse;
	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
size -= snap_count * sizeof (__le64); if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) returnfalse;
returntrue;
}
/*
 * Size, in bytes, of a single object of the image described by
 * @header: two raised to the power of the header's object order.
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	u32 obj_size = 1U << header->obj_order;

	return obj_size;
}
if (snap_names_len > (u64)SIZE_MAX) goto out_2big;
snap_names = kmalloc(snap_names_len, GFP_KERNEL); if (!snap_names) goto out_err;
/* ...as well as the array of their sizes. */
snap_sizes = kmalloc_array(snap_count, sizeof(*header->snap_sizes),
GFP_KERNEL); if (!snap_sizes) goto out_err;
/* * Copy the names, and fill in each snapshot's id * and size. * * Note that rbd_dev_v1_header_info() guarantees the * ondisk buffer we're working with has * snap_names_len bytes beyond the end of the * snapshot id array, this memcpy() is safe.
*/
memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
snaps = ondisk->snaps; for (i = 0; i < snap_count; i++) {
snapc->snaps[i] = le64_to_cpu(snaps[i].id);
snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
}
}
/* We won't fail any more, fill in the header */
if (first_time) {
header->object_prefix = object_prefix;
header->obj_order = ondisk->options.order;
}
/* The remaining fields always get updated (when we refresh) */
/* Skip over names until we find the one we are looking for */
snap_name = rbd_dev->header.snap_names; while (which--)
snap_name += strlen(snap_name) + 1;
return kstrdup(snap_name, GFP_KERNEL);
}
/* * Snapshot id comparison function for use with qsort()/bsearch(). * Note that result is for snapshots in *descending* order.
*/ staticint snapid_compare_reverse(constvoid *s1, constvoid *s2)
{
u64 snap_id1 = *(u64 *)s1;
u64 snap_id2 = *(u64 *)s2;
/*
 * Look up @snap_id in the snapshot context of @rbd_dev's header.
 *
 * Returns the index of the matching entry in the snapshot id array,
 * or BAD_SNAP_INDEX when the id is not present.
 *
 * Note: The snapshot array is kept sorted (by the osd) in reverse
 * order, highest snapshot id first, hence the binary search with a
 * reversed comparison function.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *match;

	match = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
			sizeof (snap_id), snapid_compare_reverse);
	if (!match)
		return BAD_SNAP_INDEX;

	/* distance from the first element is the array index */
	return (u32)(match - &snapc->snaps[0]);
}
/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 * @bytes is the length of the range to zero.
 *
 * The helper is chosen from the owning image request's data_type; any
 * type other than BIO, BVECS or OWN_BVECS hits BUG().
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		/* both bio_vec flavours are reached through bvec_pos */
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();
	}
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 *
 * Marks @img_request as layered by setting the IMG_REQ_LAYERED bit
 * in its flags word.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
}
/* * Must be called after rbd_obj_calc_img_extents().
*/ staticvoid rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
{
rbd_assert(obj_req->img_request->snapc);
dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
osd_req->r_result, obj_req);
/* * Writes aren't allowed to return a data payload. In some * guarded write cases (e.g. stat + zero on an empty object) * a stat response makes it through, but we don't care.
*/ if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
result = 0; else
result = osd_req->r_result;
/* * Data objects may be stored in a separate pool, but always in * the same namespace in that pool as the header in its pool.
*/
ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
req->r_base_oloc.pool = rbd_dev->layout.pool_id;
ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
rbd_dev->header.object_prefix,
obj_req->ex.oe_objno); if (ret) return ERR_PTR(ret);
switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: case OBJ_REQUEST_BIO: case OBJ_REQUEST_BVECS: break; /* Nothing to do */ case OBJ_REQUEST_OWN_BVECS:
kfree(obj_request->bvec_pos.bvecs); break; default:
BUG();
}
kfree(obj_request->img_extents); if (obj_request->copyup_bvecs) { for (i = 0; i < obj_request->copyup_bvec_count; i++) { if (obj_request->copyup_bvecs[i].bv_page)
__free_page(obj_request->copyup_bvecs[i].bv_page);
}
kfree(obj_request->copyup_bvecs);
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 *
 * Does nothing if @rbd_dev has no parent_spec.  A counter that is
 * still positive after the decrement means other references remain;
 * exactly zero triggers rbd_dev_unparent(); a negative value is
 * reported as an underflow.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/* * If an image has a non-zero parent overlap, get a reference to its * parent. * * Returns true if the rbd device has a parent with a non-zero * overlap and a reference for it was successfully taken, or * false otherwise.
*/ staticbool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{ int counter = 0;
if (!rbd_dev->parent_spec) returnfalse;
if (rbd_dev->parent_overlap)
counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
if (counter < 0)
rbd_warn(rbd_dev, "parent reference overflow");
/*
 * Only snap_id is captured here, for reads.  For writes, snapshot
 * context is captured in rbd_img_object_requests() after exclusive
 * lock is ensured to be held.
 *
 * The caller must hold rbd_dev->header_rwsem (asserted via lockdep).
 * If a parent reference can be taken, the image request is also
 * marked as layered.
 */
static void rbd_img_capture_header(struct rbd_img_request *img_req)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;

	lockdep_assert_held(&rbd_dev->header_rwsem);

	if (!rbd_img_is_write(img_req))
		img_req->snap_id = rbd_dev->spec->snap_id;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_req);
}
spin_lock(&rbd_dev->object_map_lock);
state = __rbd_object_map_get(rbd_dev, objno);
spin_unlock(&rbd_dev->object_map_lock); return state;
}
staticbool use_object_map(struct rbd_device *rbd_dev)
{ /* * An image mapped read-only can't use the object map -- it isn't * loaded because the header lock isn't acquired. Someone else can * write to the image and update the object map behind our back. * * A snapshot can't be written to, so using the object map is always * safe.
*/ if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev)) returnfalse;
/* * This function needs snap_id (or more precisely just something to * distinguish between HEAD and snapshot object maps), new_state and * current_state that were passed to rbd_object_map_update(). * * To avoid allocating and stashing a context we piggyback on the OSD * request. A HEAD update has two ops (assert_locked). For new_state * and current_state we decode our own object_map_update op, encoded in * rbd_cls_object_map_update().
*/ staticint rbd_object_map_update_finish(struct rbd_obj_request *obj_req, struct ceph_osd_request *osd_req)
{ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_data *osd_data;
u64 objno;
u8 state, new_state, current_state; bool has_current_state; void *p;
if (osd_req->r_result) return osd_req->r_result;
/* * Nothing to do for a snapshot object map.
*/ if (osd_req->r_num_ops == 1) return 0;
/* drop extents completely beyond the overlap */ while (cnt && img_extents[cnt - 1].fe_off >= overlap)
cnt--;
if (cnt) { struct ceph_file_extent *ex = &img_extents[cnt - 1];
/* trim final overlapping extent */ if (ex->fe_off + ex->fe_len > overlap)
ex->fe_len = overlap - ex->fe_off;
}
*num_img_extents = cnt;
}
/* * Determine the byte range(s) covered by either just the object extent * or the entire object in the parent image.
*/ staticint rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, bool entire)
{ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret;
if (!rbd_dev->parent_overlap) return 0;
ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
entire ? 0 : obj_req->ex.oe_off,
entire ? rbd_dev->layout.object_size :
obj_req->ex.oe_len,
&obj_req->img_extents,
&obj_req->num_img_extents); if (ret) return ret;
/* * Align the range to alloc_size boundary and punt on discards * that are too small to free up any space. * * alloc_size == object_size && is_tail() is a special case for * filestore with filestore_punch_hole = false, needed to allow * truncate (in addition to delete).
*/ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
!rbd_obj_is_tail(obj_req)) {
off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
rbd_dev->opts->alloc_size); if (off >= next_off) return 1;
switch (obj_req->img_request->op_type) { case OBJ_OP_WRITE:
__rbd_osd_setup_write_ops(osd_req, which); break; case OBJ_OP_DISCARD:
__rbd_osd_setup_discard_ops(osd_req, which); break; case OBJ_OP_ZEROOUT:
__rbd_osd_setup_zeroout_ops(osd_req, which); break; default:
BUG();
}
}
/*
 * Prune the list of object requests (adjust offset and/or length, drop
 * redundant requests).  Prepare object request state machines and image
 * request state machine for execution.
 *
 * Each object request is initialized by the helper matching the image
 * request's op_type.  A negative return from the helper aborts and is
 * propagated to the caller; a positive return means the object request
 * is not needed and it is removed from the image request.
 *
 * Returns 0 on success, with img_req->state set to RBD_IMG_START, or a
 * negative error code.
 */
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req, *next_obj_req;
	int ret;

	/* the _safe iterator is required: entries may be deleted below */
	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
		switch (img_req->op_type) {
		case OBJ_OP_READ:
			ret = rbd_obj_init_read(obj_req);
			break;
		case OBJ_OP_WRITE:
			ret = rbd_obj_init_write(obj_req);
			break;
		case OBJ_OP_DISCARD:
			ret = rbd_obj_init_discard(obj_req);
			break;
		case OBJ_OP_ZEROOUT:
			ret = rbd_obj_init_zeroout(obj_req);
			break;
		default:
			BUG();
		}
		if (ret < 0)
			return ret;
		if (ret > 0) {
			rbd_img_obj_request_del(img_req, obj_req);
			continue;
		}
	}

	img_req->state = RBD_IMG_START;
	return 0;
}
/*
 * Position within the caller-provided data buffer: either a bio
 * iterator or a bio_vec iterator, depending on how the data was
 * supplied.
 */
union rbd_img_fill_iter {
	struct ceph_bio_iter bio_iter;
	struct ceph_bvec_iter bvec_iter;
};
/*
 * Context passed to the image request fill helpers.
 *
 * @pos is the starting position in the provided data buffer; @iter is
 * a working copy that the fill helpers reset from *@pos before each
 * pass over the image extents.
 */
struct rbd_img_fill_ctx {
	enum obj_request_type pos_type;	/* kind of data buffer @pos refers to */
	union rbd_img_fill_iter *pos;
	union rbd_img_fill_iter iter;
	ceph_object_extent_fn_t set_pos_fn;	/* passed to ceph_file_to_extents() to set each request's start position */
	ceph_object_extent_fn_t count_fn;	/* passed to ceph_file_to_extents() when counting bio_vecs */
	ceph_object_extent_fn_t copy_fn;	/* passed to ceph_iterate_extents() when filling private bio_vec arrays */
};
/* * While su != os && sc == 1 is technically not fancy (it's the same * layout as su == os && sc == 1), we can't use the nocopy path for it * because ->set_pos_fn() should be called only once per object. * ceph_file_to_extents() invokes action_fn once per stripe unit, so * treat su != os && sc == 1 as fancy.
*/ staticbool rbd_layout_is_fancy(struct ceph_file_layout *l)
{ return l->stripe_unit != l->object_size;
}
/* * Create object requests and set each object request's starting * position in the provided bio (list) or bio_vec array.
*/
fctx->iter = *fctx->pos; for (i = 0; i < num_img_extents; i++) {
ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
img_extents[i].fe_off,
img_extents[i].fe_len,
&img_req->object_extents,
alloc_object_extent, img_req,
fctx->set_pos_fn, &fctx->iter); if (ret) return ret;
}
return __rbd_img_fill_request(img_req);
}
/* * Map a list of image extents to a list of object extents, create the * corresponding object requests (normally each to a different object, * but not always) and add them to @img_req. For each object request, * set up its data descriptor to point to the corresponding chunk(s) of * @fctx->pos data buffer. * * Because ceph_file_to_extents() will merge adjacent object extents * together, each object request's data descriptor may point to multiple * different chunks of @fctx->pos data buffer. * * @fctx->pos data buffer is assumed to be large enough.
*/ staticint rbd_img_fill_request(struct rbd_img_request *img_req, struct ceph_file_extent *img_extents,
u32 num_img_extents, struct rbd_img_fill_ctx *fctx)
{ struct rbd_device *rbd_dev = img_req->rbd_dev; struct rbd_obj_request *obj_req;
u32 i; int ret;
/* * Create object requests and determine ->bvec_count for each object * request. Note that ->bvec_count sum over all object requests may * be greater than the number of bio_vecs in the provided bio (list) * or bio_vec array because when mapped, those bio_vecs can straddle * stripe unit boundaries.
*/
fctx->iter = *fctx->pos; for (i = 0; i < num_img_extents; i++) {
ret = ceph_file_to_extents(&rbd_dev->layout,
img_extents[i].fe_off,
img_extents[i].fe_len,
&img_req->object_extents,
alloc_object_extent, img_req,
fctx->count_fn, &fctx->iter); if (ret) return ret;
}
/* * Fill in each object request's private bio_vec array, splitting and * rearranging the provided bio_vecs in stripe unit chunks as needed.
*/
fctx->iter = *fctx->pos; for (i = 0; i < num_img_extents; i++) {
ret = ceph_iterate_extents(&rbd_dev->layout,
img_extents[i].fe_off,
img_extents[i].fe_len,
&img_req->object_extents,
fctx->copy_fn, &fctx->iter); if (ret) return ret;
}
ret = rbd_obj_read_object(obj_req); if (ret) {
*result = ret; returntrue;
}
obj_req->read_state = RBD_OBJ_READ_OBJECT; returnfalse; case RBD_OBJ_READ_OBJECT: if (*result == -ENOENT && rbd_dev->parent_overlap) { /* reverse map this object extent onto the parent */
ret = rbd_obj_calc_img_extents(obj_req, false); if (ret) {
*result = ret; returntrue;
} if (obj_req->num_img_extents) {
ret = rbd_obj_read_from_parent(obj_req); if (ret) {
*result = ret; returntrue;
}
obj_req->read_state = RBD_OBJ_READ_PARENT; returnfalse;
}
}
/* * -ENOENT means a hole in the image -- zero-fill the entire * length of the request. A short read also implies zero-fill * to the end of the request.
*/ if (*result == -ENOENT) {
rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
*result = 0;
} elseif (*result >= 0) { if (*result < obj_req->ex.oe_len)
rbd_obj_zero_range(obj_req, *result,
obj_req->ex.oe_len - *result); else
rbd_assert(*result == obj_req->ex.oe_len);
*result = 0;
} returntrue; case RBD_OBJ_READ_PARENT: /* * The parent image is read only up to the overlap -- zero-fill * from the overlap to the end of the request.
*/ if (!*result) {
u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
staticint rbd_obj_write_object(struct rbd_obj_request *obj_req)
{ struct ceph_osd_request *osd_req; int num_ops = count_write_ops(obj_req); int which = 0; int ret;
if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
num_ops++; /* stat */
osd_req = rbd_obj_add_osd_request(obj_req, num_ops); if (IS_ERR(osd_req)) return PTR_ERR(osd_req);
if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
ret = rbd_osd_setup_stat(osd_req, which++); if (ret) return ret;
}
osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); if (IS_ERR(osd_req)) return PTR_ERR(osd_req);
ret = rbd_osd_setup_copyup(osd_req, 0, bytes); if (ret) return ret;
rbd_osd_format_write(osd_req);
ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); if (ret) return ret;
rbd_osd_submit(osd_req); return 0;
}
staticint rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
u32 bytes)
{ struct ceph_osd_request *osd_req; int num_ops = count_write_ops(obj_req); int which = 0; int ret;
/* * The target object doesn't exist. Read the data for the entire * target object up to the overlap point (if any) from the parent, * so we can use it for a copyup.
*/ staticint rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
{ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret;
rbd_assert(obj_req->num_img_extents);
prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
rbd_dev->parent_overlap); if (!obj_req->num_img_extents) { /* * The overlap has become 0 (most likely because the * image has been flattened). Re-submit the original write * request -- pass MODS_ONLY since the copyup isn't needed * anymore.
*/ return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
}
ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); if (ret) return ret;
/* * Only send non-zero copyup data to save some I/O and network * bandwidth -- zero copyup data is equivalent to the object not * existing.
*/ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
bytes = 0;
if (obj_req->img_request->snapc->num_snaps && bytes > 0) { /* * Send a copyup request with an empty snapshot context to * deep-copyup the object through all existing snapshots. * A second request with the current snapshot context will be * sent for the actual modification.
*/
ret = rbd_obj_copyup_empty_snapc(obj_req, bytes); if (ret) {
obj_req->pending.result = ret; return;
}
if (done && *result) {
rbd_assert(*result < 0);
rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
} return done;
}
/*
 * This is open-coded in rbd_img_handle_request() to avoid parent chain
 * recursion.
 *
 * Advances the object request with @result.  When
 * __rbd_obj_handle_request() reports the object request as done, the
 * (possibly updated) result is handed on to the owning image request.
 */
static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
{
	if (__rbd_obj_handle_request(obj_req, &result))
		rbd_img_handle_request(obj_req->img_request, result);
}
/* * Note the use of mod_delayed_work() in rbd_acquire_lock() * and cancel_delayed_work() in wake_lock_waiters().
*/
dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); return 0;
}
/*
 * lock_rwsem must be held for write
 *
 * Take the exclusive class lock on the image header object using a
 * freshly formatted cookie, then record the lock locally through
 * __rbd_lock().
 *
 * Returns 0 on success or a negative error code.  -EEXIST from
 * ceph_cls_lock() is deliberately treated as success.
 * NOTE(review): presumably -EEXIST means we already hold the lock with
 * this cookie -- confirm against ceph_cls_lock() semantics.
 */
static int rbd_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
			    RBD_LOCK_TAG, "", 0);
	if (ret && ret != -EEXIST)
		return ret;

	__rbd_lock(rbd_dev, cookie);
	return 0;
}
/*
 * lock_rwsem must be held for write
 *
 * Release the class lock on the image header object and reset the
 * local lock state.  A failure other than -ENOENT is logged, but the
 * local state is reset unconditionally, after which the released-lock
 * work item is queued.
 */
static void rbd_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock header: %d", ret);

	/* treat errors as the image is unlocked */
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	rbd_dev->lock_cookie[0] = '\0';
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
}
ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, &watchers,
&num_watchers); if (ret) {
rbd_warn(rbd_dev, "failed to get watchers: %d", ret); return ret;
}
sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); for (i = 0; i < num_watchers; i++) { /* * Ignore addr->type while comparing. This mimics * entity_addr_t::get_legacy_str() + strcmp().
*/ if (ceph_addr_equal_no_type(&watchers[i].addr,
&locker->info.addr) &&
watchers[i].cookie == cookie) { struct rbd_client_id cid = {
.gid = le64_to_cpu(watchers[i].name.num),
.handle = cookie,
};
dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
rbd_dev, cid.gid, cid.handle);
rbd_set_owner_cid(rbd_dev, &cid);
ret = 1; goto out;
}
}
dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
ret = 0;
out:
kfree(watchers); return ret;
}
/* * lock_rwsem must be held for write
*/ staticint rbd_try_lock(struct rbd_device *rbd_dev)
{ struct ceph_client *client = rbd_dev->rbd_client->client; struct ceph_locker *locker, *refreshed_locker; int ret;
for (;;) {
locker = refreshed_locker = NULL;
ret = rbd_lock(rbd_dev); if (!ret) goto out; if (ret != -EBUSY) {
rbd_warn(rbd_dev, "failed to lock header: %d", ret); goto out;
}
/* determine if the current lock holder is still alive */
locker = get_lock_owner_info(rbd_dev); if (IS_ERR(locker)) {
ret = PTR_ERR(locker);
locker = NULL; goto out;
} if (!locker) goto again;
ret = find_watcher(rbd_dev, locker); if (ret) goto out; /* request lock or error */
refreshed_locker = get_lock_owner_info(rbd_dev); if (IS_ERR(refreshed_locker)) {
ret = PTR_ERR(refreshed_locker);
refreshed_locker = NULL; goto out;
} if (!refreshed_locker ||
!locker_equal(locker, refreshed_locker)) goto again;
rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
ENTITY_NAME(locker->id.name));
ret = ceph_monc_blocklist_add(&client->monc,
&locker->info.addr); if (ret) {
rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
ENTITY_NAME(locker->id.name), ret); goto out;
}
ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, RBD_LOCK_NAME,
locker->id.cookie, &locker->id.name); if (ret && ret != -ENOENT) {
rbd_warn(rbd_dev, "failed to break header lock: %d",
ret); goto out;
}
ret = rbd_post_acquire_action(rbd_dev); if (ret) {
rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); /* * Can't stay in RBD_LOCK_STATE_LOCKED because * rbd_lock_add_request() would let the request through, * assuming that e.g. object map is locked and loaded.
*/
rbd_unlock(rbd_dev);
}
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) returnfalse;
/* * Ensure that all in-flight IO is flushed.
*/
rbd_dev->lock_state = RBD_LOCK_STATE_QUIESCING;
rbd_assert(!completion_done(&rbd_dev->quiescing_wait)); if (list_empty(&rbd_dev->running_list)) returntrue;
/*
 * lock_rwsem must be held for write
 *
 * Quiesce and then release the lock.  Nothing is done if
 * rbd_quiesce_lock() reports that quiescing did not succeed.
 */
static void rbd_release_lock(struct rbd_device *rbd_dev)
{
	if (!rbd_quiesce_lock(rbd_dev))
		return;

	__rbd_release_lock(rbd_dev);

	/*
	 * Give others a chance to grab the lock - we would re-acquire
	 * almost immediately if we got new IO while draining the running
	 * list otherwise.  We need to ack our own notifications, so this
	 * lock_dwork will be requeued from rbd_handle_released_lock() by
	 * way of maybe_kick_acquire().
	 */
	cancel_delayed_work(&rbd_dev->lock_dwork);
}
/* * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no * ResponseMessage is needed.
*/ staticint rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, void **p)
{ struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); struct rbd_client_id cid = { 0 }; int result = 1;
/* * header_rwsem must not be held to avoid a deadlock with * rbd_dev_refresh() when flushing notifies.
*/ staticvoid rbd_unregister_watch(struct rbd_device *rbd_dev)
{
cancel_tasks_sync(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
__rbd_unregister_watch(rbd_dev);
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
mutex_unlock(&rbd_dev->watch_mutex);
/*
 * lock_rwsem must be held for write
 *
 * Replace the cookie of the lock we hold with a freshly formatted one.
 * If the cookie cannot be updated, fall back to releasing the lock and
 * queueing lock_dwork to acquire it again.  Nothing is done if
 * rbd_quiesce_lock() reports that quiescing did not succeed.
 */
static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	if (!rbd_quiesce_lock(rbd_dev))
		return;

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
				  RBD_LOCK_TAG, cookie);
	if (ret) {
		/* -EOPNOTSUPP is the expected failure, so don't warn on it */
		if (ret != -EOPNOTSUPP)
			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
				 ret);

		if (rbd_dev->opts->exclusive)
			rbd_warn(rbd_dev,
			     "temporarily releasing lock on exclusive mapping");

		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
		__rbd_release_lock(rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
	} else {
		__rbd_lock(rbd_dev, cookie);
		wake_lock_waiters(rbd_dev, 0);
	}
}
down_write(&rbd_dev->lock_rwsem); if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
rbd_reacquire_lock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
ret = rbd_dev_refresh(rbd_dev); if (ret)
rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
}
/* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code.
*/ staticint rbd_obj_method_sync(struct rbd_device *rbd_dev, struct ceph_object_id *oid, struct ceph_object_locator *oloc, constchar *method_name, constvoid *outbound,
size_t outbound_size, void *inbound,
size_t inbound_size)
{ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct page *req_page = NULL; struct page *reply_page; int ret;
	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
*/ if (outbound) { if (outbound_size > PAGE_SIZE) return -E2BIG;
req_page = alloc_page(GFP_KERNEL); if (!req_page) return -ENOMEM;
/* * Read the complete header for the given rbd device. On successful * return, the rbd_dev->header field will contain up-to-date * information about the image.
*/ staticint rbd_dev_v1_header_info(struct rbd_device *rbd_dev, struct rbd_image_header *header, bool first_time)
{ struct rbd_image_header_ondisk *ondisk = NULL;
u32 snap_count = 0;
u64 names_size = 0;
u32 want_count; int ret;
/* * The complete header will include an array of its 64-bit * snapshot ids, followed by the names of those snapshots as * a contiguous block of NUL-terminated strings. Note that * the number of snapshots could change by the time we read * it in, in which case we re-read it.
*/ do {
size_t size;
/* * If EXISTS is not set, rbd_dev->disk may be NULL, so don't * try to update its size. If REMOVING is set, updating size * is just useless work since the device can't be opened.
*/ if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
!test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
dout("setting size to %llu sectors", (unsignedlonglong)size);
set_capacity_and_notify(rbd_dev->disk, size);
}
}
/* * Shows the name of the currently-mapped snapshot (or * RBD_SNAP_HEAD_NAME for the base image).
*/ static ssize_t rbd_snap_show(struct device *dev, struct device_attribute *attr, char *buf)
{ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* * For a v2 image, shows the chain of parent images, separated by empty * lines. For v1 images or if there is no parent, shows "(no parent * image)".
*/ static ssize_t rbd_parent_show(struct device *dev, struct device_attribute *attr, char *buf)
{ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ssize_t count = 0;
if (!rbd_dev->parent) return sprintf(buf, "(no parent image)\n");
if (need_put) {
destroy_workqueue(rbd_dev->task_wq);
ida_free(&rbd_dev_id_ida, rbd_dev->dev_id);
}
rbd_dev_free(rbd_dev);
/* * This is racy, but way better than putting module outside of * the release callback. The race window is pretty small, so * doing something similar to dm (dm-builtin.c) is overkill.
*/ if (need_put)
module_put(THIS_MODULE);
}
rbd_dev = __rbd_dev_create(spec); if (!rbd_dev) return NULL;
/* get an id and fill in device name */
rbd_dev->dev_id = ida_alloc_max(&rbd_dev_id_ida,
minor_to_rbd_dev_id(1 << MINORBITS) - 1,
GFP_KERNEL); if (rbd_dev->dev_id < 0) goto fail_rbd_dev;
/*
 * Drop a reference on an rbd device by calling put_device() on its
 * embedded dev member.  A NULL rbd_dev is accepted and ignored.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	if (!rbd_dev)
		return;

	put_device(&rbd_dev->dev);
}
/* * Get the size and object order for an image snapshot, or if * snap_id is CEPH_NOSNAP, gets this information for the base * image.
*/ staticint _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
u8 *order, u64 *snap_size)
{
__le64 snapid = cpu_to_le64(snap_id); int ret; struct {
u8 order;
__le64 size;
} __attribute__ ((packed)) size_buf = { 0 };
ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
&rbd_dev->header_oloc, "get_size",
&snapid, sizeof(snapid),
&size_buf, sizeof(size_buf));
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; if (ret < sizeof (size_buf)) return -ERANGE;
if (order) {
*order = size_buf.order;
dout(" order %u", (unsignedint)*order);
}
*snap_size = le64_to_cpu(size_buf.size);
/* Response will be an encoded string, which includes a length */
size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
reply_buf = kzalloc(size, GFP_KERNEL); if (!reply_buf) return -ENOMEM;
ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
&rbd_dev->header_oloc, "get_object_prefix",
NULL, 0, reply_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out;
p = reply_buf;
object_prefix = ceph_extract_encoded_string(&p, p + ret, NULL,
GFP_NOIO); if (IS_ERR(object_prefix)) {
ret = PTR_ERR(object_prefix); goto out;
}
ret = 0;
/* * These are generic image flags, but since they are used only for * object map, store them in rbd_dev->object_map_flags. * * For the same reason, this function is called only on object map * (re)load and not on header refresh.
*/ staticint rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
{
__le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
__le64 flags; int ret;
ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
&rbd_dev->header_oloc, "get_flags",
&snapid, sizeof(snapid),
&flags, sizeof(flags)); if (ret < 0) return ret; if (ret < sizeof(flags)) return -EBADMSG;
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ,
req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret == -EOPNOTSUPP ? 1 : ret;
p = page_address(reply_page);
end = p + reply_len;
ret = decode_parent_image_spec(&p, end, pii); if (ret) return ret;
ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret;
p = page_address(reply_page);
end = p + reply_len;
ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); if (pii->has_overlap)
ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
parent_spec = rbd_spec_alloc(); if (!parent_spec) return -ENOMEM;
ret = rbd_dev_v2_parent_info(rbd_dev, &pii); if (ret) goto out_err;
if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) goto out; /* No parent? No problem. */
/* The ceph file layout needs to fit pool id in 32 bits */
ret = -EIO; if (pii.pool_id > (u64)U32_MAX) {
rbd_warn(NULL, "parent pool id too large (%llu > %u)",
(unsignedlonglong)pii.pool_id, U32_MAX); goto out_err;
}
/* * The parent won't change except when the clone is flattened, * so we only need to record the parent image spec once.
*/
parent_spec->pool_id = pii.pool_id; if (pii.pool_ns && *pii.pool_ns) {
parent_spec->pool_ns = pii.pool_ns;
pii.pool_ns = NULL;
}
parent_spec->image_id = pii.image_id;
pii.image_id = NULL;
parent_spec->snap_id = pii.snap_id;
rbd_assert(!rbd_dev->parent_spec);
rbd_dev->parent_spec = parent_spec;
parent_spec = NULL; /* rbd_dev now owns this */
/* * Record the parent overlap. If it's zero, issue a warning as * we will proceed as if there is no parent.
*/ if (!pii.overlap)
rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
rbd_dev->parent_overlap = pii.overlap;
out:
ret = 0;
out_err:
rbd_parent_info_cleanup(&pii);
rbd_spec_put(parent_spec); return ret;
}
ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, "dir_get_name", image_id, image_id_size,
reply_buf, size); if (ret < 0) goto out;
p = reply_buf;
end = reply_buf + ret;
image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); if (IS_ERR(image_name))
image_name = NULL; else
dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
kfree(reply_buf);
kfree(image_id);
for (which = 0; !found && which < snapc->num_snaps; which++) { constchar *snap_name;
snap_id = snapc->snaps[which];
snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); if (IS_ERR(snap_name)) { /* ignore no-longer existing snapshots */ if (PTR_ERR(snap_name) == -ENOENT) continue; else break;
}
found = !strcmp(name, snap_name);
kfree(snap_name);
} return found ? snap_id : CEPH_NOSNAP;
}
/* * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if * no snapshot by that name is found, or if an error occurs.
*/ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, constchar *name)
{ if (rbd_dev->image_format == 1) return rbd_v1_snap_id_by_name(rbd_dev, name);
return rbd_v2_snap_id_by_name(rbd_dev, name);
}
/* * An image being mapped will have everything but the snap id.
*/ staticint rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{ struct rbd_spec *spec = rbd_dev->spec;
/* * A parent image will have all ids but none of the names. * * All names in an rbd spec are dynamically allocated. It's OK if we * can't figure out the name for an image id.
*/ staticint rbd_spec_fill_names(struct rbd_device *rbd_dev)
{ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_spec *spec = rbd_dev->spec; constchar *pool_name; constchar *image_name; constchar *snap_name; int ret;
/* Get the pool name; we have to make our own copy of this */
pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); if (!pool_name) {
rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); return -EIO;
}
pool_name = kstrdup(pool_name, GFP_KERNEL); if (!pool_name) return -ENOMEM;
/* Fetch the image name; tolerate failure here */
image_name = rbd_dev_image_name(rbd_dev); if (!image_name)
rbd_warn(rbd_dev, "unable to get image name");
/* Fetch the snapshot name */
snap_name = rbd_snap_name(rbd_dev, spec->snap_id); if (IS_ERR(snap_name)) {
ret = PTR_ERR(snap_name); goto out_err;
}
/* * We'll need room for the seq value (maximum snapshot id), * snapshot count, and array of that many snapshot ids. * For now we have a fixed upper limit on the number we're * prepared to receive.
*/
size = sizeof (__le64) + sizeof (__le32) +
RBD_MAX_SNAP_COUNT * sizeof (__le64);
reply_buf = kzalloc(size, GFP_KERNEL); if (!reply_buf) return -ENOMEM;
ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
&rbd_dev->header_oloc, "get_snapcontext",
NULL, 0, reply_buf, size);
dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out;
p = reply_buf;
end = reply_buf + ret;
ret = -ERANGE;
ceph_decode_64_safe(&p, end, seq, out);
ceph_decode_32_safe(&p, end, snap_count, out);
/* * Make sure the reported number of snapshot ids wouldn't go * beyond the end of our buffer. But before checking that, * make sure the computed size of the snapshot context we * allocate is representable in a size_t.
*/ if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
/ sizeof (u64)) {
ret = -EINVAL; goto out;
} if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) goto out;
ret = 0;
snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); if (!snapc) {
ret = -ENOMEM; goto out;
}
snapc->seq = seq; for (i = 0; i < snap_count; i++)
snapc->snaps[i] = ceph_decode_64(&p);
/* * Skips over white space at *buf, and updates *buf to point to the * first found non-space character (if any). Returns the length of * the token (string of non-white space characters) found. Note * that *buf must be terminated with '\0'.
*/ staticinline size_t next_token(constchar **buf)
{ /* * These are the characters that produce nonzero for * isspace() in the "C" and "POSIX" locales.
*/ staticconstchar spaces[] = " \f\n\r\t\v";
*buf += strspn(*buf, spaces); /* Find start of token */
/* * Finds the next token in *buf, dynamically allocates a buffer big * enough to hold a copy of it, and copies the token into the new * buffer. The copy is guaranteed to be terminated with '\0'. Note * that a duplicate buffer is created even for a zero-length token. * * Returns a pointer to the newly-allocated duplicate, or a null * pointer if memory for the duplicate was not available. If * the lenp argument is a non-null pointer, the length of the token * (not including the '\0') is returned in *lenp. * * If successful, the *buf pointer will be updated to point beyond * the end of the found token. * * Note: uses GFP_KERNEL for allocation.
*/ staticinlinechar *dup_token(constchar **buf, size_t *lenp)
{ char *dup;
size_t len;
len = next_token(buf);
dup = kmemdup(*buf, len + 1, GFP_KERNEL); if (!dup) return NULL;
*(dup + len) = '\0';
*buf += len;
switch (token) { case Opt_queue_depth: if (result.uint_32 < 1) goto out_of_range;
opt->queue_depth = result.uint_32; break; case Opt_alloc_size: if (result.uint_32 < SECTOR_SIZE) goto out_of_range; if (!is_power_of_2(result.uint_32)) return inval_plog(&log, "alloc_size must be a power of 2");
opt->alloc_size = result.uint_32; break; case Opt_lock_timeout: /* 0 is "wait forever" (i.e. infinite timeout) */ if (result.uint_32 > INT_MAX / 1000) goto out_of_range;
opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000); break; case Opt_pool_ns:
kfree(pctx->spec->pool_ns);
pctx->spec->pool_ns = param->string;
param->string = NULL; break; case Opt_compression_hint: switch (result.uint_32) { case Opt_compression_hint_none:
opt->alloc_hint_flags &=
~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE); break; case Opt_compression_hint_compressible:
opt->alloc_hint_flags |=
CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
opt->alloc_hint_flags &=
~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; break; case Opt_compression_hint_incompressible:
opt->alloc_hint_flags |=
CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
opt->alloc_hint_flags &=
~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; break; default:
BUG();
} break; case Opt_read_only:
opt->read_only = true; break; case Opt_read_write:
opt->read_only = false; break; case Opt_lock_on_read:
opt->lock_on_read = true; break; case Opt_exclusive:
opt->exclusive = true; break; case Opt_notrim:
opt->trim = false; break; default:
BUG();
}
return 0;
out_of_range: return inval_plog(&log, "%s out of range", param->key);
}
/* * This duplicates most of generic_parse_monolithic(), untying it from * fs_context and skipping standard superblock and security options.
*/ staticint rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
{ char *key; int ret = 0;
if (value) { if (value == key) continue;
*value++ = 0;
v_len = strlen(value);
param.string = kmemdup_nul(value, v_len,
GFP_KERNEL); if (!param.string) return -ENOMEM;
param.type = fs_value_is_string;
}
param.size = v_len;
ret = rbd_parse_param(¶m, pctx);
kfree(param.string); if (ret) break;
}
}
return ret;
}
/* * Parse the options provided for an "rbd add" (i.e., rbd image * mapping) request. These arrive via a write to /sys/bus/rbd/add, * and the data written is passed here via a NUL-terminated buffer. * Returns 0 if successful or an error code otherwise. * * The information extracted from these options is recorded in * the other parameters which return dynamically-allocated * structures: * ceph_opts * The address of a pointer that will refer to a ceph options * structure. Caller must release the returned pointer using * ceph_destroy_options() when it is no longer needed. * rbd_opts * Address of an rbd options pointer. Fully initialized by * this function; caller must release with kfree(). * spec * Address of an rbd image specification pointer. Fully * initialized by this function based on parsed options. * Caller must release with rbd_spec_put(). * * The options passed take this form: * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] * where: * <mon_addrs> * A comma-separated list of one or more monitor addresses. * A monitor address is an ip address, optionally followed * by a port number (separated by a colon). * I.e.: ip1[:port1][,ip2[:port2]...] * <options> * A comma-separated list of ceph and/or rbd options. * <pool_name> * The name of the rados pool containing the rbd image. * <image_name> * The name of the image in that pool to map. * <snap_id> * An optional snapshot id. If provided, the mapping will * present data from the image at the time that snapshot was * created. The image head is used if no snapshot id is * provided. Snapshot mappings are always read-only.
*/ staticint rbd_add_parse_args(constchar *buf, struct ceph_options **ceph_opts, struct rbd_options **opts, struct rbd_spec **rbd_spec)
{
size_t len; char *options; constchar *mon_addrs; char *snap_name;
size_t mon_addrs_size; struct rbd_parse_opts_ctx pctx = { 0 }; int ret;
/* The first four tokens are required */
len = next_token(&buf); if (!len) {
rbd_warn(NULL, "no monitor address(es) provided"); return -EINVAL;
}
mon_addrs = buf;
mon_addrs_size = len;
buf += len;
ret = -EINVAL;
options = dup_token(&buf, NULL); if (!options) return -ENOMEM; if (!*options) {
rbd_warn(NULL, "no options provided"); goto out_err;
}
pctx.spec = rbd_spec_alloc(); if (!pctx.spec) goto out_mem;
pctx.spec->pool_name = dup_token(&buf, NULL); if (!pctx.spec->pool_name) goto out_mem; if (!*pctx.spec->pool_name) {
rbd_warn(NULL, "no pool name provided"); goto out_err;
}
pctx.spec->image_name = dup_token(&buf, NULL); if (!pctx.spec->image_name) goto out_mem; if (!*pctx.spec->image_name) {
rbd_warn(NULL, "no image name provided"); goto out_err;
}
/* * Snapshot name is optional; default is to use "-" * (indicating the head/no snapshot).
*/
len = next_token(&buf); if (!len) {
buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
} elseif (len > RBD_MAX_SNAP_NAME_LEN) {
ret = -ENAMETOOLONG; goto out_err;
}
snap_name = kmemdup(buf, len + 1, GFP_KERNEL); if (!snap_name) goto out_mem;
*(snap_name + len) = '\0';
pctx.spec->snap_name = snap_name;
pctx.copts = ceph_alloc_options(); if (!pctx.copts) goto out_mem;
/* Initialize all rbd options to the defaults */
pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); if (!pctx.opts) goto out_mem;
/*
 * Acquire the exclusive lock while mapping an image, if that is needed.
 *
 * Returns 0 when no lock is required (the exclusive-lock feature is
 * disabled and no lock-related option was requested, or the mapping is
 * read-only) or when the acquisition attempt completed without error.
 * Returns -EINVAL when a lock-related option was given but the image
 * lacks the exclusive-lock feature, -ETIMEDOUT when the wait returned
 * 0, the wait's own negative return value otherwise, or the error
 * recorded in rbd_dev->acquire_err.
 *
 * If the wait is interrupted, an error is returned even if the lock
 * was successfully acquired.  rbd_dev_image_unlock() will release it
 * if needed.
 */
static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
{
	long ret;

	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
		/* Only an error if the user explicitly asked for locking. */
		if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
			return 0;

		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
		return -EINVAL;
	}

	if (rbd_is_ro(rbd_dev))
		return 0;

	rbd_assert(!rbd_is_lock_owner(rbd_dev));

	/* Kick off the lock work immediately and wait for its outcome. */
	queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
	ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
			    ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
	if (ret > 0) {
		/* Completed: report whatever the lock work recorded. */
		ret = rbd_dev->acquire_err;
	} else {
		/* Not completed: stop the lock work before reporting. */
		cancel_delayed_work_sync(&rbd_dev->lock_dwork);
		if (!ret)
			ret = -ETIMEDOUT;

		rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
	}
	if (ret)
		return ret;

	return 0;
}
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name; the image is then assumed to be format 1
 * and an empty string is recorded as its id.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	CEPH_DEFINE_OID_ONSTACK(oid);
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though: an empty
	 * id string means format 1, anything else format 2.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
			       rbd_dev->spec->image_name);
	if (ret)
		return ret;

	dout("rbd id object name is %s\n", oid.name);

	/* Response will be an encoded string, which includes a length */
	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */
	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				  "get_id", NULL, 0,
				  response, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret >= 0) {
		void *p = response;

		/* ret is the number of reply bytes available for decoding */
		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	}

	/* Any other negative ret falls through with image_id untouched */
	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	ceph_oid_destroy(&oid);
	return ret;
}
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * call: drop the parent, free the object map, clear the mapping
 * and release the in-memory image header.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	rbd_dev_parent_put(rbd_dev);
	rbd_object_map_free(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */
	rbd_image_header_cleanup(&rbd_dev->header);
}
/*
 * Fetch the format 2 header fields that are read by this one-time
 * helper: the object prefix, the feature bits and, when the matching
 * feature bit is set, the striping parameters and the data pool id.
 * Results are stored in @header.
 *
 * Returns 0 on success or the first non-zero value returned by one of
 * the helpers; fields filled in before the failure are left as they
 * are for the caller to clean up.
 */
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
				     struct rbd_image_header *header)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix);
	if (ret)
		return ret;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
					rbd_is_ro(rbd_dev), &header->features);
	if (ret)
		return ret;

	/* If the image supports fancy striping, get its parameters */
	if (header->features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit,
					       &header->stripe_count);
		if (ret)
			return ret;
	}

	/* A separate data pool is only looked up when the feature is set */
	if (header->features & RBD_FEATURE_DATA_POOL) {
		ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id);
		if (ret)
			return ret;
	}

	return 0;
}
/* * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> * rbd_dev_image_probe() recursion depth, which means it's also the * length of the already discovered part of the parent chain.
*/ staticint rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{ struct rbd_device *parent = NULL; int ret;
if (!rbd_dev->parent_spec) return 0;
if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
pr_info("parent chain is too long (%d)\n", depth);
ret = -EINVAL; goto out_err;
}
parent = __rbd_dev_create(rbd_dev->parent_spec); if (!parent) {
ret = -ENOMEM; goto out_err;
}
/* * Images related by parent/child relationships always share * rbd_client and spec/parent_spec, so bump their refcounts.
*/
parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
parent->spec = rbd_spec_get(rbd_dev->parent_spec);
__set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
ret = rbd_dev_image_probe(parent, depth); if (ret < 0) goto out_err;
/* * rbd_dev->header_rwsem must be locked for write and will be unlocked * upon return.
*/ staticint rbd_dev_device_setup(struct rbd_device *rbd_dev)
{ int ret;
/* Record our major and minor device numbers. */
if (!single_major) {
ret = register_blkdev(0, rbd_dev->name); if (ret < 0) goto err_out_unlock;
/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 *
 * @depth is 0 for the image being mapped and non-zero for a parent.
 *
 * On success, returns with header_rwsem held for write if called
 * with @depth == 0.  On failure, a negative errno is returned, the
 * rwsem (if taken here) is released, the watch (if registered here)
 * is torn down, and image_format / spec->image_id are reset.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	/* A watch is only registered for mappings that are not read-only */
	bool need_watch = !rbd_is_ro(rbd_dev);
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (need_watch) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				rbd_print_dne(rbd_dev, false);
			goto err_out_format;
		}
	}

	/* From here on, failures must unwind through err_out_probe */
	if (!depth)
		down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true);
	if (ret) {
		/*
		 * With a watch, a missing header would already have been
		 * reported by the rbd_register_watch() failure path above.
		 */
		if (ret == -ENOENT && !need_watch)
			rbd_print_dne(rbd_dev, false);
		goto err_out_probe;
	}

	rbd_init_layout(rbd_dev);

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			rbd_print_dne(rbd_dev, true);
		goto err_out_probe;
	}

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_probe;

	/* The object map is loaded here only for snapshot mappings */
	if (rbd_is_snap(rbd_dev) &&
	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
		ret = rbd_object_map_load(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_setup_parent(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	/* Release only what this call acquired */
	if (!depth)
		up_write(&rbd_dev->header_rwsem);
	if (need_watch)
		rbd_unregister_watch(rbd_dev);
	rbd_dev_unprobe(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}
/*
 * Bring rbd_dev's parent state in line with freshly fetched parent
 * info in @pii: either detach the parent (image was flattened) or
 * record the new parent overlap.
 */
static void rbd_dev_update_parent(struct rbd_device *rbd_dev,
				  struct parent_image_info *pii)
{
	if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) {
		/*
		 * Either the parent never existed, or we have a
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 *
		 * If !pii.has_overlap, the parent image spec is not
		 * applicable.  It's there to avoid duplication in each
		 * snapshot record.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone has been flattened\n",
				rbd_dev->disk->disk_name);
		}
	} else {
		rbd_assert(rbd_dev->parent_spec);

		/*
		 * Update the parent overlap.  If it became zero, issue
		 * a warning as we will proceed as if there is no parent.
		 */
		if (!pii->overlap && rbd_dev->parent_overlap)
			rbd_warn(rbd_dev,
				 "clone has become standalone (overlap 0)");
		rbd_dev->parent_overlap = pii->overlap;
	}
}
ret = rbd_dev_header_info(rbd_dev, &header, false); if (ret) goto out;
/* * If there is a parent, see if it has disappeared due to the * mapped image getting flattened.
*/ if (rbd_dev->parent) {
ret = rbd_dev_v2_parent_info(rbd_dev, &pii); if (ret) goto out;
}
down_write(&rbd_dev->header_rwsem);
rbd_dev_update_header(rbd_dev, &header); if (rbd_dev->parent)
rbd_dev_update_parent(rbd_dev, &pii);
up_write(&rbd_dev->header_rwsem);
/* pick the pool */
rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); if (rc < 0) { if (rc == -ENOENT)
pr_info("pool %s does not exist\n", spec->pool_name); goto err_out_client;
}
spec->pool_id = (u64)rc;
rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); if (!rbd_dev) {
rc = -ENOMEM; goto err_out_client;
}
rbdc = NULL; /* rbd_dev now owns this */
spec = NULL; /* rbd_dev now owns this */
rbd_opts = NULL; /* rbd_dev now owns this */
/* if we are mapping a snapshot it will be a read-only mapping */ if (rbd_dev->opts->read_only ||
strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
__set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
/* * Follow to the parent with no grandparent and * remove it.
*/ while (second && (third = second->parent)) {
first = second;
second = third;
}
rbd_assert(second);
rbd_dev_image_release(second);
rbd_dev_destroy(second);
first->parent = NULL;
first->parent_overlap = 0;
static ssize_t do_rbd_remove(constchar *buf, size_t count)
{ struct rbd_device *rbd_dev = NULL; int dev_id; char opt_buf[6]; bool force = false; int ret;
if (!capable(CAP_SYS_ADMIN)) return -EPERM;
dev_id = -1;
opt_buf[0] = '\0';
sscanf(buf, "%d %5s", &dev_id, opt_buf); if (dev_id < 0) {
pr_err("dev_id out of range\n"); return -EINVAL;
} if (opt_buf[0] != '\0') { if (!strcmp(opt_buf, "force")) {
force = true;
} else {
pr_err("bad remove option at '%s'\n", opt_buf); return -EINVAL;
}
}
ret = -ENOENT;
spin_lock(&rbd_dev_list_lock);
list_for_each_entry(rbd_dev, &rbd_dev_list, node) { if (rbd_dev->dev_id == dev_id) {
ret = 0; break;
}
} if (!ret) {
spin_lock_irq(&rbd_dev->lock); if (rbd_dev->open_count && !force)
ret = -EBUSY; elseif (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
&rbd_dev->flags))
ret = -EINPROGRESS;
spin_unlock_irq(&rbd_dev->lock);
}
spin_unlock(&rbd_dev_list_lock); if (ret) return ret;
if (force) { /* * Prevent new IO from being queued and wait for existing * IO to complete/fail.
*/ unsignedint memflags = blk_mq_freeze_queue(rbd_dev->disk->queue);
if (!libceph_compatible(NULL)) {
rbd_warn(NULL, "libceph incompatibility (quitting)"); return -EINVAL;
}
rc = rbd_slab_init(); if (rc) return rc;
/* * The number of active work items is limited by the number of * rbd devices * queue depth, so leave @max_active at default.
*/
rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); if (!rbd_wq) {
rc = -ENOMEM; goto err_out_slab;
}
if (single_major) {
rbd_major = register_blkdev(0, RBD_DRV_NAME); if (rbd_major < 0) {
rc = rbd_major; goto err_out_wq;
}
}
rc = rbd_sysfs_init(); if (rc) goto err_out_blkdev;
if (single_major)
pr_info("loaded (major %d)\n", rbd_major); else
pr_info("loaded\n");
MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); /* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.