/*
 * rbd.c -- Export ceph rados objects as a Linux block device
 *
 * based on drivers/block/osdblk.c:
 * Copyright 2009 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING.  If not, write to
 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/* * Increment the given counter and return its updated value. * If the counter is already 0 it will not be incremented. * If the counter is already at its maximum value returns * -EINVAL without updating it.
*/ staticint atomic_inc_return_safe(atomic_t *v)
{ unsignedint counter;
/* Maximum snapshot count: allows the largest snap context to fit in 4KB */
#define RBD_MAX_SNAP_COUNT	510

/* Snapshot name reserved for the "head" (unsnapshotted) image */
#define RBD_SNAP_HEAD_NAME	"-"

/* Sentinel for a failed lookup into the snapshot array */
#define BAD_SNAP_INDEX		U32_MAX

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

/* Features supported by this (client software) implementation. */
#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
/*
 * Block device image metadata (in-memory version).
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64 pool_id;
	const char *pool_name;
	const char *pool_ns;	/* NULL if default, never "" */

	const char *image_id;
	const char *image_name;

	u64 snap_id;
	const char *snap_name;

	struct kref kref;
};
/*
 * An instance of the client.  Multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;
	struct list_head node;
};
/* Aggregated outcome of several in-flight sub-requests. */
struct pending_result {
	int result;		/* first nonzero result */
	int num_pending;
};

struct rbd_img_request;
/* How (and whether) an object request's data payload is referenced. */
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO, /* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */
};
/* protects updating the header */ struct rw_semaphore header_rwsem;
struct rbd_mapping mapping;
struct list_head node;
/* sysfs related */ struct device dev; unsignedlong open_count; /* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
};
/* * single-major requires >= 0.75 version of userspace rbd utility.
*/ staticbool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
rbdc->client = ceph_create_client(ceph_opts, rbdc); if (IS_ERR(rbdc->client)) goto out_rbdc;
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
ret = ceph_open_session(rbdc->client); if (ret < 0) goto out_client;
/*
 * Drop a reference to a ceph client node.  Once the last reference is
 * gone the client is released.  A NULL pointer is tolerated.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
/* * Get a ceph client with specific addr and configuration, if one does * not exist create it. Either way, ceph_opts is consumed by this * function.
*/ staticstruct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{ struct rbd_client *rbdc; int ret;
mutex_lock(&client_mutex);
rbdc = rbd_client_find(ceph_opts); if (rbdc) {
ceph_destroy_options(ceph_opts);
/* * Using an existing client. Make sure ->pg_pools is up to * date before we look up the pool id in do_rbd_add().
*/
ret = ceph_wait_for_latest_osdmap(rbdc->client,
rbdc->client->options->mount_timeout); if (ret) {
rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
rbd_put_client(rbdc);
rbdc = ERR_PTR(ret);
}
} else {
rbdc = rbd_client_create(ceph_opts);
}
mutex_unlock(&client_mutex);
/* The header has to start with the magic rbd header text */ if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) returnfalse;
/* The bio layer requires at least sector-sized I/O */
if (ondisk->options.order < SECTOR_SHIFT) returnfalse;
/* If we use u64 in a few spots we may be able to loosen this */
if (ondisk->options.order > 8 * sizeof (int) - 1) returnfalse;
/* * The size of a snapshot header has to fit in a size_t, and * that limits the number of snapshots.
*/
snap_count = le32_to_cpu(ondisk->snap_count);
size = SIZE_MAX - sizeof (struct ceph_snap_context); if (snap_count > size / sizeof (__le64)) returnfalse;
/* * Not only that, but the size of the entire the snapshot * header must also be representable in a size_t.
*/
size -= snap_count * sizeof (__le64); if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) returnfalse;
returntrue;
}
/*
 * Returns the size in bytes of a single object in the image.  The
 * on-disk validation above rejects any order >= the bit width of an
 * int, so this shift cannot overflow.
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}
if (snap_names_len > (u64)SIZE_MAX) goto out_2big;
snap_names = kmalloc(snap_names_len, GFP_KERNEL); if (!snap_names) goto out_err;
/* ...as well as the array of their sizes. */
snap_sizes = kmalloc_array(snap_count, sizeof(*header->snap_sizes),
GFP_KERNEL); if (!snap_sizes) goto out_err;
/* * Copy the names, and fill in each snapshot's id * and size. * * Note that rbd_dev_v1_header_info() guarantees the * ondisk buffer we're working with has * snap_names_len bytes beyond the end of the * snapshot id array, this memcpy() is safe.
*/
memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
snaps = ondisk->snaps; for (i = 0; i < snap_count; i++) {
snapc->snaps[i] = le64_to_cpu(snaps[i].id);
snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
}
}
/* We won't fail any more, fill in the header */
if (first_time) {
header->object_prefix = object_prefix;
header->obj_order = ondisk->options.order;
}
/* The remaining fields always get updated (when we refresh) */
/* Skip over names until we find the one we are looking for */
snap_name = rbd_dev->header.snap_names; while (which--)
snap_name += strlen(snap_name) + 1;
return kstrdup(snap_name, GFP_KERNEL);
}
/* * Snapshot id comparison function for use with qsort()/bsearch(). * Note that result is for snapshots in *descending* order.
*/ staticint snapid_compare_reverse(constvoid *s1, constvoid *s2)
{
u64 snap_id1 = *(u64 *)s1;
u64 snap_id2 = *(u64 *)s2;
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
			sizeof (snap_id), snapid_compare_reverse);

	/* convert the element pointer back into an array index */
	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();	/* OBJ_REQUEST_NODATA has no buffer to zero */
	}
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
}
/* * Must be called after rbd_obj_calc_img_extents().
*/ staticvoid rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
{
rbd_assert(obj_req->img_request->snapc);
dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
osd_req->r_result, obj_req);
/* * Writes aren't allowed to return a data payload. In some * guarded write cases (e.g. stat + zero on an empty object) * a stat response makes it through, but we don't care.
*/ if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
result = 0; else
result = osd_req->r_result;
/* * Data objects may be stored in a separate pool, but always in * the same namespace in that pool as the header in its pool.
*/
ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
req->r_base_oloc.pool = rbd_dev->layout.pool_id;
ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
rbd_dev->header.object_prefix,
obj_req->ex.oe_objno); if (ret) return ERR_PTR(ret);
switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: case OBJ_REQUEST_BIO: case OBJ_REQUEST_BVECS: break; /* Nothing to do */ case OBJ_REQUEST_OWN_BVECS:
kfree(obj_request->bvec_pos.bvecs); break; default:
BUG();
}
kfree(obj_request->img_extents); if (obj_request->copyup_bvecs) { for (i = 0; i < obj_request->copyup_bvec_count; i++) { if (obj_request->copyup_bvecs[i].bv_page)
__free_page(obj_request->copyup_bvecs[i].bv_page);
}
kfree(obj_request->copyup_bvecs);
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */
	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/* * If an image has a non-zero parent overlap, get a reference to its * parent. * * Returns true if the rbd device has a parent with a non-zero * overlap and a reference for it was successfully taken, or * false otherwise.
*/ staticbool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{ int counter = 0;
if (!rbd_dev->parent_spec) returnfalse;
if (rbd_dev->parent_overlap)
counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
if (counter < 0)
rbd_warn(rbd_dev, "parent reference overflow");
/*
 * Only snap_id is captured here, for reads.  For writes, snapshot
 * context is captured in rbd_img_object_requests() after exclusive
 * lock is ensured to be held.
 */
static void rbd_img_capture_header(struct rbd_img_request *img_req)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;

	lockdep_assert_held(&rbd_dev->header_rwsem);

	if (!rbd_img_is_write(img_req))
		img_req->snap_id = rbd_dev->spec->snap_id;

	/* flag the request as layered if the parent ref was obtained */
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_req);
}
spin_lock(&rbd_dev->object_map_lock);
state = __rbd_object_map_get(rbd_dev, objno);
spin_unlock(&rbd_dev->object_map_lock); return state;
}
staticbool use_object_map(struct rbd_device *rbd_dev)
{ /* * An image mapped read-only can't use the object map -- it isn't * loaded because the header lock isn't acquired. Someone else can * write to the image and update the object map behind our back. * * A snapshot can't be written to, so using the object map is always * safe.
*/ if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev)) returnfalse;
/* * This function needs snap_id (or more precisely just something to * distinguish between HEAD and snapshot object maps), new_state and * current_state that were passed to rbd_object_map_update(). * * To avoid allocating and stashing a context we piggyback on the OSD * request. A HEAD update has two ops (assert_locked). For new_state * and current_state we decode our own object_map_update op, encoded in * rbd_cls_object_map_update().
*/ staticint rbd_object_map_update_finish(struct rbd_obj_request *obj_req, struct ceph_osd_request *osd_req)
{ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_data *osd_data;
u64 objno;
u8 state, new_state, current_state; bool has_current_state; void *p;
if (osd_req->r_result) return osd_req->r_result;
/* * Nothing to do for a snapshot object map.
*/ if (osd_req->r_num_ops == 1) return 0;
/* drop extents completely beyond the overlap */ while (cnt && img_extents[cnt - 1].fe_off >= overlap)
cnt--;
if (cnt) { struct ceph_file_extent *ex = &img_extents[cnt - 1];
/* trim final overlapping extent */ if (ex->fe_off + ex->fe_len > overlap)
ex->fe_len = overlap - ex->fe_off;
}
*num_img_extents = cnt;
}
/* * Determine the byte range(s) covered by either just the object extent * or the entire object in the parent image.
*/ staticint rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, bool entire)
{ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret;
if (!rbd_dev->parent_overlap) return 0;
ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
entire ? 0 : obj_req->ex.oe_off,
entire ? rbd_dev->layout.object_size :
obj_req->ex.oe_len,
&obj_req->img_extents,
&obj_req->num_img_extents); if (ret) return ret;
/* * Align the range to alloc_size boundary and punt on discards * that are too small to free up any space. * * alloc_size == object_size && is_tail() is a special case for * filestore with filestore_punch_hole = false, needed to allow * truncate (in addition to delete).
*/ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
!rbd_obj_is_tail(obj_req)) {
off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
rbd_dev->opts->alloc_size); if (off >= next_off) return 1;
switch (obj_req->img_request->op_type) { case OBJ_OP_WRITE:
__rbd_osd_setup_write_ops(osd_req, which); break; case OBJ_OP_DISCARD:
__rbd_osd_setup_discard_ops(osd_req, which); break; case OBJ_OP_ZEROOUT:
__rbd_osd_setup_zeroout_ops(osd_req, which); break; default:
BUG();
}
}
/*
 * Prune the list of object requests (adjust offset and/or length, drop
 * redundant requests).  Prepare object request state machines and image
 * request state machine for execution.
 */
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req, *next_obj_req;
	int ret;

	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
		switch (img_req->op_type) {
		case OBJ_OP_READ:
			ret = rbd_obj_init_read(obj_req);
			break;
		case OBJ_OP_WRITE:
			ret = rbd_obj_init_write(obj_req);
			break;
		case OBJ_OP_DISCARD:
			ret = rbd_obj_init_discard(obj_req);
			break;
		case OBJ_OP_ZEROOUT:
			ret = rbd_obj_init_zeroout(obj_req);
			break;
		default:
			BUG();
		}
		if (ret < 0)
			return ret;
		if (ret > 0) {
			/* positive return: this object request is redundant */
			rbd_img_obj_request_del(img_req, obj_req);
			continue;
		}
	}

	img_req->state = RBD_IMG_START;
	return 0;
}
/* Cursor into the caller-provided data: either a bio list or a bvec array. */
union rbd_img_fill_iter {
	struct ceph_bio_iter bio_iter;
	struct ceph_bvec_iter bvec_iter;
};

struct rbd_img_fill_ctx {
	enum obj_request_type pos_type;
	union rbd_img_fill_iter *pos;
	union rbd_img_fill_iter iter;
	ceph_object_extent_fn_t set_pos_fn;
	ceph_object_extent_fn_t count_fn;
	ceph_object_extent_fn_t copy_fn;
};
/* * While su != os && sc == 1 is technically not fancy (it's the same * layout as su == os && sc == 1), we can't use the nocopy path for it * because ->set_pos_fn() should be called only once per object. * ceph_file_to_extents() invokes action_fn once per stripe unit, so * treat su != os && sc == 1 as fancy.
*/ staticbool rbd_layout_is_fancy(struct ceph_file_layout *l)
{ return l->stripe_unit != l->object_size;
}
/* * Create object requests and set each object request's starting * position in the provided bio (list) or bio_vec array.
*/
fctx->iter = *fctx->pos; for (i = 0; i < num_img_extents; i++) {
ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
img_extents[i].fe_off,
img_extents[i].fe_len,
&img_req->object_extents,
alloc_object_extent, img_req,
fctx->set_pos_fn, &fctx->iter); if (ret) return ret;
}
return __rbd_img_fill_request(img_req);
}
/*
 * NOTE: this copy of the file is truncated here -- the tool that
 * produced it stopped with "maximum size reached".  The text that
 * followed (processing-time figures and a German accuracy disclaimer)
 * was viewer boilerplate, not part of the driver source.  The
 * remainder of rbd.c is missing from this copy.
 */