Fix #2643. (Make progress reporting work right again in the loader). close[t:2643]

git-svn-id: file:///svn/toku/tokudb@20470 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2013-04-16 23:59:14 -04:00 committed by Yoni Fogel
parent af9d10bcd2
commit d567a309ac
3 changed files with 51 additions and 37 deletions

View file

@ -190,8 +190,7 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int progress_allocation, QUEUE);
CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
@ -200,8 +199,7 @@ CILK_END
//int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation);
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func);
// This is probably only for testing.
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,

View file

@ -329,6 +329,8 @@ void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
brt_loader_destroy_error_callback(&bl->error_callback);
brt_loader_destroy_poll_callback(&bl->poll_callback);
//printf("Progress=%d/%d\n", bl->progress, PROGRESS_MAX);
toku_free(bl);
}
@ -826,8 +828,7 @@ static int finish_primary_rows_internal (BRTLOADER bl)
cilk_for (int i = 0; i < bl->N; i++) {
struct rowset *rows = &(bl->rows[i]);
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
int progress_this_sort = 0; // fix?
ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i], progress_this_sort);
ra[i] = cilk_spawn sort_and_write_rows(*rows, &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
zero_rowset(rows);
}
// Implicit cilk_sync after that cilk_for loop.
@ -1017,8 +1018,7 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
BL_TRACE(blt_extractor);
int progress_this_sort = 0; // fix?
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare, progress_this_sort); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
BL_TRACE(blt_sort_and_write_rows);
init_rowset(rows, memory_per_rowset(bl)); // we passed the contents of rows to sort_and_write_rows.
@ -1371,8 +1371,7 @@ static int update_progress (int N,
}
CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation)
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file.
* Arguments:
* rows the rowset
@ -1382,6 +1381,7 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
* compare The comparison function.
* Returns 0 on success, otherwise an error number.
* Destroy the rowset after finishing it.
* Note: This function does not report progress, because it runs concurrently with the loader->put operation and any progress computed here would not be meaningful.
*/
{
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
@ -1397,11 +1397,6 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
if (r != 0) result = r;
//bl_time_t after_sort = bl_time_now();
if (result == 0) {
r = update_progress(progress_allocation/2, bl, "sorted rows");
progress_allocation -= progress_allocation/2;
if (r != 0) result = r;
}
if (result == 0) {
r = extend_fileset(bl, fs, &sfile);
@ -1429,22 +1424,17 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
destroy_rowset(&rows);
//bl_time_t after_write = bl_time_now();
if (result == 0) {
r = update_progress(progress_allocation, bl, "wrote sorted");
if (r != 0) result = r;
}
return result;
}
CILK_END
// C function for testing sort_and_write_rows
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare,
int progress_allocation) {
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare) {
#if defined(__cilkplusplus)
return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare, progress_allocation);
return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare);
#else
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare, progress_allocation);
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
#endif
}
@ -1705,6 +1695,7 @@ int merge_files (struct merge_fileset *fs,
while (fs->n_temp_files > 0) {
int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
progress_allocation -= progress_allocation_for_this_pass;
//printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass);
invariant(fs->n_temp_files>0);
struct merge_fileset next_file_set;
@ -1716,9 +1707,9 @@ int merge_files (struct merge_fileset *fs,
// We are about to do n_to_merge/n_temp_files of the remaining work for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
//printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files);
progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
//printf("%s:%d merging\n", __FILE__, __LINE__);
FIDX merged_data = FIDX_NULL;
@ -1776,10 +1767,8 @@ int merge_files (struct merge_fileset *fs,
// Update the progress
n_passes_left--;
{
int r = update_progress(progress_allocation_for_this_pass, bl, "merging files");
if (r!=0 && result==0) result = r;
}
if (result==0) { invariant(progress_allocation_for_this_pass==0); }
if (result!=0) break;
}
@ -1787,6 +1776,7 @@ int merge_files (struct merge_fileset *fs,
int r = queue_eof(output_q);
if (r!=0 && result==0) result = r;
}
// The progress_allocation may still be nonzero here (for example if bl->N==0), so report whatever remains.
{
int r = update_progress(progress_allocation, bl, "did merge_files");
if (r!=0 && result==0) result = r;
@ -2471,9 +2461,8 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
toku_free((void*)bl->new_fnames_in_env[i]);
bl->new_fnames_in_env[i] = NULL;
invariant(0<=bl->progress && bl->progress <= PROGRESS_MAX);
result = update_progress(0, bl, "did index");
if (result) goto error;
}
if (result==0) invariant(remaining_progress==0);
}
invariant(bl->file_infos.n_files_open == 0);
invariant(bl->file_infos.n_files_extant == 0);

View file

@ -208,17 +208,30 @@ static void check_results(DB **dbs)
static void *expect_poll_void = &expect_poll_void;
static uint64_t poll_count=0;
static uint64_t bomb_after_poll_count=UINT64_MAX;
// Growable log of (elapsed time, progress) samples appended by poll_function when verbose>=2.
static struct progress_info {
// Seconds elapsed since `start` at the moment the sample was recorded.
double time;
// Progress value handed to poll_function (asserted there to lie in [0.0, 1.0]).
double progress;
} *progress_infos=NULL;
// Number of samples currently stored in progress_infos.
static int progress_infos_count=0;
// Allocated capacity of progress_infos; grown to 2*limit+1 when the array is full.
static int progress_infos_limit=0;
// timing
static BOOL did_start=FALSE;
static struct timeval start;
static int poll_function (void *extra, float progress) {
if (0) {
static int did_one=0;
static struct timeval start;
if (verbose>=2) {
assert(did_start);
struct timeval now;
gettimeofday(&now, 0);
if (!did_one) {
start=now;
did_one=1;
double elapsed = now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec);
printf("Progress: %6.6fs %5.1f%%\n", elapsed, progress*100);
if (progress_infos_count>=progress_infos_limit) {
progress_infos_limit = 2*progress_infos_limit + 1;
XREALLOC_N(progress_infos_limit, progress_infos);
}
printf("%6.6f %5.1f%%\n", now.tv_sec - start.tv_sec + 1e-6*(now.tv_usec - start.tv_usec), progress*100);
progress_infos[progress_infos_count++] = (struct progress_info){elapsed, progress};
}
assert(extra==expect_poll_void);
assert(0.0<=progress && progress<=1.0);
@ -282,6 +295,9 @@ static void test_loader(DB **dbs)
int n = count_temp(env->i->real_data_dir);
if (verbose) printf("Num temp files = %d\n", n);
did_start = TRUE;
gettimeofday(&start, 0);
// close the loader
printf("%9.6fs closing\n", elapsed_time());
r = loader->close(loader);
@ -374,6 +390,17 @@ int test_main(int argc, char * const *argv) {
do_args(argc, argv);
run_test();
if (free_me) toku_free(free_me);
if (progress_infos) {
if (verbose>=2) {
double ratio=progress_infos[progress_infos_count-1].time/progress_infos[progress_infos_count-1].progress;
printf("Progress ratios:\n");
for (int i=0; i<progress_infos_count; i++) {
printf(" %5.3f\n", (progress_infos[i].time/progress_infos[i].progress)/ratio);
}
}
toku_free(progress_infos);
}
return 0;
}