/*
 * Initialize a journal pin list: empty every unflushed/flushed pin list,
 * set the initial reference count, and clear the device list.
 */
static void journal_pin_list_init(struct journal_entry_pin_list *p, int count)
{
	unsigned i;

	for (i = 0; i < ARRAY_SIZE(p->unflushed); i++)
		INIT_LIST_HEAD(&p->unflushed[i]);
	for (i = 0; i < ARRAY_SIZE(p->flushed); i++)
		INIT_LIST_HEAD(&p->flushed[i]);

	atomic_set(&p->count, count);
	p->devs.nr = 0;
}
/* * Detect stuck journal conditions and trigger shutdown. Technically the journal * can end up stuck for a variety of reasons, such as a blocked I/O, journal * reservation lockup, etc. Since this is a fatal error with potentially * unpredictable characteristics, we want to be fairly conservative before we * decide to shut things down. * * Consider the journal stuck when it appears full with no ability to commit * btree transactions, to discard journal buckets, nor acquire priority * (reserved watermark) reservation.
*/ staticinlinebool
journal_error_check_stuck(struct journal *j, int error, unsigned flags)
{ struct bch_fs *c = container_of(j, struct bch_fs, journal); bool stuck = false; struct printbuf buf = PRINTBUF;
if (j->can_discard) {
spin_unlock(&j->lock); return stuck;
}
stuck = true;
/* * The journal shutdown path will set ->err_seq, but do it here first to * serialize against concurrent failures and avoid duplicate error * reports.
*/ if (j->err_seq) {
spin_unlock(&j->lock); return stuck;
}
j->err_seq = journal_cur_seq(j);
__bch2_journal_debug_to_text(&buf, j);
spin_unlock(&j->lock);
prt_printf(&buf, bch2_fmt(c, "Journal stuck! Hava a pre-reservation but journal full (error %s)"),
bch2_err_str(error));
bch2_print_str(c, KERN_ERR, buf.buf);
/*
 * Final processing when the last reference of a journal buffer has been
 * dropped: drop the pin-list reference acquired at journal entry open, kick
 * off any writes that are now ready, and wake waiters.
 */
void bch2_journal_buf_put_final(struct journal *j, u64 seq)
{
	lockdep_assert_held(&j->lock);

	bool do_reclaim = __bch2_journal_pin_put(j, seq);

	if (do_reclaim)
		bch2_journal_reclaim_fast(j);

	bch2_journal_do_writes(j);

	/*
	 * For __bch2_next_write_buffer_flush_journal_buf(), when quiescing an
	 * open journal entry:
	 */
	wake_up(&j->wait);
}
/* * Returns true if journal entry is now closed: * * We don't close a journal_buf until the next journal_buf is finished writing, * and can be opened again - this also initializes the next journal_buf:
*/ staticvoid __journal_entry_close(struct journal *j, unsigned closed_val, bool trace)
{ struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_buf *buf = journal_cur_buf(j); union journal_res_state old, new; unsigned sectors;
/* * We have to set last_seq here, _before_ opening a new journal entry: * * A threads may replace an old pin with a new pin on their current * journal reservation - the expectation being that the journal will * contain either what the old pin protected or what the new pin * protects. * * After the old pin is dropped journal_last_seq() won't include the old * pin, so we can only write the updated last_seq on the entry that * contains whatever the new pin protects. * * Restated, we can _not_ update last_seq for a given entry if there * could be a newer entry open with reservations/pins that have been * taken against it. * * Hence, we want update/set last_seq on the current journal entry right * before we open a new one:
*/
buf->last_seq = journal_last_seq(j);
buf->data->last_seq = cpu_to_le64(buf->last_seq);
BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq));
/*
 * NOTE(review): __journal_entry_close() appears truncated here -- the rest
 * of its body is missing (locals `old`, `new` and `sectors` are declared
 * above but never used in what remains). The lines below reference `ret`,
 * which is not declared in this scope, and look like a spliced-in fragment
 * of a different function (an entry-close / want-write helper). Verify
 * against the original source before relying on this region.
 */
/* Don't close it yet if we already have a write in flight: */ if (ret)
__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); elseif (nr_unwritten_journal_entries(j)) { struct journal_buf *buf = journal_cur_buf(j);
spin_lock(&j->lock);
ret = journal_entry_want_write(j);
spin_unlock(&j->lock);
return ret;
}
/* * should _only_ called from journal_res_get() - when we actually want a * journal reservation - journal entry is open means journal is dirty:
*/ staticint journal_entry_open(struct journal *j)
{ struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_buf *buf = j->buf +
((journal_cur_seq(j) + 1) & JOURNAL_BUF_MASK); union journal_res_state old, new; int u64s;
/*
 * NOTE(review): `u64s` is used below but never computed -- the lines that
 * derive it (and that use `buf`/`old`/`new`) appear to be missing. This
 * function body looks truncated.
 */
if (u64s <= (ssize_t) j->early_journal_entries.nr) return bch_err_throw(c, journal_full);
if (fifo_empty(&j->pin) && j->reclaim_thread)
wake_up_process(j->reclaim_thread);
/* * The fifo_push() needs to happen at the same time as j->seq is * incremented for journal_last_seq() to be calculated correctly
*/
atomic64_inc(&j->seq);
journal_pin_list_init(fifo_push_ref(&j->pin), 1);
if (unlikely(bch2_journal_seq_is_blacklisted(c, journal_cur_seq(j), false))) {
bch_err(c, "attempting to open blacklisted journal seq %llu",
journal_cur_seq(j)); if (bch2_fs_emergency_read_only_locked(c))
bch_err(c, "fatal error - emergency read only"); return bch_err_throw(c, journal_shutdown);
}
/*
 * NOTE(review): journal_entry_open() is cut off above. The lines from here
 * on reference `ret`, `flags`, `can_discard` and `res`, none of which are
 * declared in this scope, plus a `retry` label that is not visible here --
 * they appear to be a spliced-in fragment of the journal reservation
 * slowpath. Verify against the original source.
 */
ret = bch2_journal_error(j); if (unlikely(ret)) return ret;
if (j->blocked) return bch_err_throw(c, journal_blocked);
if ((flags & BCH_WATERMARK_MASK) < j->watermark) {
ret = bch_err_throw(c, journal_full);
can_discard = j->can_discard; goto out;
}
if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) && !journal_entry_is_open(j)) {
ret = bch_err_throw(c, journal_max_in_flight); goto out;
}
spin_lock(&j->lock);
journal_buf_prealloc(j);
/* * Recheck after taking the lock, so we don't race with another thread * that just did journal_entry_open() and call bch2_journal_entry_close() * unnecessarily
*/ if (journal_res_get_fast(j, res, flags)) {
ret = 0; goto unlock;
}
/* * If we couldn't get a reservation because the current buf filled up, * and we had room for a bigger entry on disk, signal that we want to * realloc the journal bufs:
*/
buf = journal_cur_buf(j); if (journal_entry_is_open(j) &&
buf->buf_size >> 9 < buf->disk_sectors &&
buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);
__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, false);
ret = journal_entry_open(j) ?: -BCH_ERR_journal_retry_open;
unlock:
can_discard = j->can_discard;
spin_unlock(&j->lock);
out: if (likely(!ret)) return 0; if (ret == -BCH_ERR_journal_retry_open) goto retry;
if (journal_error_check_stuck(j, ret, flags))
ret = bch_err_throw(c, journal_stuck);
/* * Journal is full - can't rely on reclaim from work item due to * freezing:
*/ if ((ret == -BCH_ERR_journal_full ||
ret == -BCH_ERR_journal_pin_full) &&
!(flags & JOURNAL_RES_GET_NONBLOCK)) { if (can_discard) {
bch2_journal_do_discards(j); goto retry;
}
if (mutex_trylock(&j->reclaim_lock)) {
bch2_journal_reclaim(j);
mutex_unlock(&j->reclaim_lock);
}
}
/* * Essentially the entry function to the journaling code. When bcachefs is doing * a btree insert, it calls this function to get the current journal write. * Journal write is the structure used set up journal writes. The calling * function will then add its keys to the structure, queuing them for the next * write. * * To ensure forward progress, the current task must not be holding any * btree node write locks.
*/ int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res, unsigned flags, struct btree_trans *trans)
{ int ret;
/*
 * NOTE(review): the body of bch2_journal_res_get_slowpath() beyond this
 * declaration is missing from this chunk.
 */
void bch2_journal_entry_res_resize(struct journal *j, struct journal_entry_res *res, unsigned new_u64s)
{ union journal_res_state state; int d = new_u64s - res->u64s;
spin_lock(&j->lock);
j->entry_u64s_reserved += d; if (d <= 0) goto out;
j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
state = READ_ONCE(j->reservations);
if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
state.cur_entry_offset > j->cur_entry_u64s) {
j->cur_entry_u64s += d; /* * Not enough room in current journal entry, have to flush it:
*/
__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
} else {
journal_cur_buf(j)->u64s_reserved += d;
}
out:
spin_unlock(&j->lock);
res->u64s += d;
}
/* journal flushing: */
/** * bch2_journal_flush_seq_async - wait for a journal entry to be written * @j: journal object * @seq: seq to flush * @parent: closure object to wait with * Returns: 1 if @seq has already been flushed, 0 if @seq is being flushed, * -BCH_ERR_journal_flush_err if @seq will never be flushed * * Like bch2_journal_wait_on_seq, except that it triggers a write immediately if * necessary
*/ int bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
{ struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_buf *buf; int ret = 0;
/* Fast path: already flushed to disk, no locking needed. */
if (seq <= j->flushed_seq_ondisk) return 1;
spin_lock(&j->lock);
if (WARN_ONCE(seq > journal_cur_seq(j), "requested to flush journal seq %llu, but currently at %llu",
seq, journal_cur_seq(j))) goto out;
/* Recheck under lock: */ if (j->err_seq && seq >= j->err_seq) {
ret = bch_err_throw(c, journal_flush_err); goto out;
}
if (seq <= j->flushed_seq_ondisk) {
ret = 1; goto out;
}
/* if seq was written, but not flushed - flush a newer one instead */
seq = max(seq, journal_last_unwritten_seq(j));
recheck_need_open: if (seq > journal_cur_seq(j)) { struct journal_res res = { 0 };
if (journal_entry_is_open(j))
__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
spin_unlock(&j->lock);
/* * We're called from bch2_journal_flush_seq() -> wait_event(); * but this might block. We won't usually block, so we won't * livelock:
*/
sched_annotate_sleep();
ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0, NULL); if (ret) return ret;
/*
 * NOTE(review): `buf` has not been assigned on this path -- upstream
 * derives it here (seq = res.seq; buf = journal_seq_to_buf(j, seq);
 * buf->must_flush = true; ...). As written this dereferences an
 * uninitialized pointer; lines appear to be missing from this chunk.
 * TODO: restore from the original source.
 */
if (parent && !closure_wait(&buf->wait, parent))
BUG();
bch2_journal_res_put(j, &res);
spin_lock(&j->lock); goto want_write;
}
/* * if write was kicked off without a flush, or if we promised it * wouldn't be a flush, flush the next sequence number instead
*/
buf = journal_seq_to_buf(j, seq); if (buf->noflush) {
seq++; goto recheck_need_open;
}
if (parent && !closure_wait(&buf->wait, parent))
BUG();
want_write: if (seq == journal_cur_seq(j))
journal_entry_want_write(j);
out:
spin_unlock(&j->lock); return ret;
}
/*
 * Flush journal sequence number @seq and wait for it to complete.
 * @task_state is the task state used while waiting (e.g.
 * TASK_UNINTERRUPTIBLE). Returns 0 on success or a negative error.
 */
int bch2_journal_flush_seq(struct journal *j, u64 seq, unsigned task_state)
{
	u64 start_time = local_clock();
	int wait_err, flush_ret;

	/* Already flushed? Return without touching the time stats: */
	if (seq <= j->flushed_seq_ondisk)
		return 0;

	wait_err = wait_event_state(j->wait,
			(flush_ret = bch2_journal_flush_seq_async(j, seq, NULL)),
			task_state);
	if (!wait_err)
		bch2_time_stats_update(j->flush_seq_time, start_time);

	if (wait_err)
		return wait_err;
	return flush_ret < 0 ? flush_ret : 0;
}
/* * bch2_journal_flush_async - if there is an open journal entry, or a journal * still being written, write it and wait for the write to complete
*/ void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
bch2_journal_flush_seq_async(j, atomic64_read(&j->seq), parent);
}
int bch2_journal_flush(struct journal *j)
{ return bch2_journal_flush_seq(j, atomic64_read(&j->seq), TASK_UNINTERRUPTIBLE);
}
/* * bch2_journal_noflush_seq - ask the journal not to issue any flushes in the * range [start, end) * @seq
*/ bool bch2_journal_noflush_seq(struct journal *j, u64 start, u64 end)
{ struct bch_fs *c = container_of(j, struct bch_fs, journal);
u64 unwritten_seq; bool ret = false;
if (!(c->sb.features & (1ULL << BCH_FEATURE_journal_no_flush))) returnfalse;
if (c->journal.flushed_seq_ondisk >= start) returnfalse;
spin_lock(&j->lock); if (c->journal.flushed_seq_ondisk >= start) goto out;
/*
 * NOTE(review): bch2_journal_noflush_seq() is truncated here -- local
 * `unwritten_seq` is never used and the `out` label the gotos target is
 * missing. The lines below reference new_fs, nr_got, bu[], ob[], trans,
 * disk_res and cl, none of which are in scope; they appear to be
 * spliced-in fragments of the journal-bucket allocation error paths.
 * Verify against the original source.
 */
if (c)
spin_unlock(&c->journal.lock);
err_unblock: if (c) {
bch2_journal_unblock(&c->journal);
mutex_unlock(&c->sb_lock);
}
if (ret && !new_fs) for (i = 0; i < nr_got; i++)
bch2_trans_run(c,
bch2_trans_mark_metadata_bucket(trans, ca,
bu[i], BCH_DATA_free, 0,
BTREE_TRIGGER_transactional));
err_free: for (i = 0; i < nr_got; i++)
bch2_open_bucket_put(c, ob[i]);
/* * note: journal buckets aren't really counted as _sectors_ used yet, so * we don't need the disk reservation to avoid the BUG_ON() in buckets.c * when space used goes up without a reservation - but we do need the * reservation to ensure we'll actually be able to allocate: * * XXX: that's not right, disk reservations only ensure a * filesystem-wide allocation will succeed, this is a device * specific allocation - we can hang here:
*/ if (!new_fs) {
ret = bch2_disk_reservation_get(c, &disk_res,
bucket_to_sector(ca, nr - ja->nr), 1, 0); if (ret) break;
}
ret = bch2_set_nr_journal_buckets_iter(ca, nr, new_fs, &cl);
if (ret == -BCH_ERR_bucket_alloc_blocked ||
ret == -BCH_ERR_open_buckets_empty)
ret = 0; /* wait and retry */
/* * Allocate more journal space at runtime - not currently making use if it, but * the code works:
*/ int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca, unsigned nr)
{
down_write(&c->state_lock); int ret = bch2_set_nr_journal_buckets_loop(c, ca, nr, false);
up_write(&c->state_lock);
/*
 * NOTE(review): bch2_set_nr_journal_buckets() is truncated (no return of
 * `ret`, no closing brace). The lines below, which compute `nr` from
 * cur_seq/last_seq and size the journal pin FIFO, appear to belong to
 * journal initialization/startup code, not to this function. Verify
 * against the original source.
 */
/* Clean filesystem? */ if (!last_seq)
last_seq = cur_seq;
u64 nr = cur_seq - last_seq;
/* * Extra fudge factor, in case we crashed when the journal pin fifo was * nearly or completely full. We'll need to be able to open additional * journal entries (at least a few) in order for journal replay to get * going:
*/
nr += nr / 4;
nr = max(nr, JOURNAL_PIN);
init_fifo(&j->pin, roundup_pow_of_two(nr), GFP_KERNEL); if (!j->pin.data) {
bch_err(c, "error reallocating journal fifo (%llu open entries)", nr); return bch_err_throw(c, ENOMEM_journal_pin_fifo);
}
/*
 * NOTE(review): the following German website disclaimer is an extraction
 * artifact appended to this chunk, not part of the source file. Preserved
 * verbatim, commented out so the file remains C:
 *
 * "Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder
 * Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten
 * Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung und die Messung sind noch experimentell."
 */