// This function resets the cur pointer to the first frame theoretically // executable after a task completed (ie. each time we update some progress or // insert some tasks in the queue). // When frame_idx is set, it can be either from a completed task, or from tasks // inserted in the queue, in which case we have to make sure the cur pointer // isn't past this insert. // The special case where frame_idx is UINT_MAX is to handle the reset after // completing a task and locklessly signaling progress. In this case we don't // enter a critical section, which is needed for this function, so we set an // atomic for a delayed handling, happening here. Meaning we can call this // function without any actual update other than what's in the atomic, hence // this special case. staticinlineint reset_task_cur(const Dav1dContext *const c, struct TaskThreadData *const ttd, unsigned frame_idx)
{ constunsigned first = atomic_load(&ttd->first); unsigned reset_frame_idx = atomic_exchange(&ttd->reset_task_cur, UINT_MAX); if (reset_frame_idx < first) { if (frame_idx == UINT_MAX) return 0;
reset_frame_idx = UINT_MAX;
} if (!ttd->cur && c->fc[first].task_thread.task_cur_prev == NULL) return 0; if (reset_frame_idx != UINT_MAX) { if (frame_idx == UINT_MAX) { if (reset_frame_idx > first + ttd->cur) return 0;
ttd->cur = reset_frame_idx - first; goto cur_found;
}
} elseif (frame_idx == UINT_MAX) return 0; if (frame_idx < first) frame_idx += c->n_fc; constunsigned min_frame_idx = umin(reset_frame_idx, frame_idx); constunsigned cur_frame_idx = first + ttd->cur; if (ttd->cur < c->n_fc && cur_frame_idx < min_frame_idx) return 0; for (ttd->cur = min_frame_idx - first; ttd->cur < c->n_fc; ttd->cur++) if (c->fc[(first + ttd->cur) % c->n_fc].task_thread.task_head) break;
cur_found: for (unsigned i = ttd->cur; i < c->n_fc; i++)
c->fc[(first + i) % c->n_fc].task_thread.task_cur_prev = NULL; return 1;
}
// Lockless counterpart of reset_task_cur: records the (unwrapped) frame index
// in ttd->reset_task_cur so that a worker holding the lock performs the actual
// reset later. Keeps only the minimum of concurrent requests.
static inline void reset_task_cur_async(struct TaskThreadData *const ttd,
                                        unsigned frame_idx, unsigned n_frames)
{
    const unsigned first = atomic_load(&ttd->first);
    if (frame_idx < first) frame_idx += n_frames; // unwrap ring index
    unsigned last_idx = frame_idx;
    // CAS-like loop: keep exchanging until our index is not smaller than
    // what another thread stored (the smallest request must win).
    do {
        frame_idx = last_idx;
        last_idx = atomic_exchange(&ttd->reset_task_cur, frame_idx);
    } while (last_idx < frame_idx);
    // If `first` moved concurrently, our stored index may now be stale
    // (refers to a retired frame); clear it back to UINT_MAX.
    if (frame_idx == first && atomic_load(&ttd->first) != first) {
        unsigned expected = frame_idx;
        atomic_compare_exchange_strong(&ttd->reset_task_cur, &expected, UINT_MAX);
    }
}
// Splice the chain [first..last] into f's task list between nodes a and b
// (a may be NULL for head insertion, b may be NULL for tail insertion;
// a->next == b must hold). Updates the scheduler cursor and optionally wakes
// a waiting worker. No-op while the decoder is flushing.
static void insert_tasks_between(Dav1dFrameContext *const f,
                                 Dav1dTask *const first, Dav1dTask *const last,
                                 Dav1dTask *const a, Dav1dTask *const b,
                                 const int cond_signal)
{
    struct TaskThreadData *const ttd = f->task_thread.ttd;
    if (atomic_load(f->c->flush)) return;
    assert(!a || a->next == b);
    if (!a) f->task_thread.task_head = first;
    else a->next = first;
    if (!b) f->task_thread.task_tail = last;
    last->next = b;
    // The cursor may now be past the newly inserted tasks; pull it back.
    reset_task_cur(f->c, ttd, first->frame_idx);
    // Only signal if no signal is already outstanding (avoids thundering herd).
    if (cond_signal && !atomic_fetch_or(&ttd->cond_signaled, 1))
        pthread_cond_signal(&ttd->cond);
}
// Insert the chain [first..last] into f's task queue at its sorted position.
// Ordering: entropy tasks precede all others; within a class, sort by sbrow,
// then by task type, then by tile index.
static void insert_tasks(Dav1dFrameContext *const f,
                         Dav1dTask *const first, Dav1dTask *const last,
                         const int cond_signal)
{
    // insert task back into task queue
    Dav1dTask *t_ptr, *prev_t = NULL;
    for (t_ptr = f->task_thread.task_head;
         t_ptr;
         prev_t = t_ptr, t_ptr = t_ptr->next)
    {
        // entropy coding precedes other steps
        if (t_ptr->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
            if (first->type > DAV1D_TASK_TYPE_TILE_ENTROPY) continue;
            // both are entropy
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
        } else {
            if (first->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
            if (first->type > t_ptr->type) continue;
            if (first->type < t_ptr->type) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same task type
        }
        // sort by tile-id
        assert(first->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION ||
               first->type == DAV1D_TASK_TYPE_TILE_ENTROPY);
        assert(first->type == t_ptr->type);
        assert(t_ptr->sby == first->sby);
        const int p = first->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
        const int t_tile_idx = (int) (first - f->task_thread.tile_tasks[p]);
        const int p_tile_idx = (int) (t_ptr - f->task_thread.tile_tasks[p]);
        assert(t_tile_idx != p_tile_idx);
        if (t_tile_idx > p_tile_idx) continue;
        insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
        return;
    }
    // append at the end
    insert_tasks_between(f, first, last, prev_t, NULL, cond_signal);
}
// Merge the pending task queues of all frame contexts into their main queues.
// Returns the OR of the per-frame merge results (nonzero if any frame had
// pending tasks merged — exact semantics defined by merge_pending_frame).
static inline int merge_pending(const Dav1dContext *const c) {
    int res = 0;
    for (unsigned i = 0; i < c->n_fc; i++)
        res |= merge_pending_frame(&c->fc[i]);
    return res;
}
// XXX in theory this could be done locklessly; at this point there are no
// tasks in the frameQ, so no other runner should be using this lock, but
// we must add both passes at once.
// NOTE(review): this span is the tail of a function whose beginning lies
// outside this chunk — it references `pass`, `tasks` and `prev_t`, which are
// declared earlier in that function.
pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
// Only pass 2 is allowed to append to an already non-empty pending queue.
assert(f->task_thread.pending_tasks.head == NULL || pass == 2); if (!f->task_thread.pending_tasks.head)
f->task_thread.pending_tasks.head = &tasks[0]; else
f->task_thread.pending_tasks.tail->next = &tasks[0];
f->task_thread.pending_tasks.tail = prev_t;
// Publish: pending tasks exist and frame init is complete.
atomic_store(&f->task_thread.pending_tasks.merge, 1);
atomic_store(&f->task_thread.init_done, 1);
pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
// NOTE(review): fragment of the worker-thread main loop. The function header
// (and the declarations of `tc`, `ttd`, `c`) lies before this chunk, and the
// switch statement continues past its visible end — only formatting and fused
// keywords are repaired here; the logic is unchanged.
    pthread_mutex_lock(&ttd->lock);
    for (;;) {
        if (tc->task_thread.die) break;
        if (atomic_load(c->flush)) goto park;

        merge_pending(c);
        if (ttd->delayed_fg.exec) { // run delayed film grain first
            delayed_fg_task(c, ttd);
            continue;
        }
        Dav1dFrameContext *f;
        Dav1dTask *t, *prev_t = NULL;
        if (c->n_fc > 1) { // run init tasks second
            for (unsigned i = 0; i < c->n_fc; i++) {
                const unsigned first = atomic_load(&ttd->first);
                f = &c->fc[(first + i) % c->n_fc];
                if (atomic_load(&f->task_thread.init_done)) continue;
                t = f->task_thread.task_head;
                if (!t) continue;
                if (t->type == DAV1D_TASK_TYPE_INIT) goto found;
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) {
                    // XXX This can be a simple else, if adding tasks of both
                    // passes at once (in dav1d_task_create_tile_sbrow).
                    // Adding the tasks to the pending Q can result in a
                    // thread merging them before setting init_done.
                    // We will need to set init_done before adding to the
                    // pending Q, so maybe return the tasks, set init_done,
                    // and add to pending Q only then.
                    const int p1 = f->in_cdf.progress ?
                        atomic_load(f->in_cdf.progress) : 1;
                    if (p1) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            }
        }
        while (ttd->cur < c->n_fc) { // run decoding tasks last
            const unsigned first = atomic_load(&ttd->first);
            f = &c->fc[(first + ttd->cur) % c->n_fc];
            merge_pending_frame(f);
            prev_t = f->task_thread.task_cur_prev;
            t = prev_t ? prev_t->next : f->task_thread.task_head;
            while (t) {
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) goto next;
                else if (t->type == DAV1D_TASK_TYPE_TILE_ENTROPY ||
                         t->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION)
                {
                    // if not bottom sbrow of tile, this task will be re-added
                    // after it's finished
                    if (!check_tile(t, f, c->n_fc > 1)) goto found;
                } else if (t->recon_progress) {
                    const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
                    int error = atomic_load(&f->task_thread.error);
                    assert(!atomic_load(&f->task_thread.done[p]) || error);
                    const int tile_row_base = f->frame_hdr->tiling.cols *
                        f->frame_thread.next_tile_row[p];
                    if (p) {
                        atomic_int *const prog = &f->frame_thread.entropy_progress;
                        const int p1 = atomic_load(prog);
                        if (p1 < t->sby) goto next;
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                    }
                    // All tiles of this row must have reached recon_progress.
                    for (int tc = 0; tc < f->frame_hdr->tiling.cols; tc++) {
                        Dav1dTileState *const ts = &f->ts[tile_row_base + tc];
                        const int p2 = atomic_load(&ts->progress[p]);
                        if (p2 < t->recon_progress) goto next;
                        atomic_fetch_or(&f->task_thread.error, p2 == TILE_ERROR);
                    }
                    if (t->sby + 1 < f->sbh) {
                        // add sby+1 to list to replace this one
                        Dav1dTask *next_t = &t[1];
                        *next_t = *t;
                        next_t->sby++;
                        const int ntr = f->frame_thread.next_tile_row[p] + 1;
                        const int start = f->frame_hdr->tiling.row_start_sb[ntr];
                        if (next_t->sby == start)
                            f->frame_thread.next_tile_row[p] = ntr;
                        next_t->recon_progress = next_t->sby + 1;
                        insert_task(f, next_t, 0);
                    }
                    goto found;
                } else if (t->type == DAV1D_TASK_TYPE_CDEF) {
                    atomic_uint *prog = f->frame_thread.copy_lpf_progress;
                    const int p1 = atomic_load(&prog[(t->sby - 1) >> 5]);
                    if (p1 & (1U << ((t->sby - 1) & 31)))
                        goto found;
                } else {
                    assert(t->deblock_progress);
                    const int p1 = atomic_load(&f->frame_thread.deblock_progress);
                    if (p1 >= t->deblock_progress) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            next:
                prev_t = t;
                t = t->next;
                f->task_thread.task_cur_prev = prev_t;
            }
            ttd->cur++;
        }
        if (reset_task_cur(c, ttd, UINT_MAX)) continue;
        if (merge_pending(c)) continue;
    park:
        tc->task_thread.flushed = 1;
        pthread_cond_signal(&tc->task_thread.td.cond);
        // we want to be woken up next time progress is signaled
        atomic_store(&ttd->cond_signaled, 0);
        pthread_cond_wait(&ttd->cond, &ttd->lock);
        tc->task_thread.flushed = 0;
        reset_task_cur(c, ttd, UINT_MAX);
        continue;

    found:
        // remove t from list
        if (prev_t) prev_t->next = t->next;
        else f->task_thread.task_head = t->next;
        if (!t->next) f->task_thread.task_tail = prev_t;
        if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head)
            ttd->cur++;
        t->next = NULL;
        // we don't need to check cond_signaled here, since we found a task
        // after the last signal so we want to re-signal the next waiting thread
        // and again won't need to signal after that
        atomic_store(&ttd->cond_signaled, 1);
        pthread_cond_signal(&ttd->cond);
        pthread_mutex_unlock(&ttd->lock);
    found_unlocked:;
        const int flush = atomic_load(c->flush);
        int error = atomic_fetch_or(&f->task_thread.error, flush) | flush;

        // run it
        tc->f = f;
        int sby = t->sby;
        switch (t->type) {
        case DAV1D_TASK_TYPE_INIT: {
            assert(c->n_fc > 1);
            int res = dav1d_decode_frame_init(f);
            int p1 = f->in_cdf.progress ? atomic_load(f->in_cdf.progress) : 1;
            if (res || p1 == TILE_ERROR) {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res ? res : DAV1D_ERR(EINVAL));
                reset_task_cur(c, ttd, t->frame_idx);
            } else {
                t->type = DAV1D_TASK_TYPE_INIT_CDF;
                if (p1) goto found_unlocked;
                add_pending(f, t);
                pthread_mutex_lock(&ttd->lock);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_INIT_CDF: {
            assert(c->n_fc > 1);
            int res = DAV1D_ERR(EINVAL);
            if (!atomic_load(&f->task_thread.error))
                res = dav1d_decode_frame_init_cdf(f);
            if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
                atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
            }
            if (!res) {
                assert(c->n_fc > 1);
                for (int p = 1; p <= 2; p++) {
                    const int res = dav1d_task_create_tile_sbrow(f, p, 0);
                    if (res) {
                        pthread_mutex_lock(&ttd->lock);
                        // memory allocation failed
                        atomic_store(&f->task_thread.done[2 - p], 1);
                        atomic_store(&f->task_thread.error, -1);
                        atomic_fetch_sub(&f->task_thread.task_counter,
                                         f->frame_hdr->tiling.cols *
                                         f->frame_hdr->tiling.rows + f->sbh);
                        atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
                        if (p == 2 && atomic_load(&f->task_thread.done[1])) {
                            assert(!atomic_load(&f->task_thread.task_counter));
                            dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
                            f->n_tile_data = 0;
                            pthread_cond_signal(&f->task_thread.cond);
                        } else {
                            pthread_mutex_unlock(&ttd->lock);
                        }
                    }
                }
                pthread_mutex_lock(&ttd->lock);
            } else {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res);
                reset_task_cur(c, ttd, t->frame_idx);
                atomic_store(&f->task_thread.init_done, 1);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_TILE_ENTROPY:
        case DAV1D_TASK_TYPE_TILE_RECONSTRUCTION: {
            const int p = t->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
            const int tile_idx = (int)(t - f->task_thread.tile_tasks[p]);
            Dav1dTileState *const ts = &f->ts[tile_idx];
/* NOTE(review): the remainder of this function (the tile entropy/
 * reconstruction case onward) was replaced during extraction by unrelated
 * German website-disclaimer text ("information compiled to the best of our
 * knowledge; no guarantee of completeness, correctness or quality; the
 * syntax highlighting and measurement are still experimental"). Restore the
 * missing code from the upstream source file. */