/** * DOC: Bio flags. * * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those * flags on our own bio(s) for that request may help underlying layers better fulfill the user * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other * flags, as they convey incorrect information. * * These flags are always irrelevant if we have already finished the user bio as they are only * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how * important finishing the finished bio was. * * Note that bio.c contains the complete list of flags we believe may be set; the following list * explains the action taken with each of those flags VDO could receive: * * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio * completion is required for further work to be done by the issuer. * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer * treats it as more urgent, similar to REQ_SYNC. * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is * important. * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO. * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't * match incoming IO, so this flag is incorrect for it. * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise. * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance. * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled * ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load * prioritization.
*/
/* The aggregate of user-bio flags worth passing through; see the DOC comment above. */
static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
/**
 * DOC:
 *
 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios.
 * For correctness, and in order to avoid potentially expensive or blocking memory allocations
 * during normal operation, the number of concurrently active data_vios is capped. Furthermore,
 * in order to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
 * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
 * for which a data_vio or discard permit is not available will block until the necessary
 * resources are available. The pool is also responsible for distributing resources to blocked
 * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
 * performing the work of actually assigning resources to blocked threads or placing data_vios
 * back into the pool on a single cpu at a time.
 *
 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
 * launched. However, if either of these is unavailable, the arrival time of the bio is recorded
 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter and the submitting thread will then put itself to sleep. (Note that this mechanism will
 * break if jiffies are only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool.
If the pool is not currently processing released data_vios, the pool's * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to * hold the pool's lock, and also batches release work while avoiding starvation of the cpu * threads. * * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which * processes a batch of returned data_vios (currently at most 32) from the pool's funnel queue. For * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool. * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit, or * doesn't need one and relaunched. If neither of these exist, the data_vio is returned to the * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit * them are awakened.
*/
/* Bookkeeping structure for a single type of resource. */
struct limiter {
	/* The data_vio_pool to which this limiter belongs */
	struct data_vio_pool *pool;
	/* The maximum number of data_vios available */
	data_vio_count_t limit;
	/* The number of resources in use */
	data_vio_count_t busy;
	/* The maximum number of resources ever simultaneously in use */
	data_vio_count_t max_busy;
	/* The number of resources to release */
	data_vio_count_t release_count;
	/* The number of waiters to wake */
	data_vio_count_t wake_count;
	/* The list of waiting bios which are known to process_release_callback() */
	struct bio_list waiters;
	/* The list of waiting bios which are not yet known to process_release_callback() */
	struct bio_list new_waiters;
	/* The list of waiters which have their permits */
	struct bio_list *permitted_waiters;
	/* The function for assigning a resource to a waiter */
	assigner_fn assigner;
	/* The queue of blocked threads */
	wait_queue_head_t blocked_threads;
	/* The arrival time of the eldest waiter (stashed in the bio's bi_private; see DOC above) */
	u64 arrival;
};
/*
 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
 * and are released in batches.
 */
struct data_vio_pool {
	/* Completion for scheduling releases */
	struct vdo_completion completion;
	/* The administrative state of the pool */
	struct admin_state state;
	/* Lock protecting the pool */
	spinlock_t lock;
	/* The main limiter controlling the total data_vios in the pool. */
	struct limiter limiter;
	/* The limiter controlling data_vios for discard */
	struct limiter discard_limiter;
	/* The list of bios which have discard permits but still need a data_vio */
	struct bio_list permitted_discards;
	/* The list of available data_vios */
	struct list_head available;
	/* The queue of data_vios waiting to be returned to the pool */
	struct funnel_queue *queue;
	/* Whether the pool is processing, or scheduled to process releases */
	atomic_t processing;
	/* The data vios in the pool (flexible array member; sized at allocation time) */
	struct data_vio data_vios[];
};
/* The steps taken cleaning up a VIO, in the order they are performed. */
enum data_vio_cleanup_stage {
	VIO_CLEANUP_START,
	/* Release the hash lock, if any (see perform_cleanup_stage()). */
	VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
	/* Release the PBN lock/reference on the allocated block, if any. */
	VIO_RELEASE_ALLOCATED,
	/* Release any recovery journal locks. */
	VIO_RELEASE_RECOVERY_LOCKS,
	/* Release the logical block lock. */
	VIO_RELEASE_LOGICAL,
	VIO_CLEANUP_DONE
};
staticinline u64 get_arrival_time(struct bio *bio)
{ return (u64) bio->bi_private;
}
/** * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios * or waiters while holding the pool's lock.
*/ staticbool check_for_drain_complete_locked(struct data_vio_pool *pool)
{ if (pool->limiter.busy > 0) returnfalse;
VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0), "no outstanding discard permits");
staticvoid acknowledge_data_vio(struct data_vio *data_vio)
{ struct vdo *vdo = vdo_from_data_vio(data_vio); struct bio *bio = data_vio->user_bio; int error = vdo_status_to_errno(data_vio->vio.completion.result);
if (bio == NULL) return;
VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
(u32) (VDO_BLOCK_SIZE - data_vio->offset)), "data_vio to acknowledge is not an incomplete discard");
data_vio->user_bio = NULL;
vdo_count_bios(&vdo->stats.bios_acknowledged, bio); if (data_vio->is_partial)
vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
/**
 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
 *                 atomically.
 * @status: The state to convert.
 *
 * Return: The compression state packed into a u32.
 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
{
	u32 packed = status.stage;

	if (status.may_not_compress)
		packed |= MAY_NOT_COMPRESS_MASK;

	return packed;
}
/** * set_data_vio_compression_status() - Set the compression status of a data_vio. * @data_vio: The data_vio to change. * @status: The expected current status of the data_vio. * @new_status: The status to set. * * Return: true if the new status was set, false if the data_vio's compression status did not * match the expected state, and so was left unchanged.
*/ staticbool __must_check
set_data_vio_compression_status(struct data_vio *data_vio, struct data_vio_compression_status status, struct data_vio_compression_status new_status)
{
u32 actual;
u32 expected = pack_status(status);
u32 replacement = pack_status(new_status);
/* * Extra barriers because this was original developed using a CAS operation that implicitly * had them.
*/
smp_mb__before_atomic();
actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement); /* same as before_atomic */
smp_mb__after_atomic(); return (expected == actual);
}
struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
{ for (;;) { struct data_vio_compression_status status =
get_data_vio_compression_status(data_vio); struct data_vio_compression_status new_status = status;
if (status.stage == DATA_VIO_POST_PACKER) { /* We're already in the last stage. */ return status;
}
if (status.may_not_compress) { /* * Compression has been dis-allowed for this VIO, so skip the rest of the * path and go to the end.
*/
new_status.stage = DATA_VIO_POST_PACKER;
} else { /* Go to the next state. */
new_status.stage++;
}
if (set_data_vio_compression_status(data_vio, status, new_status)) return new_status;
/* Another thread changed the status out from under us so try again. */
}
}
/** * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed. * * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
*/ bool cancel_data_vio_compression(struct data_vio *data_vio)
{ struct data_vio_compression_status status, new_status;
for (;;) {
status = get_data_vio_compression_status(data_vio); if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) { /* This data_vio is already set up to not block in the packer. */ break;
}
/** * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block. * @completion: The data_vio for an external data request as a completion. * * This is the start of the path for all external requests. It is registered in launch_data_vio().
*/
static void attempt_logical_block_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct lbn_lock *lock = &data_vio->logical;
	struct vdo *vdo = vdo_from_data_vio(data_vio);
	struct data_vio *lock_holder;
	int result;

	assert_data_vio_in_logical_zone(data_vio);

	/* Fail requests addressing blocks beyond the configured logical space. */
	if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
		continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
		return;
	}

	/*
	 * Register this data_vio in the zone's lbn map; if the lbn is already claimed, the
	 * existing claimant is returned through lock_holder and the map is left unchanged.
	 */
	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
				 data_vio, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	if (lock_holder == NULL) {
		/* We got the lock */
		launch_locked_request(data_vio);
		return;
	}

	result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	/*
	 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
	 * writing and has received an allocation, service the read request immediately by copying
	 * data from the lock_holder to avoid having to flush the write out of the packer just to
	 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
	 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
	 * order to prevent returning data that may not have actually been written.
	 */
	if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
		copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
		acknowledge_data_vio(data_vio);
		complete_data_vio(completion);
		return;
	}

	/*
	 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
	 * packer.
	 */
	if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
		data_vio->compression.lock_holder = lock_holder;
		launch_data_vio_packer_callback(data_vio,
						vdo_remove_lock_holder_from_packer);
	}

	/*
	 * NOTE(review): as visible here, a contending data_vio which cannot be serviced by the
	 * copy above is never enqueued as a waiter on the lock holder — confirm against the full
	 * file that a waiter-enqueue call was not lost from this function.
	 */
}
/** * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the * same parent and other state and send it on its way.
*/ staticvoid launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
{ struct vdo_completion *completion = &data_vio->vio.completion;
/* * Clearing the tree lock must happen before initializing the LBN lock, which also adds * information to the tree lock.
*/
memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
initialize_lbn_lock(data_vio, lbn);
INIT_LIST_HEAD(&data_vio->hash_lock_entry);
INIT_LIST_HEAD(&data_vio->write_entry);
staticvoid launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
{
logical_block_number_t lbn; /* * Zero out the fields which don't need to be preserved (i.e. which are not pointers to * separately allocated objects).
*/
memset(data_vio, 0, offsetof(struct data_vio, vio));
memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
/* * Discards behave very differently than other requests when coming in from device-mapper. * We have to be able to handle any size discards and various sector offsets within a * block.
*/ if (bio_op(bio) == REQ_OP_DISCARD) {
data_vio->remaining_discard = bio->bi_iter.bi_size;
data_vio->write = true;
data_vio->is_discard = true; if (data_vio->is_partial) {
vdo_count_bios(&vdo->stats.bios_in_partial, bio);
data_vio->read = true;
}
} elseif (data_vio->is_partial) {
vdo_count_bios(&vdo->stats.bios_in_partial, bio);
data_vio->read = true; if (bio_data_dir(bio) == WRITE)
data_vio->write = true;
} elseif (bio_data_dir(bio) == READ) {
data_vio->read = true;
} else { /* * Copy the bio data to a char array so that we can continue to use the data after * we acknowledge the bio.
*/
copy_from_bio(bio, data_vio->vio.data);
data_vio->is_zero = is_zero_block(data_vio->vio.data);
data_vio->write = true;
}
if (data_vio->user_bio->bi_opf & REQ_FUA)
data_vio->fua = true;
VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy), "Release count %u is not more than busy count %u",
limiter->release_count, limiter->busy);
get_waiters(limiter); for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
limiter->assigner(limiter);
/** * schedule_releases() - Ensure that release processing is scheduled. * * If this call switches the state to processing, enqueue. Otherwise, some other thread has already * done so.
*/ staticvoid schedule_releases(struct data_vio_pool *pool)
{ /* Pairs with the barrier in process_release_callback(). */
smp_mb__before_atomic(); if (atomic_cmpxchg(&pool->processing, false, true)) return;
spin_lock(&pool->lock); /* * There is a race where waiters could be added while we are in the unlocked section above. * Those waiters could not see the resources we are now about to release, so we assign * those resources now as we have no guarantee of being rescheduled. This is handled in * update_limiter().
*/
update_limiter(&pool->discard_limiter);
list_splice(&returned, &pool->available);
update_limiter(&pool->limiter);
to_wake = pool->limiter.wake_count;
pool->limiter.wake_count = 0;
discards_to_wake = pool->discard_limiter.wake_count;
pool->discard_limiter.wake_count = 0;
atomic_set(&pool->processing, false); /* Pairs with the barrier in schedule_releases(). */
smp_mb();
/** * initialize_data_vio() - Allocate the components of a data_vio. * * The caller is responsible for cleaning up the data_vio on error. * * Return: VDO_SUCCESS or an error.
*/ staticint initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
{ struct bio *bio; int result;
BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
&data_vio->vio.data); if (result != VDO_SUCCESS) return vdo_log_error_strerror(result, "data_vio data allocation failure");
result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
&data_vio->compression.block); if (result != VDO_SUCCESS) { return vdo_log_error_strerror(result, "data_vio compressed block allocation failure");
}
result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
&data_vio->scratch_block); if (result != VDO_SUCCESS) return vdo_log_error_strerror(result, "data_vio scratch allocation failure");
result = vdo_create_bio(&bio); if (result != VDO_SUCCESS) return vdo_log_error_strerror(result, "data_vio data bio allocation failure");
/** * make_data_vio_pool() - Initialize a data_vio pool. * @vdo: The vdo to which the pool will belong. * @pool_size: The number of data_vios in the pool. * @discard_limit: The maximum number of data_vios which may be used for discards. * @pool_ptr: A pointer to hold the newly allocated pool.
*/ int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
{ int result; struct data_vio_pool *pool;
data_vio_count_t i;
result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
__func__, &pool); if (result != VDO_SUCCESS) return result;
VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size), "discard limit does not exceed pool size");
initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
discard_limit);
pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
pool->limiter.permitted_waiters = &pool->limiter.waiters;
INIT_LIST_HEAD(&pool->available);
spin_lock_init(&pool->lock);
vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
vdo_prepare_completion(&pool->completion, process_release_callback,
process_release_callback, vdo->thread_config.cpu_thread,
NULL);
result = vdo_make_funnel_queue(&pool->queue); if (result != VDO_SUCCESS) {
free_data_vio_pool(vdo_forget(pool)); return result;
}
for (i = 0; i < pool_size; i++) { struct data_vio *data_vio = &pool->data_vios[i];
result = initialize_data_vio(data_vio, vdo); if (result != VDO_SUCCESS) {
destroy_data_vio(data_vio);
free_data_vio_pool(pool); return result;
}
/** * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it. * * All data_vios must be returned to the pool before calling this function.
*/ void free_data_vio_pool(struct data_vio_pool *pool)
{ struct data_vio *data_vio, *tmp;
if (pool == NULL) return;
/* * Pairs with the barrier in process_release_callback(). Possibly not needed since it * caters to an enqueue vs. free race.
*/
smp_mb();
BUG_ON(atomic_read(&pool->processing));
spin_lock(&pool->lock);
VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0), "data_vio pool must not have %u busy entries when being freed",
pool->limiter.busy);
VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
bio_list_empty(&pool->limiter.new_waiters)), "data_vio pool must not have threads waiting to read or write when being freed");
VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
bio_list_empty(&pool->discard_limiter.new_waiters)), "data_vio pool must not have threads waiting to discard when being freed");
spin_unlock(&pool->lock);
/** * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it. * * This will block if data_vios or discard permits are not available.
*/ void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
{ struct data_vio *data_vio;
VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state), "data_vio_pool not quiescent on acquire");
staticvoid assert_on_vdo_cpu_thread(conststruct vdo *vdo, constchar *name)
{
VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread), "%s called on cpu thread", name);
}
/**
 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
 * @pool: The pool to drain.
 * @completion: The completion to notify when the pool has drained.
 */
void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
{
	assert_on_vdo_cpu_thread(completion->vdo, __func__);
	vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
			   initiate_drain);
}
/**
 * resume_data_vio_pool() - Resume a data_vio pool.
 * @pool: The pool to resume.
 * @completion: The completion to notify when the pool has resumed.
 */
void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
{
	assert_on_vdo_cpu_thread(completion->vdo, __func__);
	vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
}
/** * dump_data_vio_pool() - Dump a data_vio pool to the log. * @dump_vios: Whether to dump the details of each busy data_vio as well.
*/ void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
{ /* * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the * second clock tick). These numbers were picked based on experiments with lab machines.
*/ staticconstint ELEMENTS_PER_BATCH = 35; staticconstint SLEEP_FOR_SYSLOG = 4000;
if (pool == NULL) return;
spin_lock(&pool->lock);
dump_limiter("data_vios", &pool->limiter);
dump_limiter("discard permits", &pool->discard_limiter); if (dump_vios) { int i; int dumped = 0;
for (i = 0; i < pool->limiter.limit; i++) { struct data_vio *data_vio = &pool->data_vios[i];
/** * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at * the end of processing a data_vio.
*/ staticvoid release_allocated_lock(struct vdo_completion *completion)
{ struct data_vio *data_vio = as_data_vio(completion);
if (!lock->locked) { /* The lock is not locked, so it had better not be registered in the lock map. */ struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
VDO_ASSERT_LOG_ONLY((data_vio != lock_holder), "no logical block lock held for block %llu",
(unsignedlonglong) lock->lbn); return;
}
/* Release the lock by removing the lock from the map. */
lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
VDO_ASSERT_LOG_ONLY((data_vio == lock_holder), "logical block lock mismatch for block %llu",
(unsignedlonglong) lock->lbn);
lock->locked = false;
}
/** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
{
	struct data_vio *lock_holder, *next_lock_holder;
	int result;

	VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");

	/* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
	next_lock_holder =
		vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));

	/* Transfer the remaining lock waiters to the next lock holder. */
	vdo_waitq_transfer_all_waiters(&lock->waiters,
				       &next_lock_holder->logical.waiters);

	/*
	 * Point the lock map at the new holder; the displaced entry (the previous holder) is
	 * returned through lock_holder.
	 */
	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
				 next_lock_holder, true, (void **) &lock_holder);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(next_lock_holder, result);
		return;
	}

	/*
	 * NOTE(review): nothing visible here verifies the displaced entry was data_vio or clears
	 * lock->locked — confirm against the full file that no statements were lost at this point.
	 */

	/*
	 * If there are still waiters, other data_vios must be trying to get the lock we just
	 * transferred. We must ensure that the new lock holder doesn't block in the packer.
	 */
	if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
		cancel_data_vio_compression(next_lock_holder);

	/*
	 * Avoid stack overflow on lock transfer.
	 * FIXME: this is only an issue in the 1 thread config.
	 */
	next_lock_holder->vio.completion.requeue = true;
	launch_locked_request(next_lock_holder);
}
/** * release_logical_lock() - Release the logical block lock and flush generation lock at the end of * processing a data_vio.
*/ staticvoid release_logical_lock(struct vdo_completion *completion)
{ struct data_vio *data_vio = as_data_vio(completion); struct lbn_lock *lock = &data_vio->logical;
assert_data_vio_in_logical_zone(data_vio);
if (vdo_waitq_has_waiters(&lock->waiters))
transfer_lock(data_vio, lock); else
release_lock(data_vio, lock);
/** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */ staticvoid clean_hash_lock(struct vdo_completion *completion)
{ struct data_vio *data_vio = as_data_vio(completion);
assert_data_vio_in_hash_zone(data_vio); if (completion->result != VDO_SUCCESS) {
vdo_clean_failed_hash_lock(data_vio); return;
}
/** * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up. * * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the * pool.
*/ staticvoid finish_cleanup(struct data_vio *data_vio)
{ struct vdo_completion *completion = &data_vio->vio.completion;
u32 discard_size = min_t(u32, data_vio->remaining_discard,
VDO_BLOCK_SIZE - data_vio->offset);
VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL, "complete data_vio has no allocation lock");
VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL, "complete data_vio has no hash lock"); if ((data_vio->remaining_discard <= discard_size) ||
(completion->result != VDO_SUCCESS)) { struct data_vio_pool *pool = completion->vdo->data_vio_pool;
/** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */ staticvoid perform_cleanup_stage(struct data_vio *data_vio, enum data_vio_cleanup_stage stage)
{ struct vdo *vdo = vdo_from_data_vio(data_vio);
switch (stage) { case VIO_RELEASE_HASH_LOCK: if (data_vio->hash_lock != NULL) {
launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock); return;
}
fallthrough;
case VIO_RELEASE_ALLOCATED: if (data_vio_has_allocation(data_vio)) {
launch_data_vio_allocated_zone_callback(data_vio,
release_allocated_lock); return;
}
fallthrough;
case VIO_RELEASE_RECOVERY_LOCKS: if ((data_vio->recovery_sequence_number > 0) &&
(READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
(data_vio->vio.completion.result != VDO_READ_ONLY))
vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
fallthrough;
case VIO_RELEASE_LOGICAL:
launch_data_vio_logical_callback(data_vio, release_logical_lock); return;
staticvoid enter_read_only_mode(struct vdo_completion *completion)
{ if (vdo_is_read_only(completion->vdo)) return;
if (completion->result != VDO_READ_ONLY) { struct data_vio *data_vio = as_data_vio(completion);
vdo_log_error_strerror(completion->result, "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
(unsignedlonglong) data_vio->logical.lbn,
(unsignedlonglong) data_vio->new_mapped.pbn,
(unsignedlonglong) data_vio->mapped.pbn,
(unsignedlonglong) data_vio->allocation.pbn,
get_data_vio_operation_name(data_vio));
}
/** * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a * data_vio.
*/ constchar *get_data_vio_operation_name(struct data_vio *data_vio)
{
BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
ARRAY_SIZE(ASYNC_OPERATION_NAMES));
/** * data_vio_allocate_data_block() - Allocate a data block. * * @write_lock_type: The type of write lock to obtain on the block. * @callback: The callback which will attempt an allocation in the current zone and continue if it * succeeds. * @error_handler: The handler for errors while allocating.
*/ void data_vio_allocate_data_block(struct data_vio *data_vio, enum pbn_lock_type write_lock_type,
vdo_action_fn callback, vdo_action_fn error_handler)
{ struct allocation *allocation = &data_vio->allocation;
VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK), "data_vio does not have an allocation");
allocation->write_lock_type = write_lock_type;
allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
allocation->first_allocation_zone = allocation->zone->zone_number;
/** * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block. * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten). * * If the reference to the locked block is still provisional, it will be released as well.
*/ void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
{ struct allocation *allocation = &data_vio->allocation;
physical_block_number_t locked_pbn = allocation->pbn;
assert_data_vio_in_allocated_zone(data_vio);
if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
allocation->pbn = VDO_ZERO_BLOCK;
/** * uncompress_data_vio() - Uncompress the data a data_vio has just read. * @mapping_state: The mapping state indicating which fragment to decompress. * @buffer: The buffer to receive the uncompressed data.
*/ int uncompress_data_vio(struct data_vio *data_vio, enum block_mapping_state mapping_state, char *buffer)
{ int size;
u16 fragment_offset, fragment_size; struct compressed_block *block = data_vio->compression.block; int result = vdo_get_compressed_block_fragment(mapping_state, block,
&fragment_offset, &fragment_size);
if (result != VDO_SUCCESS) {
vdo_log_debug("%s: compressed fragment error %d", __func__, result); return result;
}
/** * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle. * @completion: The data_vio which has just finished its read. * * This callback is registered in read_block().
*/ staticvoid modify_for_partial_write(struct vdo_completion *completion)
{ struct data_vio *data_vio = as_data_vio(completion); char *data = data_vio->vio.data; struct bio *bio = data_vio->user_bio;
staticvoid read_endio(struct bio *bio)
{ struct data_vio *data_vio = vio_as_data_vio(bio->bi_private); int result = blk_status_to_errno(bio->bi_status);
vdo_count_completed_bios(bio); if (result != VDO_SUCCESS) {
continue_data_vio_with_error(data_vio, result); return;
}
if (data_vio->is_partial) {
result = vio_reset_bio(vio, vio->data, read_endio, opf,
data_vio->mapped.pbn);
} else { /* A full 4k read. Use the incoming bio to avoid having to copy the data */
bio_reset(vio->bio, vio->bio->bi_bdev, opf);
bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
data_vio->user_bio, GFP_KERNEL);
/* Copy over the original bio iovec and opflags. */
vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
data_vio->mapped.pbn);
}
}
if (result != VDO_SUCCESS) {
continue_data_vio_with_error(data_vio, result); return;
}
/** * update_block_map() - Rendezvous of the data_vio and decrement completions after each has * made its reference updates. Handle any error from either, or proceed * to updating the block map. * @completion: The completion of the write in progress.
*/ staticvoid update_block_map(struct vdo_completion *completion)
/*
 * NOTE(review): extraction artifact — "staticvoid" run-on and the tail of this function
 * (the actual block map update after the rendezvous) has been dropped; the function brace
 * never closes in this fragment. Code left byte-identical; restore from upstream.
 */
{ struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
assert_data_vio_in_logical_zone(data_vio);
/* Two completions race here; the first arrival just marks the flag and waits. */
if (!data_vio->first_reference_operation_complete) { /* Rendezvous, we're first */
data_vio->first_reference_operation_complete = true; return;
}
if (data_vio->downgrade_allocation_lock) { /* * Now that the data has been written, it's safe to deduplicate against the * block. Downgrade the allocation lock to a read lock so it can be used later by * the hash lock. This is done here since it needs to happen sometime before we * return to the hash zone, and we are currently on the correct thread. For * compressed blocks, the downgrade will have already been done.
*/
vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
}
/** * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write. * * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate * journal entry referencing the removal of this LBN->PBN mapping.
*/ staticvoid read_old_block_mapping(struct vdo_completion *completion)
/*
 * NOTE(review): extraction artifact — "staticvoid" run-on; only the first body line of this
 * function survives, the rest was dropped. Code left byte-identical; restore from upstream.
 */
{ struct data_vio *data_vio = as_data_vio(completion);
/** * pack_compressed_data() - Attempt to pack the compressed data_vio into a block. * * This is the callback registered in launch_compress_data_vio().
*/ staticvoid pack_compressed_data(struct vdo_completion *completion)
/*
 * NOTE(review): extraction artifact — "staticvoid" run-on; the tail of this function (the
 * actual hand-off to the packer) was dropped and the function brace never closes in this
 * fragment. Code left byte-identical; restore from upstream.
 */
{ struct data_vio *data_vio = as_data_vio(completion);
assert_data_vio_in_packer_zone(data_vio);
/* If compression was disabled meanwhile, or this vio may not compress, write it as-is. */
if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
get_data_vio_compression_status(data_vio).may_not_compress) {
write_data_vio(data_vio); return;
}
/**
 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
 * @completion: The completion of the write in progress.
 *
 * This callback is registered in launch_compress_data_vio(). Attempts an LZ4 compression of
 * the 4k data block; on a useful result (non-zero and small enough to fit a compressed
 * fragment) it records the size and hands the data_vio to the packer, otherwise it falls
 * back to an uncompressed write.
 */
static void compress_data_vio(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	int size;

	assert_data_vio_on_cpu_thread(data_vio);

	/*
	 * By putting the compressed data at the start of the compressed block data field, we won't
	 * need to copy it if this data_vio becomes a compressed write agent.
	 */
	size = LZ4_compress_default(data_vio->vio.data,
				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
				    (char *) vdo_get_work_queue_private_data());
	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
		data_vio->compression.size = size;
		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
		return;
	}

	/* Compression failed or didn't save enough space; write uncompressed. */
	write_data_vio(data_vio);
}
/** * launch_compress_data_vio() - Continue a write by attempting to compress the data. * * This is a re-entry point to vio_write used by hash locks.
*/ void launch_compress_data_vio(struct data_vio *data_vio)
/*
 * NOTE(review): extraction artifact — the tail of this function (the hand-off to the CPU
 * queue for actual compression) was dropped; the function brace never closes in this
 * fragment. Code left byte-identical; restore from upstream.
 */
{
VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL, "data_vio to compress has a hash_lock");
VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio), "data_vio to compress has an allocation");
/* * There are 4 reasons why a data_vio which has reached this point will not be eligible for * compression: * * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the * write request also requests FUA. * * 2) A data_vio should not be compressed when compression is disabled for the vdo. * * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not * yet been acknowledged and hence blocking in the packer would be bad. * * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the * packer would also be bad.
*/ if (data_vio->fua ||
!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
(advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
write_data_vio(data_vio); return;
}
/**
 * prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation.
 * @data_vio: The data_vio of the write to deduplicate.
 *
 * Launches the hashing step on a CPU queue; knowing the record name (the hash of the block
 * data) is the prerequisite for deduplication.
 */
static void prepare_for_dedupe(struct data_vio *data_vio)
{
	/* We don't care what thread we are on. */
	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");

	/*
	 * Before we can dedupe, we need to know the record name, so the first
	 * step is to hash the block data.
	 */
	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
}
/** * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called * when a data_vio's write to the underlying storage has completed.
*/ staticvoid write_bio_finished(struct bio *bio)
/*
 * NOTE(review): extraction artifact — "staticvoid" run-on; only the first body line of this
 * bio_end_io callback survives, the rest was dropped. Code left byte-identical; restore
 * from upstream.
 */
{ struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
/** write_data_vio() - Write a data block to storage without compression. */ void write_data_vio(struct data_vio *data_vio)
/*
 * NOTE(review): extraction artifact — statements are joined onto shared lines and the tail
 * of this function (submission of the reset bio) was dropped; the function brace never
 * closes in this fragment. Code left byte-identical; restore from upstream.
 */
{ struct data_vio_compression_status status, new_status; int result;
if (!data_vio_has_allocation(data_vio)) { /* * There was no space to write this block and we failed to deduplicate or compress * it.
*/
continue_data_vio_with_error(data_vio, VDO_NO_SPACE); return;
}
/*
 * NOTE(review): new_status is read below but never assigned in this fragment — the lines
 * computing it appear to have been dropped by extraction; confirm against upstream.
 */
do {
status = get_data_vio_compression_status(data_vio);
} while ((status.stage != DATA_VIO_POST_PACKER) &&
!set_data_vio_compression_status(data_vio, status, new_status));
/* Write the data from the data block buffer. */
result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
write_bio_finished, REQ_OP_WRITE,
data_vio->allocation.pbn); if (result != VDO_SUCCESS) {
continue_data_vio_with_error(data_vio, result); return;
}
/**
 * acknowledge_write_callback() - Acknowledge a write to the requestor.
 * @completion: The completion of the write in progress.
 *
 * This callback is registered in allocate_block() and continue_write_with_block_map_slot().
 * After acknowledging, zero writes and discards proceed straight to metadata updates;
 * everything else continues down the dedupe path.
 */
static void acknowledge_write_callback(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct vdo *vdo = completion->vdo;

	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
			    "%s() called on bio ack queue", __func__);
	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
			    "write VIO to be acknowledged has a flush generation lock");
	acknowledge_data_vio(data_vio);
	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
		/* This is a zero write or discard */
		update_metadata_for_data_vio_write(data_vio, NULL);
		return;
	}

	prepare_for_dedupe(data_vio);
}
/** * allocate_block() - Attempt to allocate a block in the current allocation zone. * * This callback is registered in continue_write_with_block_map_slot().
*/ staticvoid allocate_block(struct vdo_completion *completion)
/*
 * NOTE(review): extraction artifact — "staticvoid" run-on; the tail of this function (the
 * handling of a successful allocation) was dropped and the function brace never closes in
 * this fragment. Code left byte-identical; restore from upstream.
 */
{ struct data_vio *data_vio = as_data_vio(completion);
assert_data_vio_in_allocated_zone(data_vio);
/* If no block is available in this zone, the allocation machinery continues elsewhere. */
if (!vdo_allocate_block_in_zone(data_vio)) return;
/**
 * handle_allocation_error() - Handle an error attempting to allocate a block.
 * @completion: The completion of the write in progress.
 *
 * This error handler is registered in continue_write_with_block_map_slot(). Out-of-space is
 * recoverable (the write may still deduplicate against an existing block); any other error
 * fails the request.
 */
static void handle_allocation_error(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (completion->result == VDO_NO_SPACE) {
		/* We failed to get an allocation, but we can try to dedupe. */
		vdo_reset_completion(completion);
		completion->error_handler = handle_data_vio_error;
		prepare_for_dedupe(data_vio);
		return;
	}

	/* We got a "real" error, not just a failure to allocate, so fail the request. */
	handle_data_vio_error(completion);
}
/**
 * assert_is_discard() - Assert that a data_vio with no block map page is a discard.
 * @data_vio: The data_vio to check.
 *
 * Return: VDO_SUCCESS if the data_vio is a discard, otherwise VDO_READ_ONLY since a
 *         non-discard reaching this state indicates corrupted metadata.
 */
static int assert_is_discard(struct data_vio *data_vio)
{
	int result = VDO_ASSERT(data_vio->is_discard,
				"data_vio with no block map page is a discard");

	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
}
/** * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map. * * This callback is registered in launch_read_data_vio().
*/ void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
/*
 * NOTE(review): extraction artifact — the tail of this function (the handling of an
 * allocated block map page) was dropped; the function brace never closes in this fragment.
 * Code left byte-identical; restore from upstream.
 */
{ struct data_vio *data_vio = as_data_vio(completion);
if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { /* * This is a discard for a block on a block map page which has not been allocated, so * there's nothing more we need to do.
*/
completion->callback = complete_data_vio;
continue_data_vio_with_error(data_vio, assert_is_discard(data_vio)); return;
}
/*
 * NOTE(review): everything below is non-source extraction residue (a "maximum size reached"
 * truncation marker followed by an unrelated German website disclaimer). It is preserved
 * verbatim inside this comment, which is now properly terminated so the file is
 * syntactically closed. None of this text belongs to the original source file.
 *
 * --> --------------------
 * --> maximum size reached
 * --> --------------------
 * Messung V0.5
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereit gestellten Informationen zugesichert.0.12Bemerkung:
 * (vorverarbeitet)
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereit gestellten Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.
 */