ref: 4c9316f4faacea175a7e6f0bc607dc3fea21848f
parent: 15ea853ccd534ef06908198529429ae893f3ca06
parent: c1b024b48861dcd788a3419e556eb31bb727d5d1
author: Harish Mahendrakar <harish.mahendrakar@ittiam.com>
date: Wed Jan 30 13:21:42 EST 2019
Merge changes I49a760ea,I792df86e

* changes:
  Modify map read/write to sync logic in row_mt case
  Revert "Revert "Add Tile-SB-Row based Multi-threading in Decoder""
--- a/vp9/common/vp9_enums.h
+++ b/vp9/common/vp9_enums.h
@@ -41,6 +41,8 @@
MAX_PROFILES
} BITSTREAM_PROFILE;
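+// Bitmask selecting the parse and/or reconstruction stage of block decoding.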
+typedef enum PARSE_RECON_FLAG { PARSE = 1, RECON = 2 } PARSE_RECON_FLAG;
+
#define BLOCK_4X4 0
#define BLOCK_4X8 1
#define BLOCK_8X4 2
--- a/vp9/common/vp9_thread_common.c
+++ b/vp9/common/vp9_thread_common.c
@@ -475,6 +475,12 @@
#endif // CONFIG_MULTITHREAD
}
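+// Runs the loop filter over the rows [lf_data->start, lf_data->stop) assigned
+// to this worker, synchronizing through lf_sync.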
+void vp9_loopfilter_job(LFWorkerData *lf_data, VP9LfSync *lf_sync) {
+ thread_loop_filter_rows(lf_data->frame_buffer, lf_data->cm, lf_data->planes,
+ lf_data->start, lf_data->stop, lf_data->y_only,
+ lf_sync);
+}
+
// Accumulate frame counts.
void vp9_accumulate_frame_counts(FRAME_COUNTS *accum,
const FRAME_COUNTS *counts, int is_dec) {
--- a/vp9/common/vp9_thread_common.h
+++ b/vp9/common/vp9_thread_common.h
@@ -70,7 +70,7 @@
void vp9_set_row(VP9LfSync *lf_sync, int num_tiles, int row, int is_last_row,
int corrupted);
-void vp9_set_last_decoded_row(struct VP9Common *cm, int tile_col, int mi_row);
+void vp9_loopfilter_job(LFWorkerData *lf_data, VP9LfSync *lf_sync);
void vp9_accumulate_frame_counts(struct FRAME_COUNTS *accum,
const struct FRAME_COUNTS *counts, int is_dec);
--- a/vp9/decoder/vp9_decodeframe.c
+++ b/vp9/decoder/vp9_decodeframe.c
@@ -42,6 +42,7 @@
#include "vp9/decoder/vp9_decodemv.h"
#include "vp9/decoder/vp9_decoder.h"
#include "vp9/decoder/vp9_dsubexp.h"
+#include "vp9/decoder/vp9_job_queue.h"
#define MAX_VP9_HEADER_SIZE 80
@@ -1027,7 +1028,6 @@
static void parse_block(TileWorkerData *twd, VP9Decoder *const pbi, int mi_row,
int mi_col, BLOCK_SIZE bsize, int bwl, int bhl) {
VP9_COMMON *const cm = &pbi->common;
- const int less8x8 = bsize < BLOCK_8X8;
const int bw = 1 << (bwl - 1);
const int bh = 1 << (bhl - 1);
const int x_mis = VPXMIN(bw, cm->mi_cols - mi_col);
@@ -1059,7 +1059,7 @@
const int eobtotal =
predict_recon_inter(xd, mi, twd, parse_inter_block_row_mt);
- if (!less8x8 && eobtotal == 0) mi->skip = 1; // skip loopfilter
+ if (bsize >= BLOCK_8X8 && eobtotal == 0) mi->skip = 1; // skip loopfilter
}
}
@@ -1172,9 +1172,10 @@
dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
}
-static void recon_partition(TileWorkerData *twd, VP9Decoder *const pbi,
- int mi_row, int mi_col, BLOCK_SIZE bsize,
- int n4x4_l2) {
+static void process_partition(TileWorkerData *twd, VP9Decoder *const pbi,
+ int mi_row, int mi_col, BLOCK_SIZE bsize,
+ int n4x4_l2, int parse_recon_flag,
+ process_block_fn_t process_block) {
VP9_COMMON *const cm = &pbi->common;
const int n8x8_l2 = n4x4_l2 - 1;
const int num_8x8_wh = 1 << n8x8_l2;
@@ -1187,61 +1188,11 @@
if (mi_row >= cm->mi_rows || mi_col >= cm->mi_cols) return;
- partition = *xd->partition;
- xd->partition++;
-
- subsize = get_subsize(bsize, partition);
- if (!hbs) {
- // calculate bmode block dimensions (log 2)
- xd->bmode_blocks_wl = 1 >> !!(partition & PARTITION_VERT);
- xd->bmode_blocks_hl = 1 >> !!(partition & PARTITION_HORZ);
- recon_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
- } else {
- switch (partition) {
- case PARTITION_NONE:
- recon_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
- break;
- case PARTITION_HORZ:
- recon_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
- if (has_rows)
- recon_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
- n8x8_l2);
- break;
- case PARTITION_VERT:
- recon_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
- if (has_cols)
- recon_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
- n4x4_l2);
- break;
- case PARTITION_SPLIT:
- recon_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2);
- recon_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2);
- recon_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2);
- recon_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize, n8x8_l2);
- break;
- default: assert(0 && "Invalid partition type");
- }
+ if (parse_recon_flag & PARSE) {
+ *xd->partition =
+ read_partition(twd, mi_row, mi_col, has_rows, has_cols, n8x8_l2);
}
-}
-static void parse_partition(TileWorkerData *twd, VP9Decoder *const pbi,
- int mi_row, int mi_col, BLOCK_SIZE bsize,
- int n4x4_l2) {
- VP9_COMMON *const cm = &pbi->common;
- const int n8x8_l2 = n4x4_l2 - 1;
- const int num_8x8_wh = 1 << n8x8_l2;
- const int hbs = num_8x8_wh >> 1;
- PARTITION_TYPE partition;
- BLOCK_SIZE subsize;
- const int has_rows = (mi_row + hbs) < cm->mi_rows;
- const int has_cols = (mi_col + hbs) < cm->mi_cols;
- MACROBLOCKD *const xd = &twd->xd;
-
- if (mi_row >= cm->mi_rows || mi_col >= cm->mi_cols) return;
-
- *xd->partition =
- read_partition(twd, mi_row, mi_col, has_rows, has_cols, n8x8_l2);
-
partition = *xd->partition;
xd->partition++;
@@ -1250,38 +1201,44 @@
// calculate bmode block dimensions (log 2)
xd->bmode_blocks_wl = 1 >> !!(partition & PARTITION_VERT);
xd->bmode_blocks_hl = 1 >> !!(partition & PARTITION_HORZ);
- parse_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
+ process_block(twd, pbi, mi_row, mi_col, subsize, 1, 1);
} else {
switch (partition) {
case PARTITION_NONE:
- parse_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
+ process_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n4x4_l2);
break;
case PARTITION_HORZ:
- parse_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
+ process_block(twd, pbi, mi_row, mi_col, subsize, n4x4_l2, n8x8_l2);
if (has_rows)
- parse_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
- n8x8_l2);
+ process_block(twd, pbi, mi_row + hbs, mi_col, subsize, n4x4_l2,
+ n8x8_l2);
break;
case PARTITION_VERT:
- parse_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
+ process_block(twd, pbi, mi_row, mi_col, subsize, n8x8_l2, n4x4_l2);
if (has_cols)
- parse_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
- n4x4_l2);
+ process_block(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
+ n4x4_l2);
break;
case PARTITION_SPLIT:
- parse_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2);
- parse_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2);
- parse_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2);
- parse_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize, n8x8_l2);
+ process_partition(twd, pbi, mi_row, mi_col, subsize, n8x8_l2,
+ parse_recon_flag, process_block);
+ process_partition(twd, pbi, mi_row, mi_col + hbs, subsize, n8x8_l2,
+ parse_recon_flag, process_block);
+ process_partition(twd, pbi, mi_row + hbs, mi_col, subsize, n8x8_l2,
+ parse_recon_flag, process_block);
+ process_partition(twd, pbi, mi_row + hbs, mi_col + hbs, subsize,
+ n8x8_l2, parse_recon_flag, process_block);
break;
default: assert(0 && "Invalid partition type");
}
}
- // update partition context
- if (bsize >= BLOCK_8X8 &&
- (bsize == BLOCK_8X8 || partition != PARTITION_SPLIT))
- dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
+ if (parse_recon_flag & PARSE) {
+ // update partition context
+ if (bsize >= BLOCK_8X8 &&
+ (bsize == BLOCK_8X8 || partition != PARTITION_SPLIT))
+ dec_update_partition_context(twd, mi_row, mi_col, subsize, num_8x8_wh);
+ }
}
static void setup_token_decoder(const uint8_t *data, const uint8_t *data_end,
@@ -1688,6 +1645,317 @@
}
}
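+// Marks the superblock at map_idx as reconstructed and signals any thread
+// waiting on it in map_read().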
+static void map_write(RowMTWorkerData *const row_mt_worker_data, int map_idx,
+ int sync_idx) {
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(&row_mt_worker_data->recon_sync_mutex[sync_idx]);
+ row_mt_worker_data->recon_map[map_idx] = 1;
+ pthread_cond_signal(&row_mt_worker_data->recon_sync_cond[sync_idx]);
+ pthread_mutex_unlock(&row_mt_worker_data->recon_sync_mutex[sync_idx]);
+#else
+ (void)row_mt_worker_data;
+ (void)map_idx;
+ (void)sync_idx;
+#endif // CONFIG_MULTITHREAD
+}
+
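+// Blocks until map_write() marks the superblock at map_idx as reconstructed.
+// recon_map is read through a volatile pointer since other threads update it.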
+static void map_read(RowMTWorkerData *const row_mt_worker_data, int map_idx,
+ int sync_idx) {
+#if CONFIG_MULTITHREAD
+ volatile int8_t *map = row_mt_worker_data->recon_map + map_idx;
+ pthread_mutex_t *const mutex =
+ &row_mt_worker_data->recon_sync_mutex[sync_idx];
+ pthread_mutex_lock(mutex);
+ while (!(*map)) {
+ pthread_cond_wait(&row_mt_worker_data->recon_sync_cond[sync_idx], mutex);
+ }
+ pthread_mutex_unlock(mutex);
+#else
+ (void)row_mt_worker_data;
+ (void)map_idx;
+ (void)sync_idx;
+#endif // CONFIG_MULTITHREAD
+}
+
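+// Registers that one more tile column has reconstructed this superblock row;
+// returns 1 when all tile columns are done and the row can be loop filtered.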
+static int lpf_map_write_check(VP9LfSync *lf_sync, int row, int num_tile_cols) {
+ int return_val = 0;
+#if CONFIG_MULTITHREAD
+ int corrupted;
+ pthread_mutex_lock(&lf_sync->lf_mutex);
+ corrupted = lf_sync->corrupted;
+ pthread_mutex_unlock(&lf_sync->lf_mutex);
+ if (!corrupted) {
+ pthread_mutex_lock(&lf_sync->recon_done_mutex[row]);
+ lf_sync->num_tiles_done[row] += 1;
+ if (num_tile_cols == lf_sync->num_tiles_done[row]) return_val = 1;
+ pthread_mutex_unlock(&lf_sync->recon_done_mutex[row]);
+ }
+#else
+ (void)lf_sync;
+ (void)row;
+ (void)num_tile_cols;
+#endif
+ return return_val;
+}
+
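+// Counts tiles that have finished decoding; once every tile column has
+// reported in, terminates the job queue so that waiting workers can exit.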
+static void vp9_tile_done(VP9Decoder *pbi) {
+#if CONFIG_MULTITHREAD
+ int terminate;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ const int all_parse_done = 1 << pbi->common.log2_tile_cols;
+ pthread_mutex_lock(&row_mt_worker_data->recon_done_mutex);
+ row_mt_worker_data->num_tiles_done++;
+ terminate = all_parse_done == row_mt_worker_data->num_tiles_done;
+ pthread_mutex_unlock(&row_mt_worker_data->recon_done_mutex);
+ if (terminate) {
+ vp9_jobq_terminate(&row_mt_worker_data->jobq);
+ }
+#else
+ (void)pbi;
+#endif
+}
+
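+// Sizes the job queue for a whole frame: one parse job and one recon job per
+// tile superblock row, plus one loop-filter job per superblock row.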
+static void vp9_jobq_alloc(VP9Decoder *pbi) {
+ VP9_COMMON *const cm = &pbi->common;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
+ const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
+ const int tile_cols = 1 << cm->log2_tile_cols;
+ const size_t jobq_size = (tile_cols * sb_rows * 2 + sb_rows) * sizeof(Job);
+
+ if (jobq_size > row_mt_worker_data->jobq_size) {
+ vpx_free(row_mt_worker_data->jobq_buf);
+ CHECK_MEM_ERROR(cm, row_mt_worker_data->jobq_buf, vpx_calloc(1, jobq_size));
+ vp9_jobq_init(&row_mt_worker_data->jobq, row_mt_worker_data->jobq_buf,
+ jobq_size);
+ row_mt_worker_data->jobq_size = jobq_size;
+ }
+}
+
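+// Reconstructs one superblock row of a tile, waiting on the row above via
+// map_read(), publishing progress via map_write(), and queueing loop-filter
+// jobs as rows become ready in all tile columns.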
+static void recon_tile_row(TileWorkerData *tile_data, VP9Decoder *pbi,
+ int mi_row, int is_last_row, VP9LfSync *lf_sync,
+ int cur_tile_col) {
+ VP9_COMMON *const cm = &pbi->common;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ const int tile_cols = 1 << cm->log2_tile_cols;
+ const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+ const int sb_cols = aligned_cols >> MI_BLOCK_SIZE_LOG2;
+ const int cur_sb_row = mi_row >> MI_BLOCK_SIZE_LOG2;
+ int mi_col_start = tile_data->xd.tile.mi_col_start;
+ int mi_col_end = tile_data->xd.tile.mi_col_end;
+ int mi_col;
+
+ vp9_zero(tile_data->xd.left_context);
+ vp9_zero(tile_data->xd.left_seg_context);
+ for (mi_col = mi_col_start; mi_col < mi_col_end; mi_col += MI_BLOCK_SIZE) {
+ const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+ int plane;
+ const int sb_num = (cur_sb_row * (aligned_cols >> MI_BLOCK_SIZE_LOG2) + c);
+
+ // Top dependency: wait for the superblock above to finish reconstruction
+ if (cur_sb_row) {
+ map_read(row_mt_worker_data, ((cur_sb_row - 1) * sb_cols) + c,
+ ((cur_sb_row - 1) * tile_cols) + cur_tile_col);
+ }
+
+ for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
+ tile_data->xd.plane[plane].eob =
+ row_mt_worker_data->eob[plane] + (sb_num << EOBS_PER_SB_LOG2);
+ tile_data->xd.plane[plane].dqcoeff =
+ row_mt_worker_data->dqcoeff[plane] + (sb_num << DQCOEFFS_PER_SB_LOG2);
+ }
+ tile_data->xd.partition =
+ row_mt_worker_data->partition + (sb_num * PARTITIONS_PER_SB);
+ process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4, RECON,
+ recon_block);
+ if (cm->lf.filter_level && !cm->skip_loop_filter) {
+ // Queue LPF_JOB
+ int is_lpf_job_ready = 0;
+
+ if (mi_col + MI_BLOCK_SIZE >= mi_col_end) {
+ // Checks if this row has been decoded in all tiles
+ is_lpf_job_ready = lpf_map_write_check(lf_sync, cur_sb_row, tile_cols);
+
+ if (is_lpf_job_ready) {
+ Job lpf_job;
+ lpf_job.job_type = LPF_JOB;
+ if (cur_sb_row > 0) {
+ lpf_job.row_num = mi_row - MI_BLOCK_SIZE;
+ vp9_jobq_queue(&row_mt_worker_data->jobq, &lpf_job,
+ sizeof(lpf_job));
+ }
+ if (is_last_row) {
+ lpf_job.row_num = mi_row;
+ vp9_jobq_queue(&row_mt_worker_data->jobq, &lpf_job,
+ sizeof(lpf_job));
+ }
+ }
+ }
+ }
+ map_write(row_mt_worker_data, (cur_sb_row * sb_cols) + c,
+ (cur_sb_row * tile_cols) + cur_tile_col);
+ }
+}
+
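+// Parses one superblock row of a tile, writing partitions, eobs and dqcoeffs
+// into the shared row-MT buffers for the recon stage to consume.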
+static void parse_tile_row(TileWorkerData *tile_data, VP9Decoder *pbi,
+ int mi_row, int cur_tile_col, uint8_t **data_end) {
+ int mi_col;
+ VP9_COMMON *const cm = &pbi->common;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ TileInfo *tile = &tile_data->xd.tile;
+ TileBuffer *const buf = &pbi->tile_buffers[cur_tile_col];
+ const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+
+ vp9_zero(tile_data->dqcoeff);
+ vp9_tile_init(tile, cm, 0, cur_tile_col);
+
+ /* Initialize the token decoder only at the first row of each tile */
+ if (mi_row == 0) {
+ setup_token_decoder(buf->data, *data_end, buf->size, &tile_data->error_info,
+ &tile_data->bit_reader, pbi->decrypt_cb,
+ pbi->decrypt_state);
+ }
+ vp9_init_macroblockd(cm, &tile_data->xd, tile_data->dqcoeff);
+ tile_data->xd.error_info = &tile_data->error_info;
+
+ vp9_zero(tile_data->xd.left_context);
+ vp9_zero(tile_data->xd.left_seg_context);
+ for (mi_col = tile->mi_col_start; mi_col < tile->mi_col_end;
+ mi_col += MI_BLOCK_SIZE) {
+ const int r = mi_row >> MI_BLOCK_SIZE_LOG2;
+ const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+ int plane;
+ const int sb_num = (r * (aligned_cols >> MI_BLOCK_SIZE_LOG2) + c);
+ for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
+ tile_data->xd.plane[plane].eob =
+ row_mt_worker_data->eob[plane] + (sb_num << EOBS_PER_SB_LOG2);
+ tile_data->xd.plane[plane].dqcoeff =
+ row_mt_worker_data->dqcoeff[plane] + (sb_num << DQCOEFFS_PER_SB_LOG2);
+ }
+ tile_data->xd.partition =
+ row_mt_worker_data->partition + sb_num * PARTITIONS_PER_SB;
+ process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4, PARSE,
+ parse_block);
+ }
+}
+
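+// Worker hook for the row-based multi-threaded decoder: dequeues and services
+// PARSE_JOB, RECON_JOB and LPF_JOB entries until the job queue is terminated.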
+static int row_decode_worker_hook(ThreadData *const thread_data,
+ uint8_t **data_end) {
+ VP9Decoder *const pbi = thread_data->pbi;
+ VP9_COMMON *const cm = &pbi->common;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
+ const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
+ const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
+ const int tile_cols = 1 << cm->log2_tile_cols;
+ Job job;
+ LFWorkerData *lf_data = thread_data->lf_data;
+ VP9LfSync *lf_sync = thread_data->lf_sync;
+ volatile int corrupted = 0;
+
+ while (!vp9_jobq_dequeue(&row_mt_worker_data->jobq, &job, sizeof(job), 1)) {
+ int mi_col;
+ const int mi_row = job.row_num;
+
+ if (job.job_type == LPF_JOB) {
+ lf_data->start = mi_row;
+ lf_data->stop = lf_data->start + MI_BLOCK_SIZE;
+
+ if (cm->lf.filter_level && !cm->skip_loop_filter &&
+ mi_row < cm->mi_rows) {
+ vp9_loopfilter_job(lf_data, lf_sync);
+ }
+ } else if (job.job_type == RECON_JOB) {
+ const int cur_sb_row = mi_row >> MI_BLOCK_SIZE_LOG2;
+ const int is_last_row = sb_rows - 1 == cur_sb_row;
+ TileWorkerData twd_recon;
+ TileWorkerData *const tile_data_recon = &twd_recon;
+ int mi_col_start, mi_col_end;
+
+ tile_data_recon->xd = pbi->mb;
+ vp9_tile_init(&tile_data_recon->xd.tile, cm, 0, job.tile_col);
+ vp9_init_macroblockd(cm, &tile_data_recon->xd, tile_data_recon->dqcoeff);
+ mi_col_start = tile_data_recon->xd.tile.mi_col_start;
+ mi_col_end = tile_data_recon->xd.tile.mi_col_end;
+
+ if (setjmp(tile_data_recon->error_info.jmp)) {
+ const int sb_cols = aligned_cols >> MI_BLOCK_SIZE_LOG2;
+ tile_data_recon->error_info.setjmp = 0;
+ corrupted = 1;
+ for (mi_col = mi_col_start; mi_col < mi_col_end;
+ mi_col += MI_BLOCK_SIZE) {
+ const int c = mi_col >> MI_BLOCK_SIZE_LOG2;
+ map_write(row_mt_worker_data, (cur_sb_row * sb_cols) + c,
+ (cur_sb_row * tile_cols) + job.tile_col);
+ }
+ if (is_last_row) {
+ vp9_tile_done(pbi);
+ }
+ continue;
+ }
+
+ tile_data_recon->error_info.setjmp = 1;
+ tile_data_recon->xd.error_info = &tile_data_recon->error_info;
+
+ recon_tile_row(tile_data_recon, pbi, mi_row, is_last_row, lf_sync,
+ job.tile_col);
+
+ if (corrupted)
+ vpx_internal_error(&tile_data_recon->error_info,
+ VPX_CODEC_CORRUPT_FRAME,
+ "Failed to decode tile data");
+
+ if (is_last_row) {
+ vp9_tile_done(pbi);
+ }
+ } else if (job.job_type == PARSE_JOB) {
+ TileWorkerData *const tile_data = &pbi->tile_worker_data[job.tile_col];
+
+ if (setjmp(tile_data->error_info.jmp)) {
+ tile_data->error_info.setjmp = 0;
+ corrupted = 1;
+ vp9_tile_done(pbi);
+ continue;
+ }
+
+ tile_data->xd = pbi->mb;
+ tile_data->xd.counts =
+ cm->frame_parallel_decoding_mode ? NULL : &tile_data->counts;
+
+ tile_data->error_info.setjmp = 1;
+
+ parse_tile_row(tile_data, pbi, mi_row, job.tile_col, data_end);
+
+ corrupted |= tile_data->xd.corrupted;
+ if (corrupted)
+ vpx_internal_error(&tile_data->error_info, VPX_CODEC_CORRUPT_FRAME,
+ "Failed to decode tile data");
+
+ /* Queue the recon job for this row */
+ {
+ Job recon_job;
+ recon_job.row_num = mi_row;
+ recon_job.tile_col = job.tile_col;
+ recon_job.job_type = RECON_JOB;
+ vp9_jobq_queue(&row_mt_worker_data->jobq, &recon_job,
+ sizeof(recon_job));
+ }
+
+ /* Queue next parse job */
+ if (mi_row + MI_BLOCK_SIZE < cm->mi_rows) {
+ Job parse_job;
+ parse_job.row_num = mi_row + MI_BLOCK_SIZE;
+ parse_job.tile_col = job.tile_col;
+ parse_job.job_type = PARSE_JOB;
+ vp9_jobq_queue(&row_mt_worker_data->jobq, &parse_job,
+ sizeof(parse_job));
+ }
+ }
+ }
+
+ return !corrupted;
+}
+
static const uint8_t *decode_tiles(VP9Decoder *pbi, const uint8_t *data,
const uint8_t *data_end) {
VP9_COMMON *const cm = &pbi->common;
@@ -1775,7 +2043,8 @@
row_mt_worker_data->dqcoeff[plane];
}
tile_data->xd.partition = row_mt_worker_data->partition;
- parse_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
+ process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4,
+ PARSE, parse_block);
for (plane = 0; plane < MAX_MB_PLANE; ++plane) {
tile_data->xd.plane[plane].eob = row_mt_worker_data->eob[plane];
@@ -1783,7 +2052,8 @@
row_mt_worker_data->dqcoeff[plane];
}
tile_data->xd.partition = row_mt_worker_data->partition;
- recon_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
+ process_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4,
+ RECON, recon_block);
} else {
decode_partition(tile_data, pbi, mi_row, mi_col, BLOCK_64X64, 4);
}
@@ -1951,23 +2221,13 @@
return (buf_a->size < buf_b->size) - (buf_a->size > buf_b->size);
}
-static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
- const uint8_t *data_end) {
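+// Worker allocation and loop-filter setup shared by the tile-MT and row-MT
+// decode paths.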
+static INLINE void init_mt(VP9Decoder *pbi) {
+ int n;
VP9_COMMON *const cm = &pbi->common;
- const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
- const uint8_t *bit_reader_end = NULL;
VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
- YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
const int aligned_mi_cols = mi_cols_aligned_to_sb(cm->mi_cols);
- const int tile_cols = 1 << cm->log2_tile_cols;
- const int tile_rows = 1 << cm->log2_tile_rows;
- const int num_workers = VPXMIN(pbi->max_threads, tile_cols);
- int n;
+ const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
- assert(tile_cols <= (1 << 6));
- assert(tile_rows == 1);
- (void)tile_rows;
-
if (pbi->num_tile_workers == 0) {
const int num_threads = pbi->max_threads;
CHECK_MEM_ERROR(cm, pbi->tile_workers,
@@ -1985,14 +2245,163 @@
}
// Initialize LPF
- if (pbi->lpf_mt_opt && cm->lf.filter_level && !cm->skip_loop_filter) {
+ if ((pbi->lpf_mt_opt || pbi->row_mt) && cm->lf.filter_level &&
+ !cm->skip_loop_filter) {
vp9_lpf_mt_init(lf_row_sync, cm, cm->lf.filter_level,
pbi->num_tile_workers);
}
+ // Note: this memset assumes above_context[0], [1] and [2]
+ // are allocated as part of the same buffer.
+ memset(cm->above_context, 0,
+ sizeof(*cm->above_context) * MAX_MB_PLANE * 2 * aligned_mi_cols);
+
+ memset(cm->above_seg_context, 0,
+ sizeof(*cm->above_seg_context) * aligned_mi_cols);
+
+ vp9_reset_lfm(cm);
+}
+
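+// Row-based multi-threaded decode: seeds a parse job for row 0 of every tile
+// column, then lets the workers drive parse, recon and loop filtering through
+// the job queue.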
+static const uint8_t *decode_tiles_row_wise_mt(VP9Decoder *pbi,
+ const uint8_t *data,
+ const uint8_t *data_end) {
+ VP9_COMMON *const cm = &pbi->common;
+ RowMTWorkerData *const row_mt_worker_data = pbi->row_mt_worker_data;
+ const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
+ const int tile_cols = 1 << cm->log2_tile_cols;
+ const int tile_rows = 1 << cm->log2_tile_rows;
+ const int num_workers = pbi->max_threads;
+ int i, n;
+ int col;
+ int corrupted = 0;
+ const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
+ const int sb_cols = mi_cols_aligned_to_sb(cm->mi_cols) >> MI_BLOCK_SIZE_LOG2;
+ VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
+ YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
+
+ assert(tile_cols <= (1 << 6));
+ assert(tile_rows == 1);
+ (void)tile_rows;
+
+ memset(row_mt_worker_data->recon_map, 0,
+ sb_rows * sb_cols * sizeof(*row_mt_worker_data->recon_map));
+
+ init_mt(pbi);
+
// Reset tile decoding hook
for (n = 0; n < num_workers; ++n) {
VPxWorker *const worker = &pbi->tile_workers[n];
+ ThreadData *const thread_data = &pbi->row_mt_worker_data->thread_data[n];
+ winterface->sync(worker);
+
+ if (cm->lf.filter_level && !cm->skip_loop_filter) {
+ thread_data->lf_sync = lf_row_sync;
+ thread_data->lf_data = &thread_data->lf_sync->lfdata[n];
+ vp9_loop_filter_data_reset(thread_data->lf_data, new_fb, cm,
+ pbi->mb.plane);
+ }
+
+ thread_data->pbi = pbi;
+
+ worker->hook = (VPxWorkerHook)row_decode_worker_hook;
+ worker->data1 = thread_data;
+ worker->data2 = (void *)&row_mt_worker_data->data_end;
+ }
+
+ for (col = 0; col < tile_cols; ++col) {
+ TileWorkerData *const tile_data = &pbi->tile_worker_data[col];
+ tile_data->xd = pbi->mb;
+ tile_data->xd.counts =
+ cm->frame_parallel_decoding_mode ? NULL : &tile_data->counts;
+ }
+
+ /* Reset the jobq to start of the jobq buffer */
+ vp9_jobq_reset(&row_mt_worker_data->jobq);
+ row_mt_worker_data->num_tiles_done = 0;
+ row_mt_worker_data->data_end = NULL;
+
+ // Load tile data into tile_buffers
+ get_tile_buffers(pbi, data, data_end, tile_cols, tile_rows,
+ &pbi->tile_buffers);
+
+ // Initialize thread frame counts.
+ if (!cm->frame_parallel_decoding_mode) {
+ for (col = 0; col < tile_cols; ++col) {
+ TileWorkerData *const tile_data =
+ (TileWorkerData *)&pbi->tile_worker_data[col];
+ vp9_zero(tile_data->counts);
+ }
+ }
+
+ // Queue parse jobs for row 0 of every tile column
+ for (col = 0; col < tile_cols; ++col) {
+ Job parse_job;
+ parse_job.row_num = 0;
+ parse_job.tile_col = col;
+ parse_job.job_type = PARSE_JOB;
+ vp9_jobq_queue(&row_mt_worker_data->jobq, &parse_job, sizeof(parse_job));
+ }
+
+ for (i = 0; i < num_workers; ++i) {
+ VPxWorker *const worker = &pbi->tile_workers[i];
+ worker->had_error = 0;
+ if (i == num_workers - 1) {
+ winterface->execute(worker);
+ } else {
+ winterface->launch(worker);
+ }
+ }
+
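+ // Sync workers; n still holds num_workers from the hook-reset loop above.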
+ for (; n > 0; --n) {
+ VPxWorker *const worker = &pbi->tile_workers[n - 1];
+ // TODO(jzern): The tile may have specific error data associated with
+ // its vpx_internal_error_info which could be propagated to the main info
+ // in cm. Additionally once the threads have been synced and an error is
+ // detected, there's no point in continuing to decode tiles.
+ corrupted |= !winterface->sync(worker);
+ }
+
+ pbi->mb.corrupted = corrupted;
+
+ {
+ /* Set data end */
+ TileWorkerData *const tile_data = &pbi->tile_worker_data[tile_cols - 1];
+ row_mt_worker_data->data_end = vpx_reader_find_end(&tile_data->bit_reader);
+ }
+
+ // Accumulate thread frame counts.
+ if (!cm->frame_parallel_decoding_mode) {
+ for (i = 0; i < tile_cols; ++i) {
+ TileWorkerData *const tile_data =
+ (TileWorkerData *)&pbi->tile_worker_data[i];
+ vp9_accumulate_frame_counts(&cm->counts, &tile_data->counts, 1);
+ }
+ }
+
+ return row_mt_worker_data->data_end;
+}
+
+static const uint8_t *decode_tiles_mt(VP9Decoder *pbi, const uint8_t *data,
+ const uint8_t *data_end) {
+ VP9_COMMON *const cm = &pbi->common;
+ const VPxWorkerInterface *const winterface = vpx_get_worker_interface();
+ const uint8_t *bit_reader_end = NULL;
+ VP9LfSync *lf_row_sync = &pbi->lf_row_sync;
+ YV12_BUFFER_CONFIG *const new_fb = get_frame_new_buffer(cm);
+ const int tile_cols = 1 << cm->log2_tile_cols;
+ const int tile_rows = 1 << cm->log2_tile_rows;
+ const int num_workers = VPXMIN(pbi->max_threads, tile_cols);
+ int n;
+
+ assert(tile_cols <= (1 << 6));
+ assert(tile_rows == 1);
+ (void)tile_rows;
+
+ init_mt(pbi);
+
+ // Reset tile decoding hook
+ for (n = 0; n < num_workers; ++n) {
+ VPxWorker *const worker = &pbi->tile_workers[n];
TileWorkerData *const tile_data =
&pbi->tile_worker_data[n + pbi->total_tiles];
winterface->sync(worker);
@@ -2012,15 +2421,6 @@
worker->data2 = pbi;
}
- // Note: this memset assumes above_context[0], [1] and [2]
- // are allocated as part of the same buffer.
- memset(cm->above_context, 0,
- sizeof(*cm->above_context) * MAX_MB_PLANE * 2 * aligned_mi_cols);
- memset(cm->above_seg_context, 0,
- sizeof(*cm->above_seg_context) * aligned_mi_cols);
-
- vp9_reset_lfm(cm);
-
// Load tile data into tile_buffers
get_tile_buffers(pbi, data, data_end, tile_cols, tile_rows,
&pbi->tile_buffers);
@@ -2366,17 +2766,20 @@
setup_tile_info(cm, rb);
if (pbi->row_mt == 1) {
int num_sbs = 1;
+ const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
+ const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
if (pbi->row_mt_worker_data == NULL) {
CHECK_MEM_ERROR(cm, pbi->row_mt_worker_data,
vpx_calloc(1, sizeof(*pbi->row_mt_worker_data)));
+#if CONFIG_MULTITHREAD
+ pthread_mutex_init(&pbi->row_mt_worker_data->recon_done_mutex, NULL);
+#endif
}
if (pbi->max_threads > 1) {
const int aligned_cols = mi_cols_aligned_to_sb(cm->mi_cols);
const int sb_cols = aligned_cols >> MI_BLOCK_SIZE_LOG2;
- const int aligned_rows = mi_cols_aligned_to_sb(cm->mi_rows);
- const int sb_rows = aligned_rows >> MI_BLOCK_SIZE_LOG2;
num_sbs = sb_cols * sb_rows;
}
@@ -2383,8 +2786,10 @@
if (num_sbs > pbi->row_mt_worker_data->num_sbs) {
vp9_dec_free_row_mt_mem(pbi->row_mt_worker_data);
- vp9_dec_alloc_row_mt_mem(pbi->row_mt_worker_data, cm, num_sbs);
+ vp9_dec_alloc_row_mt_mem(pbi->row_mt_worker_data, cm, num_sbs,
+ pbi->max_threads, sb_rows << cm->log2_tile_cols);
}
+ vp9_jobq_alloc(pbi);
}
sz = vpx_rb_read_literal(rb, 16);
@@ -2544,21 +2949,27 @@
pbi->total_tiles = tile_rows * tile_cols;
}
- if (pbi->max_threads > 1 && tile_rows == 1 && tile_cols > 1) {
- // Multi-threaded tile decoder
- *p_data_end = decode_tiles_mt(pbi, data + first_partition_size, data_end);
- if (!pbi->lpf_mt_opt) {
- if (!xd->corrupted) {
- if (!cm->skip_loop_filter) {
- // If multiple threads are used to decode tiles, then we use those
- // threads to do parallel loopfiltering.
- vp9_loop_filter_frame_mt(new_fb, cm, pbi->mb.plane,
- cm->lf.filter_level, 0, 0, pbi->tile_workers,
- pbi->num_tile_workers, &pbi->lf_row_sync);
+ if (pbi->max_threads > 1 && tile_rows == 1 &&
+ (tile_cols > 1 || pbi->row_mt == 1)) {
+ if (pbi->row_mt == 1) {
+ *p_data_end =
+ decode_tiles_row_wise_mt(pbi, data + first_partition_size, data_end);
+ } else {
+ // Multi-threaded tile decoder
+ *p_data_end = decode_tiles_mt(pbi, data + first_partition_size, data_end);
+ if (!pbi->lpf_mt_opt) {
+ if (!xd->corrupted) {
+ if (!cm->skip_loop_filter) {
+ // If multiple threads are used to decode tiles, then we use those
+ // threads to do parallel loopfiltering.
+ vp9_loop_filter_frame_mt(
+ new_fb, cm, pbi->mb.plane, cm->lf.filter_level, 0, 0,
+ pbi->tile_workers, pbi->num_tile_workers, &pbi->lf_row_sync);
+ }
+ } else {
+ vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
+ "Decode failed. Frame data is corrupted.");
}
- } else {
- vpx_internal_error(&cm->error, VPX_CODEC_CORRUPT_FRAME,
- "Decode failed. Frame data is corrupted.");
}
}
} else {
--- a/vp9/decoder/vp9_decoder.c
+++ b/vp9/decoder/vp9_decoder.c
@@ -56,10 +56,34 @@
}
void vp9_dec_alloc_row_mt_mem(RowMTWorkerData *row_mt_worker_data,
- VP9_COMMON *cm, int num_sbs) {
+ VP9_COMMON *cm, int num_sbs, int max_threads,
+ int num_jobs) {
int plane;
const size_t dqcoeff_size = (num_sbs << DQCOEFFS_PER_SB_LOG2) *
sizeof(*row_mt_worker_data->dqcoeff[0]);
+ row_mt_worker_data->num_jobs = num_jobs;
+#if CONFIG_MULTITHREAD
+ {
+ int i;
+ CHECK_MEM_ERROR(
+ cm, row_mt_worker_data->recon_sync_mutex,
+ vpx_malloc(sizeof(*row_mt_worker_data->recon_sync_mutex) * num_jobs));
+ if (row_mt_worker_data->recon_sync_mutex) {
+ for (i = 0; i < num_jobs; ++i) {
+ pthread_mutex_init(&row_mt_worker_data->recon_sync_mutex[i], NULL);
+ }
+ }
+
+ CHECK_MEM_ERROR(
+ cm, row_mt_worker_data->recon_sync_cond,
+ vpx_malloc(sizeof(*row_mt_worker_data->recon_sync_cond) * num_jobs));
+ if (row_mt_worker_data->recon_sync_cond) {
+ for (i = 0; i < num_jobs; ++i) {
+ pthread_cond_init(&row_mt_worker_data->recon_sync_cond[i], NULL);
+ }
+ }
+ }
+#endif
row_mt_worker_data->num_sbs = num_sbs;
for (plane = 0; plane < 3; ++plane) {
CHECK_MEM_ERROR(cm, row_mt_worker_data->dqcoeff[plane],
@@ -74,11 +98,36 @@
sizeof(*row_mt_worker_data->partition)));
CHECK_MEM_ERROR(cm, row_mt_worker_data->recon_map,
vpx_calloc(num_sbs, sizeof(*row_mt_worker_data->recon_map)));
+
+ // allocate memory for thread_data
+ if (row_mt_worker_data->thread_data == NULL) {
+ const size_t thread_size =
+ max_threads * sizeof(*row_mt_worker_data->thread_data);
+ CHECK_MEM_ERROR(cm, row_mt_worker_data->thread_data,
+ vpx_memalign(32, thread_size));
+ }
}
void vp9_dec_free_row_mt_mem(RowMTWorkerData *row_mt_worker_data) {
if (row_mt_worker_data != NULL) {
int plane;
+#if CONFIG_MULTITHREAD
+ int i;
+ if (row_mt_worker_data->recon_sync_mutex != NULL) {
+ for (i = 0; i < row_mt_worker_data->num_jobs; ++i) {
+ pthread_mutex_destroy(&row_mt_worker_data->recon_sync_mutex[i]);
+ }
+ vpx_free(row_mt_worker_data->recon_sync_mutex);
+ row_mt_worker_data->recon_sync_mutex = NULL;
+ }
+ if (row_mt_worker_data->recon_sync_cond != NULL) {
+ for (i = 0; i < row_mt_worker_data->num_jobs; ++i) {
+ pthread_cond_destroy(&row_mt_worker_data->recon_sync_cond[i]);
+ }
+ vpx_free(row_mt_worker_data->recon_sync_cond);
+ row_mt_worker_data->recon_sync_cond = NULL;
+ }
+#endif
for (plane = 0; plane < 3; ++plane) {
vpx_free(row_mt_worker_data->eob[plane]);
row_mt_worker_data->eob[plane] = NULL;
@@ -89,6 +138,8 @@
row_mt_worker_data->partition = NULL;
vpx_free(row_mt_worker_data->recon_map);
row_mt_worker_data->recon_map = NULL;
+ vpx_free(row_mt_worker_data->thread_data);
+ row_mt_worker_data->thread_data = NULL;
}
}
@@ -179,8 +230,16 @@
if (pbi->row_mt == 1) {
vp9_dec_free_row_mt_mem(pbi->row_mt_worker_data);
+ if (pbi->row_mt_worker_data != NULL) {
+ vp9_jobq_deinit(&pbi->row_mt_worker_data->jobq);
+ vpx_free(pbi->row_mt_worker_data->jobq_buf);
+#if CONFIG_MULTITHREAD
+ pthread_mutex_destroy(&pbi->row_mt_worker_data->recon_done_mutex);
+#endif
+ }
vpx_free(pbi->row_mt_worker_data);
}
+
vp9_remove_common(&pbi->common);
vpx_free(pbi);
}
--- a/vp9/decoder/vp9_decoder.h
+++ b/vp9/decoder/vp9_decoder.h
@@ -21,6 +21,7 @@
#include "vp9/common/vp9_thread_common.h"
#include "vp9/common/vp9_onyxc_int.h"
#include "vp9/common/vp9_ppflags.h"
+#include "./vp9_job_queue.h"
#ifdef __cplusplus
extern "C" {
@@ -30,6 +31,14 @@
#define DQCOEFFS_PER_SB_LOG2 12
#define PARTITIONS_PER_SB 85
+typedef enum JobType { PARSE_JOB, RECON_JOB, LPF_JOB } JobType;
+
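+/* Per-worker state for the row-based multi-threaded decoder */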
+typedef struct ThreadData {
+ struct VP9Decoder *pbi;
+ LFWorkerData *lf_data;
+ VP9LfSync *lf_sync;
+} ThreadData;
+
typedef struct TileBuffer {
const uint8_t *data;
size_t size;
@@ -49,6 +58,11 @@
struct vpx_internal_error_info error_info;
} TileWorkerData;
+typedef void (*process_block_fn_t)(TileWorkerData *twd,
+ struct VP9Decoder *const pbi, int mi_row,
+ int mi_col, BLOCK_SIZE bsize, int bwl,
+ int bhl);
+
typedef struct RowMTWorkerData {
int num_sbs;
int *eob[MAX_MB_PLANE];
@@ -55,8 +69,27 @@
PARTITION_TYPE *partition;
tran_low_t *dqcoeff[MAX_MB_PLANE];
int8_t *recon_map;
+ const uint8_t *data_end;
+ uint8_t *jobq_buf;
+ JobQueueRowMt jobq;
+ size_t jobq_size;
+ int num_tiles_done;
+ int num_jobs;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_t recon_done_mutex;
+ pthread_mutex_t *recon_sync_mutex;
+ pthread_cond_t *recon_sync_cond;
+#endif
+ ThreadData *thread_data;
} RowMTWorkerData;
+/* Structure to queue and dequeue row decode jobs */
+typedef struct Job {
+ int row_num;
+ int tile_col;
+ JobType job_type;
+} Job;
+
typedef struct VP9Decoder {
DECLARE_ALIGNED(16, MACROBLOCKD, mb);
@@ -128,7 +161,8 @@
void vp9_decoder_remove(struct VP9Decoder *pbi);
void vp9_dec_alloc_row_mt_mem(RowMTWorkerData *row_mt_worker_data,
- VP9_COMMON *cm, int num_sbs);
+ VP9_COMMON *cm, int num_sbs, int max_threads,
+ int num_jobs);
void vp9_dec_free_row_mt_mem(RowMTWorkerData *row_mt_worker_data);
static INLINE void decrease_ref_count(int idx, RefCntBuffer *const frame_bufs,
--- /dev/null
+++ b/vp9/decoder/vp9_job_queue.c
@@ -1,0 +1,124 @@
+/*
+ * Copyright (c) 2018 The WebM project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <assert.h>
+#include <string.h>
+
+#include "vpx/vpx_integer.h"
+
+#include "vp9/decoder/vp9_job_queue.h"
+
+void vp9_jobq_init(JobQueueRowMt *jobq, uint8_t *buf, size_t buf_size) {
+#if CONFIG_MULTITHREAD
+ pthread_mutex_init(&jobq->mutex, NULL);
+ pthread_cond_init(&jobq->cond, NULL);
+#endif
+ jobq->buf_base = buf;
+ jobq->buf_wr = buf;
+ jobq->buf_rd = buf;
+ jobq->buf_end = buf + buf_size;
+ jobq->terminate = 0;
+}
+
+void vp9_jobq_reset(JobQueueRowMt *jobq) {
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(&jobq->mutex);
+#endif
+ jobq->buf_wr = jobq->buf_base;
+ jobq->buf_rd = jobq->buf_base;
+ jobq->terminate = 0;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_unlock(&jobq->mutex);
+#endif
+}
+
+void vp9_jobq_deinit(JobQueueRowMt *jobq) {
+ vp9_jobq_reset(jobq);
+#if CONFIG_MULTITHREAD
+ pthread_mutex_destroy(&jobq->mutex);
+ pthread_cond_destroy(&jobq->cond);
+#endif
+}
+
+void vp9_jobq_terminate(JobQueueRowMt *jobq) {
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(&jobq->mutex);
+#endif
+ jobq->terminate = 1;
+#if CONFIG_MULTITHREAD
+ pthread_cond_broadcast(&jobq->cond);
+ pthread_mutex_unlock(&jobq->mutex);
+#endif
+}
+
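+/* Appends one job at the write pointer. The buffer is linear (no wrap), so
+ * it must be sized for all jobs queued between resets. */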
+int vp9_jobq_queue(JobQueueRowMt *jobq, void *job, size_t job_size) {
+ int ret = 0;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(&jobq->mutex);
+#endif
+ if (jobq->buf_end >= jobq->buf_wr + job_size) {
+ memcpy(jobq->buf_wr, job, job_size);
+ jobq->buf_wr = jobq->buf_wr + job_size;
+#if CONFIG_MULTITHREAD
+ pthread_cond_signal(&jobq->cond);
+#endif
+ ret = 0;
+ } else {
+ /* Wrap around case is not supported */
+ assert(0);
+ ret = 1;
+ }
+#if CONFIG_MULTITHREAD
+ pthread_mutex_unlock(&jobq->mutex);
+#endif
+ return ret;
+}
+
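+/* Copies one job from the read pointer. When blocking, waits until a job is
+ * queued or the queue is terminated; returns 0 on success, 1 on failure. */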
+int vp9_jobq_dequeue(JobQueueRowMt *jobq, void *job, size_t job_size,
+ int blocking) {
+ int ret = 0;
+#if CONFIG_MULTITHREAD
+ pthread_mutex_lock(&jobq->mutex);
+#endif
+ if (jobq->buf_end >= jobq->buf_rd + job_size) {
+ while (1) {
+ if (jobq->buf_wr >= jobq->buf_rd + job_size) {
+ memcpy(job, jobq->buf_rd, job_size);
+ jobq->buf_rd = jobq->buf_rd + job_size;
+ ret = 0;
+ break;
+ } else {
+ /* Queue is empty and terminate was signaled: all entries have been
+ * dequeued, so break and return failure */
+ if (jobq->terminate == 1) {
+ ret = 1;
+ break;
+ }
+ if (blocking == 1) {
+#if CONFIG_MULTITHREAD
+ pthread_cond_wait(&jobq->cond, &jobq->mutex);
+#endif
+ } else {
+ /* If there is no job available and this is a non-blocking call,
+ * return failure */
+ ret = 1;
+ break;
+ }
+ }
+ }
+ } else {
+ /* Wrap around case is not supported */
+ ret = 1;
+ }
+#if CONFIG_MULTITHREAD
+ pthread_mutex_unlock(&jobq->mutex);
+#endif
+
+ return ret;
+}
--- /dev/null
+++ b/vp9/decoder/vp9_job_queue.h
@@ -1,0 +1,45 @@
+/*
+ * Copyright (c) 2018 The WebM project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
+#define VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
+
+#include "vpx_util/vpx_thread.h"
+
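+// Linear (non-wrapping) byte queue used to pass Job structs between decoder
+// threads; all pointer updates are made under the mutex when multithreaded.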
+typedef struct {
+ // Pointer to buffer base which contains the jobs
+ uint8_t *buf_base;
+
+ // Pointer to current address where new job can be added
+ uint8_t *volatile buf_wr;
+
+ // Pointer to current address from where next job can be obtained
+ uint8_t *volatile buf_rd;
+
+ // Pointer to end of job buffer
+ uint8_t *buf_end;
+
+ int terminate;
+
+#if CONFIG_MULTITHREAD
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+#endif
+} JobQueueRowMt;
+
+void vp9_jobq_init(JobQueueRowMt *jobq, uint8_t *buf, size_t buf_size);
+void vp9_jobq_reset(JobQueueRowMt *jobq);
+void vp9_jobq_deinit(JobQueueRowMt *jobq);
+void vp9_jobq_terminate(JobQueueRowMt *jobq);
+int vp9_jobq_queue(JobQueueRowMt *jobq, void *job, size_t job_size);
+int vp9_jobq_dequeue(JobQueueRowMt *jobq, void *job, size_t job_size,
+ int blocking);
+
+#endif // VPX_VP9_DECODER_VP9_JOB_QUEUE_H_
--- a/vp9/vp9dx.mk
+++ b/vp9/vp9dx.mk
@@ -28,5 +28,7 @@
VP9_DX_SRCS-yes += decoder/vp9_decoder.h
VP9_DX_SRCS-yes += decoder/vp9_dsubexp.c
VP9_DX_SRCS-yes += decoder/vp9_dsubexp.h
+VP9_DX_SRCS-yes += decoder/vp9_job_queue.c
+VP9_DX_SRCS-yes += decoder/vp9_job_queue.h
VP9_DX_SRCS-yes := $(filter-out $(VP9_DX_SRCS_REMOVE-yes),$(VP9_DX_SRCS-yes))
--- a/vpx_util/vpx_thread.h
+++ b/vpx_util/vpx_thread.h
@@ -211,6 +211,17 @@
#endif
return !ok;
}
+
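+// Emulate POSIX sched_yield() on Windows: give up the remainder of the
+// current thread's time slice.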
+static INLINE int sched_yield(void) {
+ int ok = 0;
+#if _WIN32_WINNT >= 0x0400 // Windows XP and above
+ SwitchToThread();
+#else
+ Sleep(0);
+#endif // _WIN32_WINNT >= 0x0400
+ return ok;
+}
+
#elif defined(__OS2__)
#define INCL_DOS
#include <os2.h> // NOLINT