	/* set rm_replay_slots for offline slot(s) */
	for (i = 0; i < replay_map->rm_slots; i++) {
		if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
			replay_map->rm_replay_slots[i] = 1;
	}

	if (replay_map->rm_state != REPLAY_NEEDED)
		return;

	for (i = 0; i < replay_map->rm_slots; i++)
		if (replay_map->rm_replay_slots[i])
			ocfs2_queue_recovery_completion(osb->journal, i, NULL,
							NULL, NULL,
							orphan_reco_type);
	replay_map->rm_state = REPLAY_DONE;
}
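/*
 * Note on the replay map state machine visible above and in
 * ocfs2_complete_mount_recovery() below: offline slots are only queued
 * once the map has been moved to REPLAY_NEEDED, and the map flips to
 * REPLAY_DONE afterwards so a repeated call becomes a no-op.
 */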
static void ocfs2_recovery_disable(struct ocfs2_super *osb,
				   enum ocfs2_recovery_state state)
{
	mutex_lock(&osb->recovery_lock);
	/*
	 * If recovery thread is not running, we can directly transition to
	 * final state.
	 */
	if (!ocfs2_recovery_thread_running(osb)) {
		osb->recovery_state = state + 1;
		goto out_lock;
	}
	osb->recovery_state = state;
	/* Wait for recovery thread to acknowledge state transition */
	wait_event_cmd(osb->recovery_event,
		       !ocfs2_recovery_thread_running(osb) ||
		       osb->recovery_state >= state + 1,
		       mutex_unlock(&osb->recovery_lock),
		       mutex_lock(&osb->recovery_lock));
out_lock:
	mutex_unlock(&osb->recovery_lock);

	/*
	 * At this point we know that no more recovery work can be queued so
	 * wait for any recovery completion work to complete.
	 */
	if (osb->ocfs2_wq)
		flush_workqueue(osb->ocfs2_wq);
}
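/*
 * The "state + 1" transition above relies on the ocfs2_recovery_state
 * enum placing each *_WANT_DISABLE value directly before its matching
 * *_DISABLED value. The quota handshake in __ocfs2_recovery_thread()
 * below acknowledges the transition the same way
 * (OCFS2_REC_QUOTA_WANT_DISABLE -> OCFS2_REC_QUOTA_DISABLED).
 */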
	/* disable any new recovery threads and wait for any currently
	 * running ones to exit. Do this before setting the vol_state. */
	ocfs2_recovery_disable(osb, OCFS2_REC_WANT_DISABLE);

	/*
	 * Now that recovery is shut down, and the osb is about to be
	 * freed, the osb_lock is not taken here.
	 */
	rm = osb->recovery_map;
	/* XXX: Should we bug if there are dirty entries? */

	for (i = 0; i < rm->rm_used; i++) {
		if (rm->rm_entries[i] == node_num)
			break;
	}

	if (i < rm->rm_used) {
		/* XXX: be careful with the pointer math */
		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
			(rm->rm_used - i - 1) * sizeof(unsigned int));
		rm->rm_used--;
	}

	spin_unlock(&osb->osb_lock);
}
static int ocfs2_commit_cache(struct ocfs2_super *osb)
{
	int status = 0;
	unsigned int flushed;
	struct ocfs2_journal *journal = NULL;

	journal = osb->journal;

	/* Flush all pending commits and checkpoint the journal. */
	down_write(&journal->j_trans_barrier);

	/* Nested transaction? Just return the handle... */
	if (journal_current_handle())
		return jbd2_journal_start(journal, max_buffs);

	sb_start_intwrite(osb->sb);

	down_read(&osb->journal->j_trans_barrier);

	handle = jbd2_journal_start(journal, max_buffs);
	if (IS_ERR(handle)) {
		up_read(&osb->journal->j_trans_barrier);
		sb_end_intwrite(osb->sb);

		mlog_errno(PTR_ERR(handle));

		if (is_journal_aborted(journal)) {
			ocfs2_abort(osb->sb, "Detected aborted journal\n");
			handle = ERR_PTR(-EROFS);
		}
	} else {
		if (!ocfs2_mount_local(osb))
			atomic_inc(&(osb->journal->j_num_trans));
	}

	return handle;
}
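/*
 * Illustrative caller sketch (not part of this file): a typical
 * transaction brackets the start/commit calls above around a
 * journal_access/journal_dirty cycle for each modified buffer.
 *
 *	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 *	if (IS_ERR(handle))
 *		return PTR_ERR(handle);
 *	status = ocfs2_journal_access_di(handle, INODE_CACHE(inode), bh,
 *					 OCFS2_JOURNAL_ACCESS_WRITE);
 *	...modify bh->b_data...
 *	ocfs2_journal_dirty(handle, bh);
 *	ocfs2_commit_trans(osb, handle);
 */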
int ocfs2_commit_trans(struct ocfs2_super *osb,
		       handle_t *handle)
{
	int ret, nested;
	struct ocfs2_journal *journal = osb->journal;

	BUG_ON(!handle);

	nested = handle->h_ref > 1;
	ret = jbd2_journal_stop(handle);
	if (ret < 0)
		mlog_errno(ret);

	if (!nested) {
		up_read(&journal->j_trans_barrier);
		sb_end_intwrite(osb->sb);
	}

	return ret;
}
/*
 * 'nblocks' is what you want to add to the current transaction.
 *
 * This might call jbd2_journal_restart() which will commit dirty buffers
 * and then restart the transaction. Before calling
 * ocfs2_extend_trans(), any changed blocks should have been
 * dirtied. After calling it, all blocks which need to be changed must
 * go through another set of journal_access/journal_dirty calls.
 *
 * WARNING: This will not release any semaphores or disk locks taken
 * during the transaction, so make sure they were taken *before*
 * start_trans or we'll have ordering deadlocks.
 *
 * WARNING2: Note that we do *not* drop j_trans_barrier here. This is
 * good because transaction ids haven't yet been recorded on the
 * cluster locks associated with this handle.
 */
int ocfs2_extend_trans(handle_t *handle, int nblocks)
{
	int status, old_nblocks;

	BUG_ON(!handle);
	BUG_ON(nblocks < 0);

	if (!nblocks)
		return 0;

	old_nblocks = jbd2_handle_buffer_credits(handle);

	trace_ocfs2_extend_trans(old_nblocks, nblocks);

#ifdef CONFIG_OCFS2_DEBUG_FS
	status = 1;
#else
	status = jbd2_journal_extend(handle, nblocks, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
#endif

	if (status > 0) {
		trace_ocfs2_extend_trans_restart(old_nblocks + nblocks);
		status = jbd2_journal_restart(handle,
					      old_nblocks + nblocks);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	status = 0;
bail:
	return status;
}
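/*
 * Note: with CONFIG_OCFS2_DEBUG_FS the extend above is skipped entirely
 * and status is forced to 1, so every call takes the
 * jbd2_journal_restart() path -- presumably to keep that rarer path
 * exercised in debug builds.
 */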
/*
 * Make sure handle has at least 'nblocks' credits available. If it does not
 * have that many credits available, we will try to extend the handle to have
 * enough credits. If that fails, we will restart transaction to have enough
 * credits. Similar notes regarding data consistency and locking implications
 * as for ocfs2_extend_trans() apply here.
 */
int ocfs2_assure_trans_credits(handle_t *handle, int nblocks)
{
	int old_nblks = jbd2_handle_buffer_credits(handle);

/*
 * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA.
 * If that fails, restart the transaction & regain write access for the
 * buffer head which is used for metadata modifications.
 * Taken from Ext4: extend_or_restart_transaction()
 */
int ocfs2_allocate_extend_trans(handle_t *handle, int thresh)
{
	int status, old_nblks;

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, data + ot->ot_offset);
}
/*
 * Quota blocks have their own trigger because the struct ocfs2_block_check
 * offset depends on the blocksize.
 */
static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
				    struct buffer_head *bh,
				    void *data, size_t size)
{
	struct ocfs2_disk_dqtrailer *dqt =
		ocfs2_block_dqtrailer(size, data);

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, &dqt->dq_check);
}

/*
 * Directory blocks also have their own trigger because the
 * struct ocfs2_block_check offset depends on the blocksize.
 */
static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
				    struct buffer_head *bh,
				    void *data, size_t size)
{
	struct ocfs2_dir_block_trailer *trailer =
		ocfs2_dir_trailer_from_size(size, data);

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, &trailer->db_check);
}
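/*
 * The offset-based trigger above and the two block-specific variants
 * here all feed jbd2's t_frozen hook, which runs on the frozen copy of
 * the buffer at commit time -- hence the checksum can be computed here
 * without racing later modifications. The triggers are attached via
 * jbd2_journal_set_triggers() in __ocfs2_journal_access() below.
 */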
	/* we can safely remove this assertion after testing. */
	if (!buffer_uptodate(bh)) {
		mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n");
		mlog(ML_ERROR, "b_blocknr=%llu, b_state=0x%lx\n",
		     (unsigned long long)bh->b_blocknr, bh->b_state);

		lock_buffer(bh);
		/*
		 * A previous transaction with a couple of buffer heads failed
		 * to checkpoint, so all the bhs are marked as BH_Write_EIO.
		 * For the current transaction, the bh is just one of the
		 * error bhs the previous transaction handled. We can't just
		 * clear its BH_Write_EIO and reuse it directly, since other
		 * bhs are not written to disk yet and that would cause
		 * metadata inconsistency. So we should set the fs read-only
		 * to avoid further damage.
		 */
		if (buffer_write_io_error(bh) && !buffer_uptodate(bh)) {
			unlock_buffer(bh);
			return ocfs2_error(osb->sb, "A previous attempt to "
					   "write this buffer head failed\n");
		}
		unlock_buffer(bh);
	}

	/* Set the current transaction information on the ci so
	 * that the locking code knows whether it can drop its locks
	 * on this ci or not. We're protected from the commit
	 * thread updating the current transaction id until
	 * ocfs2_commit_trans() because ocfs2_start_trans() took
	 * j_trans_barrier for us. */
	ocfs2_set_ci_lock_trans(osb->journal, ci);

	ocfs2_metadata_cache_io_lock(ci);
	switch (type) {
	case OCFS2_JOURNAL_ACCESS_CREATE:
	case OCFS2_JOURNAL_ACCESS_WRITE:
		status = jbd2_journal_get_write_access(handle, bh);
		break;

	case OCFS2_JOURNAL_ACCESS_UNDO:
		status = jbd2_journal_get_undo_access(handle, bh);
		break;

	default:
		status = -EINVAL;
		mlog(ML_ERROR, "Unknown access type!\n");
	}
	if (!status && ocfs2_meta_ecc(osb) && triggers)
		jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
	ocfs2_metadata_cache_io_unlock(ci);

	if (status < 0)
		mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
		     status, type);

	return status;
}
int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	struct ocfs2_super *osb = OCFS2_SB(ocfs2_metadata_cache_get_super(ci));

	status = jbd2_journal_dirty_metadata(handle, bh);
	if (status) {
		mlog_errno(status);
		if (!is_handle_aborted(handle)) {
			journal_t *journal = handle->h_transaction->t_journal;

			mlog(ML_ERROR, "jbd2_journal_dirty_metadata failed: "
			     "handle type %u started at line %u, credits %u/%u "
			     "errcode %d. Aborting transaction and journal.\n",
			     handle->h_type, handle->h_line_no,
			     handle->h_requested_credits,
			     jbd2_handle_buffer_credits(handle), status);
			handle->h_err = status;
			jbd2_journal_abort_handle(handle);
			jbd2_journal_abort(journal, status);
		}
	}
}
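/*
 * A failure from jbd2_journal_dirty_metadata() indicates a bug (e.g. a
 * buffer that never went through jbd2_journal_get_write_access()), so
 * the code above escalates: it aborts the handle and then the whole
 * journal, after which further journal operations fail with -EROFS.
 */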
/*
 * Alloc & initialize a skeleton for the journal structure. It is
 * ocfs2_journal_init() that actually gives the fs its journalling
 * ability.
 */
int ocfs2_journal_alloc(struct ocfs2_super *osb)
{
	int status = 0;
	struct ocfs2_journal *journal;

	journal = kzalloc(sizeof(struct ocfs2_journal), GFP_KERNEL);
	if (!journal) {
		mlog(ML_ERROR, "unable to alloc journal\n");
		status = -ENOMEM;
		goto bail;
	}
	osb->journal = journal;
	journal->j_osb = osb;

	/* Skip recovery waits here - journal inode metadata never
	 * changes in a live cluster so it can be considered an
	 * exception to the rule. */
	status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
	if (status < 0) {
		if (status != -ERESTARTSYS)
			mlog(ML_ERROR, "Could not get lock on journal!\n");
		goto done;
	}

	inode_lock = 1;
	di = (struct ocfs2_dinode *)bh->b_data;

	if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) {
		mlog(ML_ERROR, "Journal file size (%lld) is too small!\n",
		     i_size_read(inode));
		status = -EINVAL;
		goto done;
	}
static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
				      int dirty, int replayed)
{
	int status;
	unsigned int flags;
	struct ocfs2_journal *journal = osb->journal;
	struct buffer_head *bh = journal->j_bh;
	struct ocfs2_dinode *fe;

	fe = (struct ocfs2_dinode *)bh->b_data;

	/* The journal bh on the osb always comes from ocfs2_journal_init()
	 * and was validated there inside ocfs2_inode_lock_full(). It's a
	 * code bug if we mess it up. */
	BUG_ON(!OCFS2_IS_VALID_DINODE(fe));

	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
	status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode));
	if (status < 0)
		mlog_errno(status);

	return status;
}
/*
 * If the journal has been kmalloc'd it needs to be freed after this
 * call.
 */
void ocfs2_journal_shutdown(struct ocfs2_super *osb)
{
	struct ocfs2_journal *journal = NULL;
	int status = 0;
	struct inode *inode = NULL;
	int num_running_trans = 0;

	BUG_ON(!osb);

	journal = osb->journal;
	if (!journal)
		goto done;

	inode = journal->j_inode;

	if (journal->j_state != OCFS2_JOURNAL_LOADED)
		goto done;

	/* need to inc inode use count - jbd2_journal_destroy will iput. */
	if (!igrab(inode))
		BUG();

	/* Do a commit_cache here. It will flush our journal, *and*
	 * release any locks that are still held.
	 * Set the SHUTDOWN flag and release the trans lock; the commit
	 * thread will take the trans lock for us below. */
	journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN;

	/* The OCFS2_JOURNAL_IN_SHUTDOWN will signal to commit_cache to not
	 * drop the trans_lock (which we want to hold until we
	 * completely destroy the journal). */
	if (osb->commit_task) {
		/* Wait for the commit thread */
		trace_ocfs2_journal_shutdown_wait(osb->commit_task);
		kthread_stop(osb->commit_task);
		osb->commit_task = NULL;
	}

	if (ocfs2_mount_local(osb) &&
	    (journal->j_journal->j_flags & JBD2_LOADED)) {
		jbd2_journal_lock_updates(journal->j_journal);
		status = jbd2_journal_flush(journal->j_journal, 0);
		jbd2_journal_unlock_updates(journal->j_journal);
		if (status < 0)
			mlog_errno(status);
	}

	/* Shutdown the kernel journal system */
	if (!jbd2_journal_destroy(journal->j_journal) && !status) {
		/*
		 * Do not toggle if flush was unsuccessful otherwise
		 * will leave dirty metadata in a "clean" journal
		 */
		status = ocfs2_journal_toggle_dirty(osb, 0, 0);
		if (status < 0)
			mlog_errno(status);
	}
	journal->j_journal = NULL;
static void ocfs2_clear_journal_error(struct super_block *sb,
				      journal_t *journal,
				      int slot)
{
	int olderr;

	olderr = jbd2_journal_errno(journal);
	if (olderr) {
		mlog(ML_ERROR, "File system error %d recorded in "
		     "journal %u.\n", olderr, slot);
		mlog(ML_ERROR, "File system on device %s needs checking.\n",
		     sb->s_id);

	if (replayed) {
		jbd2_journal_lock_updates(journal->j_journal);
		status = jbd2_journal_flush(journal->j_journal, 0);
		jbd2_journal_unlock_updates(journal->j_journal);
		if (status < 0)
			mlog_errno(status);
	}

	status = ocfs2_journal_toggle_dirty(osb, 1, replayed);
	if (status < 0) {
		mlog_errno(status);
		goto done;
	}

	/* Launch the commit thread */
	if (!local) {
		osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
					       "ocfs2cmt-%s", osb->uuid_str);
		if (IS_ERR(osb->commit_task)) {
			status = PTR_ERR(osb->commit_task);
			osb->commit_task = NULL;
			mlog(ML_ERROR, "unable to launch ocfs2commit thread, "
			     "error=%d", status);
			goto done;
		}
	} else
		osb->commit_task = NULL;

done:
	return status;
}
/* 'full' flag tells us whether we clear out all blocks or if we just
 * mark the journal clean */
int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
{
	int status;

	BUG_ON(!journal);

	status = jbd2_journal_wipe(journal->j_journal, full);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0);
	if (status < 0)
		mlog_errno(status);
/*
 * JBD might read a cached version of another node's journal file. We
 * don't want this as this file changes often and we get no
 * notification on those changes. The only way to be sure that we've
 * got the most up to date version of those blocks then is to force
 * read them off disk. Just searching through the buffer cache won't
 * work as there may be pages backing this file which are still marked
 * up to date. We know things can't change on this file underneath us
 * as we have the lock by now :)
 */
static int ocfs2_force_read_journal(struct inode *inode)
{
	int status = 0;
	int i;
	u64 v_blkno, p_blkno, p_blocks, num_blocks;
	struct buffer_head *bh = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode));
	v_blkno = 0;
	while (v_blkno < num_blocks) {
		status = ocfs2_extent_map_get_blocks(inode, v_blkno,
						     &p_blkno, &p_blocks, NULL);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}

		for (i = 0; i < p_blocks; i++, p_blkno++) {
			bh = __find_get_block_nonatomic(osb->sb->s_bdev,
							p_blkno,
							osb->sb->s_blocksize);
			/* block not cached. */
			if (!bh)
				continue;

			brelse(bh);
			bh = NULL;
			/* We are reading journal data which should not
			 * be put in the uptodate cache.
			 */
			status = ocfs2_read_blocks_sync(osb, p_blkno, 1, &bh);
			if (status < 0) {
				mlog_errno(status);
				goto bail;
			}
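/*
 * Only blocks already sitting in the buffer cache are re-read in the
 * loop above: if __find_get_block_nonatomic() finds nothing, there is
 * no stale cached copy to shadow the on-disk journal, so the block can
 * simply be skipped.
 */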
/* Does the second half of the recovery process. By this point, the
 * node is marked clean and can actually be considered recovered,
 * hence it's no longer in the recovery map, but there's still some
 * cleanup we can do which shouldn't happen within the recovery thread
 * as locking in that context becomes very difficult if we are to take
 * recovering nodes into account.
 *
 * NOTE: This function can and will sleep on recovery of other nodes
 * during cluster locking, just like any other ocfs2 process.
 */
void ocfs2_complete_recovery(struct work_struct *work)
{
	int ret = 0;
	struct ocfs2_journal *journal =
		container_of(work, struct ocfs2_journal, j_recovery_work);
	struct ocfs2_super *osb = journal->j_osb;
	struct ocfs2_dinode *la_dinode, *tl_dinode;
	struct ocfs2_la_recovery_item *item, *n;
	struct ocfs2_quota_recovery *qrec;
	enum ocfs2_orphan_reco_type orphan_reco_type;
	LIST_HEAD(tmp_la_list);

	if (la_dinode) {
		ret = ocfs2_complete_local_alloc_recovery(osb, la_dinode);
		if (ret < 0)
			mlog_errno(ret);

		kfree(la_dinode);
	}

	if (tl_dinode) {
		ret = ocfs2_complete_truncate_log_recovery(osb, tl_dinode);
		if (ret < 0)
			mlog_errno(ret);

		kfree(tl_dinode);
	}

	ret = ocfs2_recover_orphans(osb, item->lri_slot, orphan_reco_type);
	if (ret < 0)
		mlog_errno(ret);

	if (qrec) {
		ret = ocfs2_finish_quota_recovery(osb, qrec, item->lri_slot);
		if (ret < 0)
			mlog_errno(ret);
		/* Recovery info is already freed now */
	}

	kfree(item);
	}

	trace_ocfs2_complete_recovery_end(ret);
}
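/*
 * This work item runs on osb->ocfs2_wq; the flush_workqueue() call in
 * ocfs2_recovery_disable() above is what guarantees that all queued
 * completion work has finished before recovery is torn down.
 */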
/* NOTE: This function always eats your references to la_dinode and
 * tl_dinode, either manually on error, or by passing them to
 * ocfs2_complete_recovery */
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
					    int slot_num,
					    struct ocfs2_dinode *la_dinode,
					    struct ocfs2_dinode *tl_dinode,
					    struct ocfs2_quota_recovery *qrec,
					    enum ocfs2_orphan_reco_type orphan_reco_type)
{
	struct ocfs2_la_recovery_item *item;

	item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS);
	if (!item) {
		/* Though we wish to avoid it, we are in fact safe in
		 * skipping local alloc cleanup as fsck.ocfs2 is more
		 * than capable of reclaiming unused space. */
		kfree(la_dinode);
		kfree(tl_dinode);
/* Called by the mount code to queue the last part of recovery for its
 * own and offline slot(s). */
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
{
	struct ocfs2_journal *journal = osb->journal;

	if (ocfs2_is_hard_readonly(osb))
		return;

	/* No need to queue up our truncate_log as regular cleanup will catch
	 * that */
	ocfs2_queue_recovery_completion(journal, osb->slot_num,
					osb->local_alloc_copy, NULL, NULL,
					ORPHAN_NEED_TRUNCATE);
	ocfs2_schedule_truncate_log_flush(osb, 0);

	osb->local_alloc_copy = NULL;

	/* queue to recover orphan slots for all offline slots */
	ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
	ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);
	ocfs2_free_replay_slots(osb);
}
static int __ocfs2_recovery_thread(void *arg)
{
	int status, node_num, slot_num;
	struct ocfs2_super *osb = arg;
	struct ocfs2_recovery_map *rm = osb->recovery_map;
	int *rm_quota = NULL;
	int rm_quota_used = 0, i;
	struct ocfs2_quota_recovery *qrec;
	/* Whether quota is supported. */
	int quota_enabled = OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
			OCFS2_FEATURE_RO_COMPAT_USRQUOTA)
		|| OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
			OCFS2_FEATURE_RO_COMPAT_GRPQUOTA);

	status = ocfs2_wait_on_mount(osb);
	if (status < 0) {
		goto bail;
	}

	if (quota_enabled) {
		rm_quota = kcalloc(osb->max_slots, sizeof(int), GFP_NOFS);
		if (!rm_quota) {
			status = -ENOMEM;
			goto bail;
		}
	}
restart:
	if (quota_enabled) {
		mutex_lock(&osb->recovery_lock);
		/* Confirm that recovery thread will no longer recover quotas */
		if (osb->recovery_state == OCFS2_REC_QUOTA_WANT_DISABLE) {
			osb->recovery_state = OCFS2_REC_QUOTA_DISABLED;
			wake_up(&osb->recovery_event);
		}
		if (osb->recovery_state >= OCFS2_REC_QUOTA_DISABLED)
			quota_enabled = 0;
		mutex_unlock(&osb->recovery_lock);
	}

	status = ocfs2_super_lock(osb, 1);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = ocfs2_compute_replay_slots(osb);
	if (status < 0)
		mlog_errno(status);

	/* queue recovery for our own slot */
	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
					NULL, NULL, ORPHAN_NO_NEED_TRUNCATE);

	spin_lock(&osb->osb_lock);
	while (rm->rm_used) {
		/* It's always safe to remove entry zero, as we won't
		 * clear it until ocfs2_recover_node() has succeeded. */
		node_num = rm->rm_entries[0];
		spin_unlock(&osb->osb_lock);
		slot_num = ocfs2_node_num_to_slot(osb, node_num);
		trace_ocfs2_recovery_thread_node(node_num, slot_num);
		if (slot_num == -ENOENT) {
			status = 0;
			goto skip_recovery;
		}

		/* It is a bit subtle with quota recovery. We cannot do it
		 * immediately because we have to obtain cluster locks from
		 * quota files and we also don't want to just skip it because
		 * then quota usage would be out of sync until some node takes
		 * the slot. So we remember which nodes need quota recovery
		 * and when everything else is done, we recover quotas. */
		if (quota_enabled) {
			for (i = 0; i < rm_quota_used
					&& rm_quota[i] != slot_num; i++)
				;
			if (i == rm_quota_used)
				rm_quota[rm_quota_used++] = slot_num;
		}

	/* Refresh all journal recovery generations from disk */
	status = ocfs2_check_journals_nolocks(osb);
	status = (status == -EROFS) ? 0 : status;
	if (status < 0)
		mlog_errno(status);

	/* Now it is right time to recover quotas... We have to do this under
	 * superblock lock so that no one can start using the slot (and crash)
	 * before we recover it */
	if (quota_enabled) {
		for (i = 0; i < rm_quota_used; i++) {
			qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
			if (IS_ERR(qrec)) {
				status = PTR_ERR(qrec);
				mlog_errno(status);
				continue;
			}
			ocfs2_queue_recovery_completion(osb->journal,
							rm_quota[i],
							NULL, NULL, qrec,
							ORPHAN_NEED_TRUNCATE);
		}
	}

	ocfs2_super_unlock(osb, 1);

	/* queue recovery for offline slots */
	ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);
	status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = 0;
bail:
	if (inode) {
		if (status || !ret_inode)
			iput(inode);
		else
			*ret_inode = inode;
	}
	return status;
}
/* Does the actual journal replay and marks the journal inode as
 * clean. Will only replay if the journal inode is marked dirty. */
static int ocfs2_replay_journal(struct ocfs2_super *osb,
				int node_num,
				int slot_num)
{
	int status;
	int got_lock = 0;
	unsigned int flags;
	struct inode *inode = NULL;
	struct ocfs2_dinode *fe;
	journal_t *journal = NULL;
	struct buffer_head *bh = NULL;
	u32 slot_reco_gen;

	status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode);
	if (status) {
		mlog_errno(status);
		goto done;
	}

	/*
	 * As the fs recovery is asynchronous, there is a small chance that
	 * another node mounted (and recovered) the slot before the recovery
	 * thread could get the lock. To handle that, we dirty read the journal
	 * inode for that slot to get the recovery generation. If it is
	 * different than what we expected, the slot has been recovered.
	 * If not, it needs recovery.
	 */
	if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) {
		trace_ocfs2_replay_journal_recovered(slot_num,
		     osb->slot_recovery_generations[slot_num], slot_reco_gen);
		osb->slot_recovery_generations[slot_num] = slot_reco_gen;
		status = -EBUSY;
		goto done;
	}

	/* Continue with recovery as the journal has not yet been recovered */

	status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
	if (status < 0) {
		trace_ocfs2_replay_journal_lock_err(status);
		if (status != -ERESTARTSYS)
			mlog(ML_ERROR, "Could not lock journal!\n");
		goto done;
	}
	got_lock = 1;

	/* wipe the journal */
	jbd2_journal_lock_updates(journal);
	status = jbd2_journal_flush(journal, 0);
	jbd2_journal_unlock_updates(journal);
	if (status < 0)
		mlog_errno(status);

	/* This will mark the node clean */
	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
	flags &= ~OCFS2_JOURNAL_DIRTY_FL;
	fe->id1.journal1.ij_flags = cpu_to_le32(flags);

	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
	status = ocfs2_write_block(osb, bh, INODE_CACHE(inode));
	if (status < 0)
		mlog_errno(status);

	BUG_ON(!igrab(inode));

	jbd2_journal_destroy(journal);

	printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "
	       "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
	       MINOR(osb->sb->s_dev));

done:
	/* drop the lock on this node's journal */
	if (got_lock)
		ocfs2_inode_unlock(inode, 1);

	iput(inode);
	brelse(bh);

	return status;
}
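/*
 * The igrab() above mirrors the comment in ocfs2_journal_shutdown():
 * jbd2_journal_destroy() iputs the journal inode, so an extra
 * reference must be taken first to keep the iput() in the done: path
 * balanced.
 */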
/*
 * Do the most important parts of node recovery:
 *  - Replay its journal
 *  - Stamp a clean local allocator file
 *  - Stamp a clean truncate log
 *  - Mark the node clean
 *
 * If this function completes without error, a node in OCFS2 can be
 * said to have been safely recovered. As a result, failure during the
 * second part of a node's recovery process (local alloc recovery) is
 * far less concerning.
 */
static int ocfs2_recover_node(struct ocfs2_super *osb,
			      int node_num, int slot_num)
{
	int status = 0;
	struct ocfs2_dinode *la_copy = NULL;
	struct ocfs2_dinode *tl_copy = NULL;

	/* Should not ever be called to recover ourselves -- in that
	 * case we should've called ocfs2_journal_load instead. */
	BUG_ON(osb->node_num == node_num);

	status = ocfs2_replay_journal(osb, node_num, slot_num);
	if (status < 0) {
		if (status == -EBUSY) {
			trace_ocfs2_recover_node_skip(slot_num, node_num);
			status = 0;
			goto done;
		}
		mlog_errno(status);
		goto done;
	}

	/* Stamp a clean local alloc file AFTER recovering the journal... */
	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
	if (status < 0) {
		mlog_errno(status);
		goto done;
	}

	/* An error from begin_truncate_log_recovery is not
	 * serious enough to warrant halting the rest of
	 * recovery. */
	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
	if (status < 0)
		mlog_errno(status);

	/* Likewise, this would be a strange but ultimately not so
	 * harmful place to get an error... */
	status = ocfs2_clear_slot(osb, slot_num);
	if (status < 0)
		mlog_errno(status);

	/* This will kfree the memory pointed to by la_copy and tl_copy */
	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
					tl_copy, NULL, ORPHAN_NEED_TRUNCATE);

	status = 0;
done:
	return status;
}
/* Test node liveness by trylocking its journal. If we get the lock,
 * we drop it here. Return 0 if we got the lock, -EAGAIN if the node is
 * still alive (we couldn't get the lock) and < 0 on error. */
static int ocfs2_trylock_journal(struct ocfs2_super *osb,
				 int slot_num)
{
	int status, flags;
	struct inode *inode = NULL;

	inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
					    slot_num);
	if (inode == NULL) {
		mlog(ML_ERROR, "access error\n");
		status = -EACCES;
		goto bail;
	}
	if (is_bad_inode(inode)) {
		mlog(ML_ERROR, "access error (bad inode)\n");
		iput(inode);
		inode = NULL;
		status = -EACCES;
		goto bail;
	}
	SET_INODE_JOURNAL(inode);

	flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE;
	status = ocfs2_inode_lock_full(inode, NULL, 1, flags);
	if (status < 0) {
		if (status != -EAGAIN)
			mlog_errno(status);
		goto bail;
	}

	ocfs2_inode_unlock(inode, 1);
bail:
	iput(inode);

	return status;
}
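/*
 * ocfs2_mark_dead_nodes() below uses this trylock as its liveness
 * probe: OCFS2_META_LOCK_NOQUEUE makes the lock attempt fail with
 * -EAGAIN instead of queueing behind a live node's hold on its own
 * journal lock.
 */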
/* Call this underneath ocfs2_super_lock. It also assumes that the
 * slot info struct has been updated from disk. */
int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
{
	unsigned int node_num;
	int status, i;
	u32 gen;
	struct buffer_head *bh = NULL;
	struct ocfs2_dinode *di;

	/* This is called with the super block cluster lock, so we
	 * know that the slot map can't change underneath us. */
	for (i = 0; i < osb->max_slots; i++) {
		/* Read journal inode to get the recovery generation */
		status = ocfs2_read_journal_inode(osb, i, &bh, NULL);
		if (status) {
			mlog_errno(status);
			goto bail;
		}
		di = (struct ocfs2_dinode *)bh->b_data;
		gen = ocfs2_get_recovery_generation(di);
		brelse(bh);
		bh = NULL;

		if (i == osb->slot_num) {
			spin_unlock(&osb->osb_lock);
			continue;
		}

		status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
		if (status == -ENOENT) {
			spin_unlock(&osb->osb_lock);
			continue;
		}

		if (__ocfs2_recovery_map_test(osb, node_num)) {
			spin_unlock(&osb->osb_lock);
			continue;
		}
		spin_unlock(&osb->osb_lock);

		/* Ok, we have a slot occupied by another node which
		 * is not in the recovery map. We trylock its journal
		 * file here to test if it's alive. */
		status = ocfs2_trylock_journal(osb, i);
		if (!status) {
			/* Since we're called from mount, we know that
			 * the recovery thread can't race us on
			 * setting / checking the recovery bits. */
			ocfs2_recovery_thread(osb, node_num);
		} else if ((status < 0) && (status != -EAGAIN)) {
			mlog_errno(status);
			goto bail;
		}
	}

	status = 0;
bail:
	return status;
}
/*
 * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
 * randomness to the timeout to minimize multiple nodes firing the timer at the
 * same time.
 */
static inline unsigned long ocfs2_orphan_scan_timeout(void)
{
	unsigned long time;

/*
 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
 * is done to catch any orphans that are left over in orphan directories.
 *
 * It scans all slots, even ones that are in use. It does so to handle the
 * case described below:
 *
 * Node 1 has an inode it was using. The dentry went away due to memory
 * pressure. Node 1 closes the inode, but it's on the free list. The node
 * has the open lock.
 * Node 2 unlinks the inode. It grabs the dentry lock to notify others,
 * but node 1 has no dentry and doesn't get the message. It trylocks the
 * open lock, sees that another node has a PR, and does nothing.
 * Later node 2 runs its orphan dir. It igets the inode, trylocks the
 * open lock, sees the PR still, and does nothing.
 * Basically, we have to trigger an orphan iput on node 1. The only way
 * for this to happen is if node 1 runs node 2's orphan dir.
 *
 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
 * seconds. It gets an EX lock on os_lockres and checks sequence number
 * stored in LVB. If the sequence number has changed, it means some other
 * node has done the scan. This node skips the scan and tracks the
 * sequence number. If the sequence number didn't change, it means a scan
 * hasn't happened. The node queues a scan and increments the
 * sequence number in the LVB.
 */
static void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
{
	struct ocfs2_orphan_scan *os;
	int status, i;
	u32 seqno = 0;

	os = &osb->osb_orphan_scan;

	if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
		goto out;

	for (i = 0; i < osb->max_slots; i++)
		ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
						NULL, ORPHAN_NO_NEED_TRUNCATE);
	/*
	 * We queued a recovery on orphan slots, increment the sequence
	 * number and update LVB so other node will skip the scan for a while
	 */
	seqno++;
	os->os_count++;
	os->os_scantime = ktime_get_seconds();
unlock:
	ocfs2_orphan_scan_unlock(osb, seqno);
out:
	trace_ocfs2_queue_orphan_scan_end(os->os_count, os->os_seqno,
					  atomic_read(&os->os_state));
	return;
}
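/*
 * In the full source, the caller of this function is the delayed work
 * ocfs2_orphan_scan_work(), which re-arms itself with
 * ocfs2_orphan_scan_timeout() above for as long as os_state stays
 * ORPHAN_SCAN_ACTIVE.
 */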
	if (name_len == 1 && !strncmp(".", name, 1))
		return true;
	if (name_len == 2 && !strncmp("..", name, 2))
		return true;

	/* do not include dio entry in case of orphan scan */
	if ((p->orphan_reco_type == ORPHAN_NO_NEED_TRUNCATE) &&
	    (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
		      OCFS2_DIO_ORPHAN_PREFIX_LEN)))
		return true;

	/* Skip bad inodes so that recovery can continue */
	iter = ocfs2_iget(p->osb, ino,
			  OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0);
	if (IS_ERR(iter))
		return true;

	if (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
		     OCFS2_DIO_ORPHAN_PREFIX_LEN))
		OCFS2_I(iter)->ip_flags |= OCFS2_INODE_DIO_ORPHAN_ENTRY;

	/* Skip inodes which are already added to recover list, since dio may
	 * happen concurrently with unlink/rename */
	if (OCFS2_I(iter)->ip_next_orphan) {
		iput(iter);
		return true;
	}

	trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno);
	/* No locking is required for the next_orphan queue as there
	 * is only ever a single process doing orphan recovery. */
	OCFS2_I(iter)->ip_next_orphan = p->head;
	p->head = iter;
static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
					      int slot)
{
	int ret;

	spin_lock(&osb->osb_lock);
	ret = !osb->osb_orphan_wipes[slot];
	spin_unlock(&osb->osb_lock);
	return ret;
}

static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
					     int slot)
{
	spin_lock(&osb->osb_lock);
	/* Mark ourselves such that new processes in delete_inode()
	 * know to quit early. */
	ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
	while (osb->osb_orphan_wipes[slot]) {
		/* If any processes are already in the middle of an
		 * orphan wipe on this dir, then we need to wait for
		 * them. */
		spin_unlock(&osb->osb_lock);
		wait_event_interruptible(osb->osb_wipe_event,
			 ocfs2_orphan_recovery_can_continue(osb, slot));
		spin_lock(&osb->osb_lock);
	}
	spin_unlock(&osb->osb_lock);
}
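/*
 * Together with ocfs2_orphan_recovery_can_continue() above, this forms
 * the handshake advertised to ocfs2_delete_inode(): the bit in
 * osb_recovering_orphan_dirs tells new deleters to quit early, while
 * osb_orphan_wipes[slot] counts the in-flight wipes we still have to
 * wait out.
 */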
/*
 * Orphan recovery. Each mounted node has its own orphan dir which we
 * must run during recovery. Our strategy here is to build a list of
 * the inodes in the orphan dir and iget/iput them. The VFS does
 * most of the rest of the work.
 *
 * Orphan recovery can happen at any time, not just mount, so we have a
 * couple of extra considerations.
 *
 * - We grab as many inodes as we can under the orphan dir lock -
 *   doing iget() outside the orphan dir risks getting a reference on
 *   an invalid inode.
 * - We must be sure not to deadlock with other processes on the
 *   system wanting to run delete_inode(). This can happen when they go
 *   to lock the orphan dir and the orphan recovery process attempts to
 *   iget() inside the orphan dir lock. This can be avoided by
 *   advertising our state to ocfs2_delete_inode().
 */
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
				 int slot,
				 enum ocfs2_orphan_reco_type orphan_reco_type)
{
	int ret = 0;
	struct inode *inode = NULL;
	struct inode *iter;
	struct ocfs2_inode_info *oi;
	struct buffer_head *di_bh = NULL;
	struct ocfs2_dinode *di = NULL;

	trace_ocfs2_recover_orphans(slot);

	ocfs2_mark_recovering_orphan_dir(osb, slot);
	ret = ocfs2_queue_orphans(osb, slot, &inode, orphan_reco_type);
	ocfs2_clear_recovering_orphan_dir(osb, slot);

	/* Error here should be noted, but we want to continue with as
	 * many queued inodes as we've got. */
	if (ret)
		mlog_errno(ret);

	while (inode) {
		oi = OCFS2_I(inode);
		trace_ocfs2_recover_orphans_iput(
					(unsigned long long)oi->ip_blkno);

		iter = oi->ip_next_orphan;
		oi->ip_next_orphan = NULL;

		if (oi->ip_flags & OCFS2_INODE_DIO_ORPHAN_ENTRY) {
			inode_lock(inode);
			ret = ocfs2_rw_lock(inode, 1);
			if (ret < 0) {
				mlog_errno(ret);
				goto unlock_mutex;
			}
			/*
			 * We need to take and drop the inode lock to
			 * force read inode from disk.
			 */
			ret = ocfs2_inode_lock(inode, &di_bh, 1);
			if (ret) {
				mlog_errno(ret);
				goto unlock_rw;
			}

			di = (struct ocfs2_dinode *)di_bh->b_data;

			if (di->i_flags & cpu_to_le32(OCFS2_DIO_ORPHANED_FL)) {
				ret = ocfs2_truncate_file(inode, di_bh,
						i_size_read(inode));
				if (ret < 0) {
					if (ret != -ENOSPC)
						mlog_errno(ret);
					goto unlock_inode;
				}

			/* clear dio flag in ocfs2_inode_info */
			oi->ip_flags &= ~OCFS2_INODE_DIO_ORPHAN_ENTRY;
		} else {
			spin_lock(&oi->ip_lock);
			/* Set the proper information to get us going into
			 * ocfs2_delete_inode. */
			oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
			spin_unlock(&oi->ip_lock);
		}

		iput(inode);
		inode = iter;
	}

	return ret;
}
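/*
 * The final iput() per inode in the loop above is what actually
 * triggers the cleanup: for inodes flagged OCFS2_INODE_MAYBE_ORPHANED
 * the VFS drops into ocfs2_delete_inode(), which performs the real
 * wipe -- the "iget/iput them" strategy described in the comment above.
 */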
static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
{
	/* This check is good because ocfs2 will wait on our recovery
	 * thread before changing it to something other than MOUNTED
	 * or DISABLED. */
	wait_event(osb->osb_mount_event,
		   (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
		   atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
		   atomic_read(&osb->vol_state) == VOLUME_DISABLED);

	/* If there's an error on mount, then we may never get to the
	 * MOUNTED flag, but this is set right before
	 * dismount_volume() so we can trust it. */
	if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) {
		trace_ocfs2_wait_on_mount(VOLUME_DISABLED);
		mlog(0, "mount error, exiting!\n");
		return -EBUSY;
	}
	/* we can trust j_num_trans here because _should_stop() is only set in
	 * shutdown and nobody other than ourselves should be able to start
	 * transactions. committing on shutdown might take a few iterations
	 * as final transactions put deleted inodes on the list */
	while (!(kthread_should_stop() &&
		 atomic_read(&journal->j_num_trans) == 0)) {
		status = ocfs2_commit_cache(osb);
		if (status < 0) {
			static unsigned long abort_warn_time;