Merge in the changes that make the reads not use fidx locks too. Refs #2571. [t:2571]

{{{
svn merge -r 20067:20072 https://svn.tokutek.com/tokudb/toku/tokudb.2571
}}}
.


git-svn-id: file:///svn/toku/tokudb@20073 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2013-04-16 23:59:11 -04:00 committed by Yoni Fogel
parent 28ba5d6e76
commit 0827928d76
3 changed files with 32 additions and 25 deletions

View file

@ -58,7 +58,7 @@ void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl);
struct merge_fileset {
int n_temp_files, n_temp_files_limit;

View file

@ -381,7 +381,7 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD
return 0;
}
static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl)
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl)
/* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number.
* Arguments:
* ptr read data into here.
@ -392,7 +392,6 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOAD
* Return value: 0 on success, an error number otherwise.
*/
{
FILE *stream = toku_bl_fidx2file(bl, streami);
size_t r = fread(ptr, size, nmemb, stream);
if (r==0) {
if (feof(stream)) return EOF;
@ -420,18 +419,18 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, BRTLOADER
return 0;
}
static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl)
static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream, BRTLOADER bl)
{
int len;
{
int r;
if ((r = bl_fread(&len, sizeof(len), 1, datafile, bl))) return r;
if ((r = bl_fread(&len, sizeof(len), 1, stream, bl))) return r;
assert(len>=0);
}
if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
{
int r;
if ((r = bl_fread(dbt->data, 1, len, datafile, bl))) return r;
if ((r = bl_fread(dbt->data, 1, len, stream, bl))) return r;
}
dbt->size = len;
return 0;
@ -459,7 +458,7 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, u_int64_t *data
return 0;
}
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl)
int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl)
/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set.
* Arguments:
* f where to read it from.
@ -1162,7 +1161,7 @@ int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *f
#endif
}
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_data[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FILE *srcs_files[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to an output. All the files remain open after the merge.
* This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere.
* If TO_Q is true then we write rowsets into queue Q. Otherwise we write into dest_data.
@ -1204,7 +1203,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
// load pqueue with first value from each source
for (int i=0; i<n_sources; i++) {
BL_TRACE_QUIET(blt_do_i);
int r = loader_read_row(srcs_data[i], &keys[i], &vals[i], bl);
int r = loader_read_row(srcs_files[i], &keys[i], &vals[i], bl);
BL_TRACE_QUIET(blt_read_row);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) return r;
@ -1225,7 +1224,7 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
dataoff[i] = 0;
{ int r2 = toku_pthread_mutex_lock(&bl->file_infos.lock); assert(r2==0); }
n_rows += bl->file_infos.file_infos[srcs_data[i].idx].n_rows;
n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
{ int r2 = toku_pthread_mutex_unlock(&bl->file_infos.lock); assert(r2==0); }
}
u_int64_t n_rows_done = 0;
@ -1278,15 +1277,15 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
{
// read next row from file that just sourced min value
BL_TRACE_QUIET(blt_do_i);
r = loader_read_row(srcs_data[mini], &keys[mini], &vals[mini], bl);
r = loader_read_row(srcs_files[mini], &keys[mini], &vals[mini], bl);
BL_TRACE_QUIET(blt_read_row);
if (r!=0) {
if (feof(toku_bl_fidx2file(bl, srcs_data[mini]))) {
if (feof(srcs_files[mini])) {
// on feof, queue size permanently smaller
toku_free(keys[mini].data);
toku_free(vals[mini].data);
} else {
r = ferror(toku_bl_fidx2file(bl, srcs_data[mini]));
r = ferror(srcs_files[mini]);
assert(r!=0);
return r;
}
@ -1389,13 +1388,18 @@ int merge_files (struct merge_fileset *fs,
//printf("%s:%d merging\n", __FILE__, __LINE__);
FIDX merged_data = FIDX_NULL;
FIDX *MALLOC_N(n_to_merge, datafiles);
for (int i=0; i<n_to_merge; i++) datafiles[i] = FIDX_NULL;
FIDX *MALLOC_N(n_to_merge, data_fidxs);
FILE **MALLOC_N(n_to_merge, data_files);
for (int i=0; i<n_to_merge; i++) {
data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
}
for (int i=0; i<n_to_merge; i++) {
int idx = fs->n_temp_files -1 -i;
datafiles[i] = fs->data_fidxs[idx];
result = brtloader_fi_reopen(&bl->file_infos, datafiles[i], "r");
data_fidxs[i] = fs->data_fidxs[idx];
result = brtloader_fi_reopen(&bl->file_infos, data_fidxs[i], "r");
if (result) { printf("%s:%d r=%d\n", __FILE__, __LINE__, result); break; }
data_files[i] = toku_bl_fidx2file(bl, data_fidxs[i]);
}
if (result==0 && !to_queue) {
result = extend_fileset(bl, &next_file_set, &merged_data);
@ -1403,7 +1407,7 @@ int merge_files (struct merge_fileset *fs,
}
if (result==0) {
result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, datafiles, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, data_files, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
// if result!=0, fall through
if (result==0) {
/*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
@ -1412,16 +1416,17 @@ int merge_files (struct merge_fileset *fs,
//printf("%s:%d merged\n", __FILE__, __LINE__);
for (int i=0; i<n_to_merge; i++) {
if (!fidx_is_null(datafiles[i])) {
if (!fidx_is_null(data_fidxs[i])) {
{
int r = brtloader_fi_close(&bl->file_infos, datafiles[i]);
int r = brtloader_fi_close(&bl->file_infos, data_fidxs[i]);
if (r!=0 && result==0) result = r;
}
{
int r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]);
int r = brtloader_fi_unlink(&bl->file_infos, data_fidxs[i]);
if (r!=0 && result==0) result = r;
}
datafiles[i] = FIDX_NULL;
data_fidxs[i] = FIDX_NULL;
data_files[i] = NULL;
}
}
@ -1430,7 +1435,8 @@ int merge_files (struct merge_fileset *fs,
int r = brtloader_fi_close(&bl->file_infos, merged_data);
if (r!=0 && result==0) result = r;
}
toku_free(datafiles);
toku_free(data_files);
toku_free(data_fidxs);
if (result!=0) break;
}
@ -2249,9 +2255,10 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl,
/*out*/ DBT pivots[/*n_to_read*/])
// pivots is an array to be filled in. The pivots array is uninitialized.
{
FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
for (int i=0; i<n_to_read; i++) {
memset(&pivots[i], 0, sizeof pivots[i]);
int r = bl_read_dbt(&pivots[i], pivots_file, bl);
int r = bl_read_dbt(&pivots[i], pivots_stream, bl);
if (r!=0) return r;
};
return 0;

View file

@ -201,7 +201,7 @@ static void test_read_write_rows (char *template) {
{
int n_read=0;
DBT key={.size=0}, val={.size=0};
while (0==loader_read_row(file, &key, &val, &bl)) {
while (0==loader_read_row(toku_bl_fidx2file(&bl, file), &key, &val, &bl)) {
assert(strlen(keystrings[n_read])==key.size);
assert(strlen(valstrings[n_read])==val.size);
assert(0==memcmp(keystrings[n_read], key.data, key.size));