Get rid of the gratuitous IDX files.

git-svn-id: file:///svn/toku/tokudb@18835 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2013-04-16 23:59:04 -04:00 committed by Yoni Fogel
parent 0db1dce5de
commit 3bfdfa2921
3 changed files with 21 additions and 64 deletions

View file

@ -41,7 +41,6 @@ struct brtloader_s {
const char *temp_file_template;
FIDX fprimary_rows; // the file index (in the file_infos) for the data
FIDX fprimary_idx; // the file index for the index
u_int64_t fprimary_offset;
CACHETABLE cachetable;
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
@ -74,7 +73,7 @@ int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff, BRTLOADER bl);
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
struct error_callback_s {
@ -93,7 +92,6 @@ int mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare
struct merge_fileset {
int n_temp_files, n_temp_files_limit;
FIDX *data_fidxs;
FIDX *idx_fidxs;
};
void init_merge_fileset (struct merge_fileset *fs);

View file

@ -211,9 +211,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
brtloader_init_file_infos(&bl->file_infos);
SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);
bl->fprimary_rows = bl->fprimary_idx = FIDX_NULL;
bl->fprimary_rows = FIDX_NULL;
{ int r = brtloader_open_temp_file(bl, &bl->fprimary_rows); if (r!=0) return r; }
{ int r = brtloader_open_temp_file(bl, &bl->fprimary_idx); if (r!=0) return r; }
bl->fprimary_offset = 0;
bl->n_rows = 0;
@ -310,7 +309,7 @@ static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl)
return 0;
}
int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff, BRTLOADER bl)
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl)
/* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date.
* Arguments:
* key, val write these.
@ -320,17 +319,13 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff
* Return value: 0 on success, an error number otherwise.
*/
{
int klen = key->size;
int vlen = val->size;
//int klen = key->size;
//int vlen = val->size;
int r;
if ((r=bl_fwrite(dataoff, sizeof(*dataoff), 1, idx, bl))) return r;
int sum = klen+vlen+sizeof(klen)+sizeof(vlen);
if ((r=bl_fwrite(&sum, sizeof(sum), 1, idx, bl))) return r;
// we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_write_dbt(key, data, dataoff, bl))) return r;
if ((r=bl_write_dbt(val, data, dataoff, bl))) return r;
bl->file_infos.file_infos[data.idx].n_rows++;
bl->file_infos.file_infos[idx .idx].n_rows++;
return 0;
}
@ -341,7 +336,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
{
if (bl->panic) return EINVAL; // previous panic
bl->n_rows++;
return loader_write_row(key, val, bl->fprimary_rows, bl->fprimary_idx, &bl->fprimary_offset, bl);
return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
}
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl)
@ -536,7 +531,6 @@ void init_merge_fileset (struct merge_fileset *fs)
fs->n_temp_files = 0;
fs->n_temp_files_limit = 0;
fs->data_fidxs = NULL;
fs->idx_fidxs = NULL;
}
void destroy_merge_fileset (struct merge_fileset *fs)
@ -545,13 +539,11 @@ void destroy_merge_fileset (struct merge_fileset *fs)
fs->n_temp_files = 0;
fs->n_temp_files_limit = 0;
toku_free(fs->data_fidxs);
toku_free(fs->idx_fidxs);
fs->data_fidxs = NULL;
fs->idx_fidxs = NULL;
}
static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile, FIDX*fidx)
static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
/* Effect: Add two files (one for data and one for idx) to the fileset.
* Arguments:
* bl the brtloader (needed to panic if anything goes wrong, and also to get the temp_file_template.
@ -560,22 +552,18 @@ static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile, F
* fidx the index file (which will be open)
*/
{
FIDX sfile, sidx;
FIDX sfile;
int r;
r = brtloader_open_temp_file(bl, &sfile); if (r!=0) return r;
r = brtloader_open_temp_file(bl, &sidx); if (r!=0) return r;
if (fs->n_temp_files+1 > fs->n_temp_files_limit) {
fs->n_temp_files_limit = (fs->n_temp_files+1)*2;
REALLOC_N(fs->n_temp_files_limit, fs->data_fidxs);
REALLOC_N(fs->n_temp_files_limit, fs->idx_fidxs);
}
fs->data_fidxs[fs->n_temp_files] = sfile;
fs->idx_fidxs [fs->n_temp_files] = sidx;
fs->n_temp_files++;
*ffile = sfile;
*fidx = sidx;
return 0;
}
@ -604,7 +592,7 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
*/
{
//printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
FIDX sfile, sidx;
FIDX sfile;
u_int64_t soffset=0;
// TODO: erase the files, and deal with all the cleanup on error paths
int r = sort_rows(rows, dest_db, compare, error_callback);
@ -615,21 +603,20 @@ int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADE
progress_allocation -= progress_allocation/2;
if (r!=0) return r;
r = extend_fileset(bl, fs, &sfile, &sidx);
r = extend_fileset(bl, fs, &sfile);
if (r!=0) return r;
for (size_t i=0; i<rows->n_rows; i++) {
DBT skey = {.data = rows->data + rows->rows[i].off, .size=rows->rows[i].klen};
DBT sval = {.data = rows->data + rows->rows[i].off + rows->rows[i].klen, .size=rows->rows[i].vlen};
r = loader_write_row(&skey, &sval, sfile, sidx, &soffset, bl);
r = loader_write_row(&skey, &sval, sfile, &soffset, bl);
if (r!=0) return r;
}
r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r;
r = brtloader_fi_close(&bl->file_infos, sidx); if (r!=0) return r;
return update_progress(progress_allocation, bl, "wrote sorted");
}
static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX srcs_data[/*n_sources*/], FIDX srcs_idx[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback, int progress_allocation)
static int merge_some_files (FIDX dest_data, int n_sources, FIDX srcs_data[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback, int progress_allocation)
/* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to dest. All the files remain open after the merge.
* This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere.
* Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.)
@ -637,10 +624,8 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
* Implementation note: Currently this code uses a really stupid heap O(n) time per pop instead of O(log n), but we'll fix that soon.
* Arguments:
* dest_data where to write the sorted data
* dest_idx where to write the sorted indexes (the offsets of the rows in dest_data)
* n_sources how many source files.
* srcs_data the array of source data files.
* srcs_idx the array of source index files
* bl the brtloader.
* dest_db the destination DB (used in the comparison function).
* Return value: 0 on success, otherwise an error number.
@ -649,19 +634,17 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
//printf(" merge_some_files progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
// We'll use a really stupid heap: O(n) time per pop instead of O(log n), because we need to get this working soon. ???
FIDX datas[n_sources];
FIDX idxs [n_sources];
DBT keys[n_sources];
DBT vals[n_sources];
u_int64_t dataoff[n_sources];
DBT zero = {.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0};
for (int i=0; i<n_sources; i++) {
keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
datas[i] = idxs[i] = FIDX_NULL;
datas[i] = FIDX_NULL;
}
u_int64_t n_rows = 0;
for (int i=0; i<n_sources; i++) {
datas[i] = srcs_data[i];
idxs [i] = srcs_idx[i];
int r = loader_read_row(datas[i], &keys[i], &vals[i], bl);
if (r!=0) return r;
dataoff[i] = 0;
@ -692,7 +675,7 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
}
}
{
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_idx, &dataoff[mini], bl);
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, &dataoff[mini], bl);
if (r!=0) return r;
}
{
@ -702,7 +685,6 @@ static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX
toku_free(keys[mini].data);
toku_free(vals[mini].data);
datas[mini] = datas[n_sources-1];
idxs [mini] = idxs [n_sources-1];
keys[mini] = keys[n_sources-1];
vals[mini] = vals[n_sources-1];
n_sources--;
@ -775,46 +757,37 @@ int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compar
FIDX *MALLOC_N(n_to_merge, datafiles);
FIDX *MALLOC_N(n_to_merge, idxfiles);
for (int i=0; i<n_to_merge; i++) datafiles[i] = idxfiles[i] = FIDX_NULL;
for (int i=0; i<n_to_merge; i++) datafiles[i] = FIDX_NULL;
for (int i=0; i<n_to_merge; i++) {
int idx = fs->n_temp_files -1 -i;
datafiles[i] = fs->data_fidxs[idx];
idxfiles [i] = fs->idx_fidxs[idx];
r = brtloader_fi_reopen(&bl->file_infos, datafiles[i], "r"); if (r) goto error;
r = brtloader_fi_reopen(&bl->file_infos, idxfiles[i], "r"); if (r) goto error;
}
FIDX merged_data, merged_idx;
r = extend_fileset(bl, &next_file_set, &merged_data, &merged_idx);
FIDX merged_data;
r = extend_fileset(bl, &next_file_set, &merged_data);
if (r!=0) goto error;
r = merge_some_files(merged_data, merged_idx, n_to_merge, datafiles, idxfiles, bl, dest_db, compare, error_callback, progress_allocation_for_this_subpass);
r = merge_some_files(merged_data, n_to_merge, datafiles, bl, dest_db, compare, error_callback, progress_allocation_for_this_subpass);
if (r!=0) goto error;
for (int i=0; i<n_to_merge; i++) {
r = brtloader_fi_close(&bl->file_infos, datafiles[i]); if (r!=0) goto error;
r = brtloader_fi_close(&bl->file_infos, idxfiles[i]); if (r!=0) goto error;
r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); if (r!=0) goto error;
r = brtloader_fi_unlink(&bl->file_infos, idxfiles[i]); if (r!=0) goto error;
}
fs->n_temp_files -= n_to_merge;
r = brtloader_fi_close(&bl->file_infos, merged_data); assert(r==0);
r = brtloader_fi_close(&bl->file_infos, merged_idx); assert(r==0);
toku_free(datafiles);
toku_free(idxfiles);
if (0) {
error:
toku_free(fs->data_fidxs);
toku_free(fs->idx_fidxs);
toku_free(datafiles);
toku_free(idxfiles);
return r;
}
}
assert(fs->n_temp_files==0);
toku_free(fs->data_fidxs);
toku_free(fs->idx_fidxs);
*fs = next_file_set;
// Update the progress
@ -941,7 +914,6 @@ static int loader_do_i (BRTLOADER bl,
if (r) goto error;
r = brtloader_fi_unlink(&bl->file_infos, fs.data_fidxs[0]);
if (r) goto error;
r = brtloader_fi_unlink(&bl->file_infos, fs.idx_fidxs[0]);
assert(expect_progress_at_end == bl->progress);
@ -950,7 +922,6 @@ static int loader_do_i (BRTLOADER bl,
toku_free(rows.data);
toku_free(rows.rows);
toku_free(fs.data_fidxs);
toku_free(fs.idx_fidxs);
toku_free(skey.data);
toku_free(sval.data);
toku_free (pkey.data);
@ -988,8 +959,6 @@ int toku_brt_loader_close (BRTLOADER bl,
}
result = brtloader_fi_close (&bl->file_infos, bl->fprimary_rows); if (result) goto error;
result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_rows); if (result) goto error;
result = brtloader_fi_close (&bl->file_infos, bl->fprimary_idx); if (result) goto error;
result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_idx); if (result) goto error;
assert(bl->file_infos.n_files_open == 0);
assert(bl->file_infos.n_files_extant == 0);
assert(bl->progress == PROGRESS_MAX);

View file

@ -169,10 +169,6 @@ static void test_read_write_rows (char *template) {
r = brtloader_open_temp_file(&bl, &file);
CKERR(r);
FIDX idx;
r = brtloader_open_temp_file(&bl, &idx);
CKERR(r);
u_int64_t dataoff=0;
char *keystrings[] = {"abc", "b", "cefgh"};
@ -181,7 +177,7 @@ static void test_read_write_rows (char *template) {
for (int i=0; i<3; i++) {
DBT key = {.size=strlen(keystrings[i]), .data=keystrings[i]};
DBT val = {.size=strlen(valstrings[i]), .data=valstrings[i]};
r = loader_write_row(&key, &val, file, idx, &dataoff, &bl);
r = loader_write_row(&key, &val, file, &dataoff, &bl);
CKERR(r);
actual_size+=key.size + val.size + 8;
}
@ -212,13 +208,9 @@ static void test_read_write_rows (char *template) {
}
r = brtloader_fi_close(&bl.file_infos, file);
CKERR(r);
r = brtloader_fi_close(&bl.file_infos, idx);
CKERR(r);
r = brtloader_fi_unlink(&bl.file_infos, file);
CKERR(r);
r = brtloader_fi_unlink(&bl.file_infos, idx);
CKERR(r);
assert(bl.file_infos.n_files_open==0);
assert(bl.file_infos.n_files_extant==0);
@ -264,7 +256,7 @@ static void test_merge_files (char *template) {
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
destroy_rowset(&aset);
destroy_rowset(&bset);
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1 && fs.idx_fidxs[i].idx != -1);
for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1);
r = merge_files(&fs, &bl, dest_db, compare_ints, &cb, 0); CKERR(r);
@ -284,8 +276,6 @@ static void test_merge_files (char *template) {
CKERR(r);
r = brtloader_fi_unlink(&bl.file_infos, fs.data_fidxs[0]);
CKERR(r);
r = brtloader_fi_unlink(&bl.file_infos, fs.idx_fidxs[0]);
CKERR(r);
destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, FALSE);