From 0827928d7615d7ec80bd7fdc39e5c4497d96c7a4 Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Tue, 16 Apr 2013 23:59:11 -0400 Subject: [PATCH] Merge in the changes that make the reads not use fidx locks too. Refs #2571. [t:2571] {{{ svn merge -r 20067:20072 https://svn.tokutek.com/tokudb/toku/tokudb.2571 }}} . git-svn-id: file:///svn/toku/tokudb@20073 c7de825b-a66e-492c-adef-691d508d4ae1 --- newbrt/brtloader-internal.h | 2 +- newbrt/brtloader.c | 53 ++++++++++++++++++++--------------- newbrt/tests/brtloader-test.c | 2 +- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/newbrt/brtloader-internal.h b/newbrt/brtloader-internal.h index 3433a389d0c..4fd7ac73f4f 100644 --- a/newbrt/brtloader-internal.h +++ b/newbrt/brtloader-internal.h @@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows); void add_row (struct rowset *rows, DBT *key, DBT *val); int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl); -int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl); +int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl); struct merge_fileset { int n_temp_files, n_temp_files_limit; diff --git a/newbrt/brtloader.c b/newbrt/brtloader.c index ae1b82831cf..0f09bd708c8 100644 --- a/newbrt/brtloader.c +++ b/newbrt/brtloader.c @@ -381,7 +381,7 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD return 0; } -static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl) +static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl) /* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number. * Arguments: * ptr read data into here. @@ -392,7 +392,6 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD * Return value: 0 on success, an error number otherwise. */ { - FILE *stream = toku_bl_fidx2file(bl, streami); size_t r = fread(ptr, size, nmemb, stream); if (r==0) { if (feof(stream)) return EOF; @@ -420,18 +419,18 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER return 0; } -static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl) +static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream, BRTLOADER bl) { int len; { int r; - if ((r = bl_fread(&len, sizeof(len), 1, datafile, bl))) return r; + if ((r = bl_fread(&len, sizeof(len), 1, stream, bl))) return r; assert(len>=0); } if ((int)dbt->ulenulen=len; dbt->data=toku_xrealloc(dbt->data, len); } { int r; - if ((r = bl_fread(dbt->data, 1, len, datafile, bl))) return r; + if ((r = bl_fread(dbt->data, 1, len, stream, bl))) return r; } dbt->size = len; return 0; @@ -459,7 +458,7 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data return 0; } -int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl) +int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) /* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set. * Arguments: * f where to read it from. @@ -1162,7 +1161,7 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f #endif } -static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_data[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation) +static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FILE *srcs_files[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation) /* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge. * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. * If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data. @@ -1204,7 +1203,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou // load pqueue with first value from each source for (int i=0; ifile_infos.lock); assert(r2==0); } - n_rows += bl->file_infos.file_infos[srcs_data[i].idx].n_rows; + n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows; { int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); } } u_int64_t n_rows_done = 0; @@ -1278,15 +1277,15 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou { // read next row from file that just sourced min value BL_TRACE_QUIET(blt_do_i); - r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl); + r = loader_read_row(srcs_files[mini], &keys[mini], &vals[mini], bl); BL_TRACE_QUIET(blt_read_row); if (r!=0) { - if (feof(toku_bl_fidx2file(bl, srcs_data[mini]))) { + if (feof(srcs_files[mini])) { // on feof, queue size permanently smaller toku_free(keys[mini].data); toku_free(vals[mini].data); } else { - r = ferror(toku_bl_fidx2file(bl, srcs_data[mini])); + r = ferror(srcs_files[mini]); assert(r!=0); return r; } @@ -1389,13 +1388,18 @@ int merge_files (struct merge_fileset *fs, //printf("%s:%d merging\n", __FILE__, __LINE__); FIDX merged_data = FIDX_NULL; - FIDX *MALLOC_N(n_to_merge, datafiles); - for (int i=0; in_temp_files -1 -i; - datafiles[i] = fs->data_fidxs[idx]; - result = brtloader_fi_reopen(&bl->file_infos, datafiles[i], "r"); + data_fidxs[i] = fs->data_fidxs[idx]; + result = brtloader_fi_reopen(&bl->file_infos, data_fidxs[i], "r"); if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; } + data_files[i] = toku_bl_fidx2file(bl, data_fidxs[i]); } if (result==0 && !to_queue) { result = extend_fileset(bl, &next_file_set, &merged_data); @@ -1403,7 +1407,7 @@ int merge_files (struct merge_fileset *fs, } if (result==0) { - result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, datafiles, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass); + result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, data_files, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass); // if result!=0, fall through if (result==0) { /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0 @@ -1412,16 +1416,17 @@ int merge_files (struct merge_fileset *fs, //printf("%s:%d merged\n", __FILE__, __LINE__); for (int i=0; ifile_infos, datafiles[i]); + int r = brtloader_fi_close(&bl->file_infos, data_fidxs[i]); if (r!=0 && result==0) result = r; } { - int r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); + int r = brtloader_fi_unlink(&bl->file_infos, data_fidxs[i]); if (r!=0 && result==0) result = r; } - datafiles[i] = FIDX_NULL; + data_fidxs[i] = FIDX_NULL; + data_files[i] = NULL; } } @@ -1430,7 +1435,8 @@ int merge_files (struct merge_fileset *fs, int r = brtloader_fi_close(&bl->file_infos, merged_data); if (r!=0 && result==0) result = r; } - toku_free(datafiles); + toku_free(data_files); + toku_free(data_fidxs); if (result!=0) break; } @@ -2249,9 +2255,10 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl, /*out*/ DBT pivots[/*n_to_read*/]) // pivots is an array to be filled in. The pivots array is uninitialized. { + FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file); for (int i=0; i