/* * This file is part of FFmpeg. * * FFmpeg is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * FFmpeg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with FFmpeg; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
enum { /// Set when the thread is awaiting a packet.
STATE_INPUT_READY, /// Set before the codec has called ff_thread_finish_setup().
STATE_SETTING_UP, /// Set after the codec has called ff_thread_finish_setup().
STATE_SETUP_FINISHED,
};
enum {
UNINITIALIZED, ///< Thread has not been created, AVCodec->close mustn't be called
NEEDS_CLOSE, ///< FFCodec->close needs to be called
INITIALIZED, ///< Thread has been properly set up
};
/** * Context used by codec threads and stored in their AVCodecInternal thread_ctx.
*/ typedefstruct PerThreadContext { struct FrameThreadContext *parent;
pthread_t thread; int thread_init; unsigned pthread_init_cnt;///< Number of successfully initialized mutexes/conditions
pthread_cond_t input_cond; ///< Used to wait for a new packet from the main thread.
pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change.
pthread_cond_t output_cond; ///< Used by the main thread to wait for frames to finish.
pthread_mutex_t mutex; ///< Mutex used to protect the contents of the PerThreadContext.
pthread_mutex_t progress_mutex; ///< Mutex used to protect frame progress values and progress_cond.
AVCodecContext *avctx; ///< Context used to decode packets passed to this thread.
/** * Decoded frames from a single decode iteration.
*/
DecodedFrames df; int result; ///< The result of the last codec decode/encode() call.
atomic_int state;
int die; ///< Set when the thread should exit.
int hwaccel_serializing; int async_serializing;
// set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used; // cannot check hwaccel caps directly, because // worked threads clear hwaccel state for thread-unsafe hwaccels // after each decode call int hwaccel_threadsafe;
atomic_int debug_threads; ///< Set if the FF_DEBUG_THREADS option is set.
/// The following two fields have the same semantics as the DecodeContext field int intra_only_flag; enum AVPictureType initial_pict_type;
} PerThreadContext;
/** * Context stored in the client AVCodecInternal thread_ctx.
*/ typedefstruct FrameThreadContext {
PerThreadContext *threads; ///< The contexts for each thread.
PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
unsigned pthread_init_cnt; ///< Number of successfully initialized mutexes/conditions
pthread_mutex_t buffer_mutex; ///< Mutex used to protect get/release_buffer(). /** * This lock is used for ensuring threads run in serial when thread-unsafe * hwaccel is used.
*/
pthread_mutex_t hwaccel_mutex;
pthread_mutex_t async_mutex;
pthread_cond_t async_cond; int async_lock;
DecodedFrames df; int result;
/** * Packet to be submitted to the next thread for decoding.
*/
AVPacket *next_pkt;
int next_decoding; ///< The next context to submit a packet to. int next_finished; ///< The next context to return output from.
/* hwaccel state for thread-unsafe hwaccels is temporarily stored here in * order to transfer its ownership to the next decoding thread without the
* need for extra synchronization */ const AVHWAccel *stash_hwaccel; void *stash_hwaccel_context; void *stash_hwaccel_priv;
} FrameThreadContext;
staticvoid decoded_frames_flush(DecodedFrames *df)
{ for (size_t i = 0; i < df->nb_f; i++)
av_frame_unref(df->f[i]);
df->nb_f = 0;
}
staticvoid decoded_frames_free(DecodedFrames *df)
{ for (size_t i = 0; i < df->nb_f_allocated; i++)
av_frame_free(&df->f[i]);
av_freep(&df->f);
df->nb_f = 0;
df->nb_f_allocated = 0;
}
/** * Codec worker thread. * * Automatically calls ff_thread_finish_setup() if the codec does * not provide an update_thread_context method, or if the codec returns * before calling it.
*/ static attribute_align_arg void *frame_worker_thread(void *arg)
{
PerThreadContext *p = arg;
AVCodecContext *avctx = p->avctx; const FFCodec *codec = ffcodec(avctx->codec);
thread_set_name(p);
pthread_mutex_lock(&p->mutex); while (1) { int ret;
while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
pthread_cond_wait(&p->input_cond, &p->mutex);
if (p->die) break;
if (!codec->update_thread_context)
ff_thread_finish_setup(avctx);
/* If a decoder supports hwaccel, then it must call ff_get_format(). * Since that call must happen before ff_thread_finish_setup(), the * decoder is required to implement update_thread_context() and call * ff_thread_finish_setup() manually. Therefore the above * ff_thread_finish_setup() call did not happen and hwaccel_serializing
* cannot be true here. */
av_assert0(!p->hwaccel_serializing);
/* if the previous thread uses thread-unsafe hwaccel then we take the
* lock to ensure the threads don't run concurrently */ if (hwaccel_serial(avctx)) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
p->hwaccel_serializing = 1;
}
ret = 0; while (ret >= 0) {
AVFrame *frame;
/* get the frame which will store the output */
frame = decoded_frames_get_free(&p->df); if (!frame) {
p->result = AVERROR(ENOMEM); goto alloc_fail;
}
/* do the actual decoding */
ret = ff_decode_receive_frame_internal(avctx, frame); if (ret == 0)
p->df.nb_f++; elseif (ret < 0 && frame->buf[0])
av_frame_unref(frame);
p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
}
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
alloc_fail: if (p->hwaccel_serializing) { /* wipe hwaccel state for thread-unsafe hwaccels to avoid stale * pointers lying around; * the state was transferred to FrameThreadContext in
* ff_thread_finish_setup(), so nothing is leaked */
avctx->hwaccel = NULL;
avctx->hwaccel_context = NULL;
avctx->internal->hwaccel_priv_data = NULL;
/** * Update the next thread's AVCodecContext with values from the reference thread's context. * * @param dst The destination context. * @param src The source context. * @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread * @return 0 on success, negative error code on failure
*/ staticint update_context_from_thread(AVCodecContext *dst, const AVCodecContext *src, int for_user)
{ const FFCodec *const codec = ffcodec(dst->codec); int err = 0;
if (codec->update_thread_context) {
err = codec->update_thread_context(dst, src); if (err < 0) return err;
}
// reset dst hwaccel state if needed
av_assert0(p_dst->hwaccel_threadsafe ||
(!dst->hwaccel && !dst->internal->hwaccel_priv_data)); if (p_dst->hwaccel_threadsafe &&
(!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
ff_hwaccel_uninit(dst);
p_dst->hwaccel_threadsafe = 0;
}
// propagate hwaccel state for threadsafe hwaccels if (p_src->hwaccel_threadsafe) { const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel); if (!dst->hwaccel) { if (hwaccel->priv_data_size) {
av_assert0(hwaccel->update_thread_context);
/** * Update the next thread's AVCodecContext with values set by the user. * * @param dst The destination context. * @param src The source context. * @return 0 on success, negative error code on failure
*/ staticint update_context_from_user(AVCodecContext *dst, const AVCodecContext *src)
{ int err;
if (AVPACKET_IS_EMPTY(p->avpkt))
p->avctx->internal->draining = 1;
ret = update_context_from_user(p->avctx, user_avctx); if (ret) {
pthread_mutex_unlock(&p->mutex); return ret;
}
atomic_store_explicit(&p->debug_threads,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
if (prev_thread) { if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex); while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
pthread_mutex_unlock(&prev_thread->progress_mutex);
}
/* codecs without delay might not be prepared to be called repeatedly here during * flushing (vp3/theora), and also don't need to be, since from this point on, they
* will always return EOF anyway */ if (!p->avctx->internal->draining ||
(codec->capabilities & AV_CODEC_CAP_DELAY)) {
ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0); if (ret) {
pthread_mutex_unlock(&p->mutex); return ret;
}
}
}
/* transfer the stashed hwaccel state, if any */
av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe); if (!p->hwaccel_threadsafe) {
FFSWAP(const AVHWAccel*, p->avctx->hwaccel, fctx->stash_hwaccel);
FFSWAP(void*, p->avctx->hwaccel_context, fctx->stash_hwaccel_context);
FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
}
int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx; int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
/* submit packets to threads while there are no buffered results to return */ while (!fctx->df.nb_f && !fctx->result) {
PerThreadContext *p;
/* get a packet to be submitted to the next thread */
av_packet_unref(fctx->next_pkt);
ret = ff_decode_get_packet(avctx, fctx->next_pkt); if (ret < 0 && ret != AVERROR_EOF) goto finish;
ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
fctx->next_pkt); if (ret < 0) goto finish;
/* do not return any frames until all threads have something to do */ if (fctx->next_decoding != fctx->next_finished &&
!avctx->internal->draining) continue;
p = &fctx->threads[fctx->next_finished];
fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex); while (atomic_load_explicit(&p->state, memory_order_relaxed) != STATE_INPUT_READY)
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
/* a thread may return multiple frames AND an error
* we first return all the frames, then the error */ if (fctx->df.nb_f) {
decoded_frames_pop(&fctx->df, frame);
ret = 0;
} else {
ret = fctx->result;
fctx->result = 0;
}
finish:
async_lock(fctx); return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
{
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
if (!progress ||
atomic_load_explicit(&progress[field], memory_order_relaxed) >= n) return;
p = f->owner[field]->internal->thread_ctx;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG, "%p finished %d field %d\n", progress, n, field);
pthread_mutex_lock(&p->progress_mutex);
atomic_store_explicit(&progress[field], n, memory_order_release);
void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
{
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
if (!progress ||
atomic_load_explicit(&progress[field], memory_order_acquire) >= n) return;
p = f->owner[field]->internal->thread_ctx;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG, "thread awaiting %d field %d from %p\n", n, field, progress);
pthread_mutex_lock(&p->progress_mutex); while (atomic_load_explicit(&progress[field], memory_order_relaxed) < n)
pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
p->hwaccel_serializing = 1;
}
/* this assumes that no hwaccel calls happen before ff_thread_finish_setup() */ if (avctx->hwaccel &&
!(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_ASYNC_SAFE)) {
p->async_serializing = 1;
async_lock(p->parent);
}
/* thread-unsafe hwaccels share a single private data instance, so we * save hwaccel state for passing to the next thread; * this is done here so that this worker thread can wipe its own hwaccel
* state after decoding, without requiring synchronization */
av_assert0(!p->parent->stash_hwaccel); if (hwaccel_serial(avctx)) {
p->parent->stash_hwaccel = avctx->hwaccel;
p->parent->stash_hwaccel_context = avctx->hwaccel_context;
p->parent->stash_hwaccel_priv = avctx->internal->hwaccel_priv_data;
}
/* if we have stashed hwaccel state, move it to the user-facing context,
* so it will be freed in ff_codec_close() */
av_assert0(!avctx->hwaccel);
FFSWAP(const AVHWAccel*, avctx->hwaccel, fctx->stash_hwaccel);
FFSWAP(void*, avctx->hwaccel_context, fctx->stash_hwaccel_context);
FFSWAP(void*, avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
av_freep(&avctx->internal->thread_ctx);
}
static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
FrameThreadContext *fctx, AVCodecContext *avctx, const FFCodec *codec, int first)
{
AVCodecContext *copy; int err;
p->initial_pict_type = AV_PICTURE_TYPE_NONE; if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
p->intra_only_flag = AV_FRAME_FLAG_KEY; if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
p->initial_pict_type = AV_PICTURE_TYPE_I;
}
int ff_frame_thread_init(AVCodecContext *avctx)
{ int thread_count = avctx->thread_count; const FFCodec *codec = ffcodec(avctx->codec);
FrameThreadContext *fctx; int err, i = 0;
if (!thread_count) { int nb_cpus = av_cpu_count(); // use number of cores + 1 as thread count if there is more than one if (nb_cpus > 1)
thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); else
thread_count = avctx->thread_count = 1;
}
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
decoded_frames_flush(&p->df);
p->result = 0;
avcodec_flush_buffers(p->avctx);
}
}
int ff_thread_can_start_frame(AVCodecContext *avctx)
{ if ((avctx->active_thread_type & FF_THREAD_FRAME) &&
ffcodec(avctx->codec)->update_thread_context) {
PerThreadContext *p = avctx->internal->thread_ctx;
if (atomic_load(&p->state) != STATE_SETTING_UP) return 0;
}
return 1;
}
staticint thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags)
{
PerThreadContext *p; int err;
if (!(avctx->active_thread_type & FF_THREAD_FRAME)) return ff_get_buffer(avctx, f, flags);
p = avctx->internal->thread_ctx; if (atomic_load(&p->state) != STATE_SETTING_UP &&
ffcodec(avctx->codec)->update_thread_context) {
av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); return -1;
}
int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags)
{ int ret = thread_get_buffer_internal(avctx, f, flags); if (ret < 0)
av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n"); return ret;
}
int ff_thread_get_ext_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags)
{ int ret;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.