From 788b96e4079295166daeda11ee2009afaa83b92b Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Tue, 16 Apr 2013 23:59:03 -0400 Subject: [PATCH] Merge 2216b onto main line. Refs #2216. [t:2216] {{{ svn merge -r18206:18672 https://svn.tokutek.com/tokudb/toku/tokudb.2216b svn merge -r 18738:18746 https://svn.tokutek.com/tokudb/toku/tokudb.2216b }}} . git-svn-id: file:///svn/toku/tokudb@18749 c7de825b-a66e-492c-adef-691d508d4ae1 --- buildheader/db.h_4_1 | 2 +- buildheader/db.h_4_3 | 2 +- buildheader/db.h_4_4 | 2 +- buildheader/db.h_4_5 | 2 +- buildheader/db.h_4_6 | 2 +- buildheader/make_db_h.c | 2 +- buildheader/tdb.h | 2 +- include/db.h | 2 +- newbrt/brtloader-internal.h | 74 +++- newbrt/brtloader.c | 575 ++++++++++++++++++++++-------- newbrt/brtloader.h | 6 +- newbrt/tests/brtloader-test.c | 168 +++++---- src/loader.c | 32 +- src/loader.h | 2 +- src/tests/Makefile | 1 + src/tests/loader-dup-test.c | 378 ++++++++++++++++++++ src/tests/loader-reference-test.c | 5 +- src/tests/loader-stress-test.c | 5 +- src/ydb.c | 9 +- src/ydb_load.h | 7 + 20 files changed, 1013 insertions(+), 265 deletions(-) create mode 100644 src/tests/loader-dup-test.c diff --git a/buildheader/db.h_4_1 b/buildheader/db.h_4_1 index d58c65df016..95801e8ad3a 100644 --- a/buildheader/db.h_4_1 +++ b/buildheader/db.h_4_1 @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/buildheader/db.h_4_3 b/buildheader/db.h_4_3 index 4139c08b56b..9a0d869bc7b 100644 --- a/buildheader/db.h_4_3 +++ b/buildheader/db.h_4_3 @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/buildheader/db.h_4_4 b/buildheader/db.h_4_4 index ac794750757..04dc0c135c8 100644 --- a/buildheader/db.h_4_4 +++ b/buildheader/db.h_4_4 @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/buildheader/db.h_4_5 b/buildheader/db.h_4_5 index 2557b95a3c0..3bc47c6cb83 100644 --- a/buildheader/db.h_4_5 +++ b/buildheader/db.h_4_5 @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/buildheader/db.h_4_6 b/buildheader/db.h_4_6 index 11584aa487f..1cab58f602a 100644 --- a/buildheader/db.h_4_6 +++ b/buildheader/db.h_4_6 @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/buildheader/make_db_h.c b/buildheader/make_db_h.c index 99594fb892c..a2a23ed529a 100644 --- a/buildheader/make_db_h.c +++ b/buildheader/make_db_h.c @@ -404,7 +404,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ printf("struct __toku_loader_internal;\n"); printf("struct __toku_loader {\n"); printf(" struct __toku_loader_internal *i;\n"); - printf(" int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */\n"); + printf(" int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */\n"); printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */\n"); printf(" int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */\n"); printf(" int (*close)(DB_LOADER *loader); /* finish loading, free memory */\n"); diff --git a/buildheader/tdb.h b/buildheader/tdb.h index de79495253f..808e0a92a23 100644 --- a/buildheader/tdb.h +++ b/buildheader/tdb.h @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/include/db.h b/include/db.h index ac918fe9409..27187db0cf2 100644 --- a/include/db.h +++ b/include/db.h @@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; struct __toku_loader_internal; struct __toku_loader { struct __toku_loader_internal *i; - int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ + int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */ diff --git a/newbrt/brtloader-internal.h b/newbrt/brtloader-internal.h index deff83da798..880e5e2bbab 100644 --- a/newbrt/brtloader-internal.h +++ b/newbrt/brtloader-internal.h @@ -4,7 +4,24 @@ /* These functions are exported to allow the tests to compile. */ -int brtloader_open_temp_file (BRTLOADER bl, FILE **filep, char **fnamep); +/* These structures maintain a collection of all the open temporary files used by the loader. */ +struct file_info { + BOOL is_open; + BOOL is_extant; // if true, the file must be unlinked. + char *fname; + FILE *file; +}; +struct file_infos { + int n_files; + int n_files_limit; + struct file_info *file_infos; + int n_files_open, n_files_extant; +}; +typedef struct fidx { int idx; } FIDX; +static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1}; + +int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx); + struct brtloader_s { int panic; @@ -20,15 +37,17 @@ struct brtloader_s { const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env). const char *temp_file_template; - FILE *fprimary_rows; char *fprimary_rows_name; - FILE *fprimary_idx; char *fprimary_idx_name; + FIDX fprimary_rows; // the file index (in the file_infos) for the data + FIDX fprimary_idx; // the file index for the index u_int64_t fprimary_offset; CACHETABLE cachetable; + /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */ + struct file_infos file_infos; }; /* These data structures are used for manipulating a collection of rows in main memory. */ struct row { - char *data; + size_t off; // the offset in the data array. int klen,vlen; }; struct rowset { @@ -38,25 +57,42 @@ struct rowset { char *data; }; -void init_rowset (struct rowset *rows); +int init_rowset (struct rowset *rows); void destroy_rowset (struct rowset *rows); void add_row (struct rowset *rows, DBT *key, DBT *val); -int loader_write_row(DBT *key, DBT *val, FILE *data, FILE *idx, u_int64_t *dataoff, BRTLOADER bl); -int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl); +int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff, BRTLOADER bl); +int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl); -void merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, - DB *dest_db, brt_compare_func); -void mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func); - -struct fileset { - int n_temp_files, n_temp_files_limit; - char **temp_data_names; - char **temp_idx_names; +struct error_callback_s { + void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra); + DB *db; + int which_db; + void *extra; }; -void init_fileset (struct fileset *fs); +int merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, + DB *dest_db, brt_compare_func, + struct error_callback_s *, + struct rowset *); +int mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func, struct error_callback_s *, struct rowset *); -int sort_and_write_rows (struct rowset *rows, struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func); -int merge_files (struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func); -int write_file_to_dbfile (int outfile, FILE *infile, BRTLOADER bl, const struct descriptor *descriptor); +struct merge_fileset { + int n_temp_files, n_temp_files_limit; + FIDX *data_fidxs; + FIDX *idx_fidxs; +}; + +void init_merge_fileset (struct merge_fileset *fs); +void destroy_merge_fileset (struct merge_fileset *fs); + +int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, + struct error_callback_s *error_callback); +int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, struct error_callback_s *); +int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor); + +int brtloader_init_file_infos (struct file_infos *fi); +void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error); +int brtloader_fi_close (struct file_infos *fi, FIDX idx); +int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode); +int brtloader_fi_unlink (struct file_infos *fi, FIDX idx); diff --git a/newbrt/brtloader.c b/newbrt/brtloader.c index 5d0b804b02e..fc6a089ea5d 100644 --- a/newbrt/brtloader.c +++ b/newbrt/brtloader.c @@ -16,9 +16,119 @@ #include "brtloader-internal.h" #include "brt-internal.h" -int brtloader_open_temp_file (BRTLOADER bl, FILE **filep, char **fnamep) +static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL; +void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) { + os_fwrite_fun=fwrite_fun; +} +static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) { + if (os_fwrite_fun) { + return os_fwrite_fun(ptr, size, nmemb, stream); + } else { + return fwrite(ptr, size, nmemb, stream); + } +} + + +int brtloader_init_file_infos (struct file_infos *fi) { + fi->n_files = 0; + fi->n_files_limit = 1; + fi->n_files_open = 0; + fi->n_files_extant = 0; + MALLOC_N(fi->n_files_limit, fi->file_infos); + if (fi->n_files_limit) return 0; + else return errno; +} + +void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) +// Effect: Free the resources in the fi. +// If is_error then we close and unlink all the temp files. +// If !is_error then requires that all the temp files have been closed and destroyed +// No error codes are returned. If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care. +{ + if (!is_error) { + assert(fi->n_files_open==0); + assert(fi->n_files_extant==0); + } + for (int i=0; in_files; i++) { + if (fi->file_infos[i].is_open) { + assert(is_error); + fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case. + } + if (fi->file_infos[i].is_extant) { + assert(is_error); + unlink(fi->file_infos[i].fname); + toku_free(fi->file_infos[i].fname); + } + } + toku_free(fi->file_infos); + fi->n_files=0; + fi->n_files_limit=0; + fi->file_infos = NULL; +} + +static void open_file_add (struct file_infos *fi, + FILE *file, + char *fname, + /* out */ FIDX *idx) +{ + if (fi->n_files >= fi->n_files_limit) { + fi->n_files_limit *=2; + XREALLOC_N(fi->n_files_limit, fi->file_infos); + } + assert(fi->n_files < fi->n_files_limit); + fi->file_infos[fi->n_files].is_open = TRUE; + fi->file_infos[fi->n_files].is_extant = TRUE; + fi->file_infos[fi->n_files].fname = fname; + fi->file_infos[fi->n_files].file = file; + idx->idx = fi->n_files; + fi->n_files++; + fi->n_files_extant++; + fi->n_files_open++; +} + +int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) { + int i = idx.idx; + assert(i>=0 && in_files); + assert(!fi->file_infos[i].is_open); + assert(fi->file_infos[i].is_extant); + fi->file_infos[i].file = fopen(fi->file_infos[i].fname, mode); + if (fi->file_infos[i].file==NULL) return errno; + fi->file_infos[i].is_open = TRUE; + fi->n_files_open++; + return 0; +} + +int brtloader_fi_close (struct file_infos *fi, FIDX idx) +{ + assert(fi->n_files_open>0); + fi->n_files_open--; + assert(idx.idx >=0 && idx.idx < fi->n_files); + assert(fi->file_infos[idx.idx].is_open); + fi->file_infos[idx.idx].is_open = FALSE; + int r = fclose(fi->file_infos[idx.idx].file); + if (r!=0) return errno; + else return 0; +} + +int brtloader_fi_unlink (struct file_infos *fi, FIDX idx) { + assert(fi->n_files_extant>0); + fi->n_files_extant--; + int id = idx.idx; + assert(id >=0 && id < fi->n_files); + assert(!fi->file_infos[id].is_open); // must be closed before we unlink + assert(fi->file_infos[id].is_extant); // must still exist + fi->file_infos[id].is_extant = FALSE; + int r = unlink(fi->file_infos[id].fname); if (r!=0) r=errno; + toku_free(fi->file_infos[id].fname); + fi->file_infos[id].fname = NULL; + return r; +} + +int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) /* Effect: Open a temporary file in read-write mode. Save enough information to close and delete the file later. * Return value: 0 on success, an error number otherwise. + * On error, *file_idx and *fnamep will be unmodified. + * The open file will be saved in bl->file_infos so that even if errors happen we can free them all. */ { char *fname = toku_strdup(bl->temp_file_template); @@ -26,11 +136,27 @@ int brtloader_open_temp_file (BRTLOADER bl, FILE **filep, char **fnamep) if (fd<0) { int r = errno; toku_free(fname); return r; } FILE *f = fdopen(fd, "r+"); if (f==NULL) { int r = errno; toku_free(fname); close(fd); return r; } - *filep = f; - *fnamep = fname; + open_file_add(&bl->file_infos, f, fname, file_idx); + + static int counter=0; + //fprintf(stderr, "%s:%d %d: %s\n", __FILE__, __LINE__, counter, fname); + counter++; return 0; } +static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { + // These frees rely on the fact that if you free a NULL pointer then nothing bad happens. + toku_free(bl->dbs); + toku_free(bl->descriptors); + for (int i=0; iN; i++) { + if (bl->new_fnames_in_env) toku_free((char*)bl->new_fnames_in_env[i]); + } + toku_free(bl->new_fnames_in_env); + toku_free(bl->bt_compare_funs); + toku_free((char*)bl->temp_file_template); + brtloader_fi_destroy(&bl->file_infos, is_error); +} + int toku_brt_loader_open (/* out */ BRTLOADER *blp, CACHETABLE cachetable, generate_row_for_put_func g, @@ -52,7 +178,9 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, * Return value: 0 on success, an error number otherwise. */ { - BRTLOADER XCALLOC(bl); + BRTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC) + if (!bl) return errno; + bl->panic = 0; bl->panic_errno = 0; @@ -61,26 +189,38 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, bl->src_db = src_db; bl->N = N; - MALLOC_N(N, bl->dbs); + +#define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = errno; brtloader_destroy(bl, TRUE); return r; } +#define MY_STRDUP(s) ({ char *v = toku_strdup(s); if (!v) { int r = errno; brtloader_destroy(bl, TRUE); return r; } v; }) + + MY_CALLOC_N(N, bl->dbs); for (int i=0; idbs[i]=dbs[i]; - MALLOC_N(N, bl->descriptors); + MY_CALLOC_N(N, bl->descriptors); for (int i=0; idescriptors[i]=descriptors[i]; - MALLOC_N(N, bl->new_fnames_in_env); - for (int i=0; inew_fnames_in_env[i] = toku_strdup(new_fnames_in_env[i]); - MALLOC_N(N, bl->bt_compare_funs); + MY_CALLOC_N(N, bl->new_fnames_in_env); + for (int i=0; inew_fnames_in_env[i] = MY_STRDUP(new_fnames_in_env[i]); + MY_CALLOC_N(N, bl->bt_compare_funs); for (int i=0; ibt_compare_funs[i] = bt_compare_functions[i]; - bl->temp_file_template = toku_strdup(temp_file_template); - bl->fprimary_rows = bl->fprimary_idx = NULL; - { int r = brtloader_open_temp_file(bl, &bl->fprimary_rows, &bl->fprimary_rows_name); if (r!=0) return r; } - { int r = brtloader_open_temp_file(bl, &bl->fprimary_idx, &bl->fprimary_idx_name); if (r!=0) return r; } + brtloader_init_file_infos(&bl->file_infos); + + bl->temp_file_template = MY_STRDUP(temp_file_template); + bl->fprimary_rows = bl->fprimary_idx = FIDX_NULL; + { int r = brtloader_open_temp_file(bl, &bl->fprimary_rows); if (r!=0) return r; } + { int r = brtloader_open_temp_file(bl, &bl->fprimary_idx); if (r!=0) return r; } bl->fprimary_offset = 0; *blp = bl; return 0; + } +static FILE *bl_fidx2file (BRTLOADER bl, FIDX i) { + assert(i.idx >=0 && i.idx < bl->file_infos.n_files); + assert(bl->file_infos.file_infos[i.idx].is_open); + return bl->file_infos.file_infos[i.idx].file; +} -static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl) +static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl) /* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number. * Arguments: * ptr the data to be writen. @@ -91,7 +231,8 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD * Return value: 0 on success, an error number otherwise. */ { - size_t r = fwrite(ptr, size, nmemb, stream); + FILE *stream = bl_fidx2file(bl, streami); + size_t r = do_fwrite(ptr, size, nmemb, stream); if (r!=nmemb) { int e = ferror(stream); assert(e!=0); @@ -102,7 +243,7 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD return 0; } -static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl) +static int bl_fread (void *ptr, size_t size, size_t nmemb, FIDX streami, BRTLOADER bl) /* Effect: this is a wrapper for fread that returns 0 on success, otherwise returns an error number. * Arguments: * ptr read data into here. @@ -113,6 +254,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD * Return value: 0 on success, an error number otherwise. */ { + FILE *stream = bl_fidx2file(bl, streami); size_t r = fread(ptr, size, nmemb, stream); if (r==0) { if (feof(stream)) return EOF; @@ -130,7 +272,7 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOAD } } -static int bl_write_dbt (DBT *dbt, FILE *datafile, uint64_t *dataoff, BRTLOADER bl) +static int bl_write_dbt (DBT *dbt, FIDX datafile, uint64_t *dataoff, BRTLOADER bl) { int r; int dlen = dbt->size; @@ -141,7 +283,7 @@ static int bl_write_dbt (DBT *dbt, FILE *datafile, uint64_t *dataoff, BRTLOADER return 0; } -static int bl_read_dbt (/*in*/DBT *dbt, FILE *datafile, BRTLOADER bl) +static int bl_read_dbt (/*in*/DBT *dbt, FIDX datafile, BRTLOADER bl) { int len; { @@ -158,7 +300,7 @@ static int bl_read_dbt (/*in*/DBT *dbt, FILE *datafile, BRTLOADER bl) return 0; } -int loader_write_row(DBT *key, DBT *val, FILE *data, FILE *idx, u_int64_t *dataoff, BRTLOADER bl) +int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff, BRTLOADER bl) /* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date. * Arguments: * key, val write these. @@ -189,7 +331,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val) return loader_write_row(key, val, bl->fprimary_rows, bl->fprimary_idx, &bl->fprimary_offset, bl); } -int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) +int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl) /* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set. * Arguments: * f where to read it from. @@ -211,21 +353,41 @@ int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) } -void init_rowset (struct rowset *rows) +// 1024 is the right number for production. +//#define SIZE_FACTOR 1 +#define SIZE_FACTOR 1024 + + +int init_rowset (struct rowset *rows) /* Effect: Initialize a collection of rows to be empty. */ { + rows->rows = NULL; + rows->data = NULL; + rows->n_rows = 0; rows->n_rows_limit = 100; MALLOC_N(rows->n_rows_limit, rows->rows); rows->n_bytes = 0; - rows->n_bytes_limit = 1024*1024*16; + rows->n_bytes_limit = 1024*SIZE_FACTOR*16; rows->data = toku_malloc(rows->n_bytes_limit); + + if (rows->rows==NULL || rows->data==NULL) { + int r = errno; + toku_free(rows->rows); + toku_free(rows->data); + rows->rows = NULL; + rows->data = NULL; + return r; + } else { + return 0; + } } + void destroy_rowset (struct rowset *rows) { toku_free(rows->data); toku_free(rows->rows); } -const size_t data_buffer_limit = 1024*1024*16; +const size_t data_buffer_limit = 1024*SIZE_FACTOR*64; static int row_wont_fit (struct rowset *rows, size_t size) /* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */ { @@ -246,7 +408,7 @@ void add_row (struct rowset *rows, DBT *key, DBT *val) } size_t off = rows->n_bytes; size_t next_off = off + key->size + val->size; - struct row newrow = {.data = rows->data+off, + struct row newrow = {.off = off, .klen = key->size, .vlen = val->size}; rows->rows[rows->n_rows++] = newrow; @@ -254,6 +416,7 @@ void add_row (struct rowset *rows, DBT *key, DBT *val) while (next_off > rows->n_bytes_limit) { rows->n_bytes_limit = rows->n_bytes_limit*2; } + assert(next_off <= rows->n_bytes_limit); REALLOC_N(rows->n_bytes_limit, rows->data); } memcpy(rows->data+off, key->data, key->size); @@ -261,8 +424,10 @@ void add_row (struct rowset *rows, DBT *key, DBT *val) rows->n_bytes = next_off; } -void merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, - DB *dest_db, brt_compare_func compare) +int merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, + DB *dest_db, brt_compare_func compare, + struct error_callback_s *error_callback, + struct rowset *rowset) /* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and writ them into dest. * This function is suitable for use in a mergesort. * Arguments: @@ -274,9 +439,20 @@ void merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row */ { while (an>0 && bn>0) { - DBT akey = {.data=a->data, .size=a->klen}; - DBT bkey = {.data=b->data, .size=b->klen}; - if (compare(dest_db, &akey, &bkey)<0) { + DBT akey = {.data=rowset->data+a->off, .size=a->klen}; + DBT bkey = {.data=rowset->data+b->off, .size=b->klen}; + int compare_result = compare(dest_db, &akey, &bkey); + if (compare_result==0) { + if (error_callback->error_callback) { + DBT aval = {.data=rowset->data + a->off + a->klen, .size = a->vlen}; + error_callback->error_callback(error_callback->db, error_callback->which_db, + DB_KEYEXIST, + &akey, &aval, + error_callback->extra + ); + return DB_KEYEXIST; + } + } else if (compare_result<0) { // a is smaller *dest = *a; dest++; a++; an--; @@ -293,9 +469,10 @@ void merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row *dest = *b; dest++; b++; bn--; } + return 0; } -void mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func compare) +int mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback, struct rowset *rowset) /* Sort an array of rows (using mergesort). * Arguments: * rows sort this array of rows. @@ -304,37 +481,65 @@ void mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compar * compare the compare function */ { - if (n<=1) return; // base case is sorted + if (n<=1) return 0; // base case is sorted int mid = n/2; - mergesort_row_array (rows, mid, dest_db, compare); - mergesort_row_array (rows+mid, n-mid, dest_db, compare); + { + int r = mergesort_row_array (rows, mid, dest_db, compare, error_callback, rowset); + if (r!=0) return r; + } + { + int r = mergesort_row_array (rows+mid, n-mid, dest_db, compare, error_callback, rowset); + if (r!=0) return r; + } struct row *MALLOC_N(n, tmp); - merge(tmp, rows, mid, rows+mid, n-mid, dest_db, compare); + { + int r = merge(tmp, rows, mid, rows+mid, n-mid, dest_db, compare, error_callback, rowset); + if (r!=0) { + toku_free(tmp); + return r; + } + } memcpy(rows, tmp, sizeof(*tmp)*n); toku_free(tmp); + return 0; } -static void sort_rows (struct rowset *rows, DB *dest_db, brt_compare_func compare) +static int sort_rows (struct rowset *rows, DB *dest_db, brt_compare_func compare, + struct error_callback_s *error_callback) /* Effect: Sort a collection of rows. + * If any duplicates are found, then call the error_callback function and return non zero. + * Otherwise return 0. * Arguments: * rowset the */ { - mergesort_row_array(rows->rows, rows->n_rows, dest_db, compare); + return mergesort_row_array(rows->rows, rows->n_rows, dest_db, compare, error_callback, rows); } /* filesets Maintain a collection of files. Typically these files are each individually sorted, and we will merge them. * These files have two parts, one is for the data rows, and the other is a collection of offsets so we an more easily parallelize the manipulation (e.g., by allowing us to find the offset of the ith row quickly). */ -void init_fileset (struct fileset *fs) +void init_merge_fileset (struct merge_fileset *fs) /* Effect: Initialize a fileset */ { fs->n_temp_files = 0; fs->n_temp_files_limit = 0; - fs->temp_data_names = NULL; - fs->temp_idx_names = NULL; + fs->data_fidxs = NULL; + fs->idx_fidxs = NULL; } -static int extend_fileset (BRTLOADER bl, struct fileset *fs, FILE **ffile, FILE **fidx) +void destroy_merge_fileset (struct merge_fileset *fs) +/* Effect: Destroy a fileset. */ +{ + fs->n_temp_files = 0; + fs->n_temp_files_limit = 0; + toku_free(fs->data_fidxs); + toku_free(fs->idx_fidxs); + fs->data_fidxs = NULL; + fs->idx_fidxs = NULL; +} + + +static int extend_fileset (BRTLOADER bl, struct merge_fileset *fs, FIDX*ffile, FIDX*fidx) /* Effect: Add two files (one for data and one for idx) to the fileset. * Arguments: * bl the brtloader (needed to panic if anything goes wrong, and also to get the temp_file_template. @@ -343,19 +548,18 @@ static int extend_fileset (BRTLOADER bl, struct fileset *fs, FILE **ffile, FILE * fidx the index file (which will be open) */ { - char *sfilename, *sidxname; - FILE *sfile, *sidx; + FIDX sfile, sidx; int r; - r = brtloader_open_temp_file(bl, &sfile, &sfilename); if (r!=0) return r; - r = brtloader_open_temp_file(bl, &sidx, &sidxname); if (r!=0) return r; + r = brtloader_open_temp_file(bl, &sfile); if (r!=0) return r; + r = brtloader_open_temp_file(bl, &sidx); if (r!=0) return r; if (fs->n_temp_files+1 > fs->n_temp_files_limit) { fs->n_temp_files_limit = (fs->n_temp_files+1)*2; - REALLOC_N(fs->n_temp_files_limit, fs->temp_data_names); - REALLOC_N(fs->n_temp_files_limit, fs->temp_idx_names); + REALLOC_N(fs->n_temp_files_limit, fs->data_fidxs); + REALLOC_N(fs->n_temp_files_limit, fs->idx_fidxs); } - fs->temp_data_names[fs->n_temp_files] = sfilename; - fs->temp_idx_names [fs->n_temp_files] = sidxname; + fs->data_fidxs[fs->n_temp_files] = sfile; + fs->idx_fidxs [fs->n_temp_files] = sidx; fs->n_temp_files++; *ffile = sfile; @@ -363,7 +567,8 @@ static int extend_fileset (BRTLOADER bl, struct fileset *fs, FILE **ffile, FILE return 0; } -int sort_and_write_rows (struct rowset *rows, struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare) +int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare, + struct error_callback_s *error_callback) /* Effect: Given a rowset, sort it and write it to a temporary file. * Arguments: * rows the rowset @@ -374,26 +579,28 @@ int sort_and_write_rows (struct rowset *rows, struct fileset *fs, BRTLOADER bl, * Returns 0 on success, otherwise an error number. */ { - FILE *sfile, *sidx; + FIDX sfile, sidx; u_int64_t soffset=0; // TODO: erase the files, and deal with all the cleanup on error paths - int r; - sort_rows(rows, dest_db, compare); + int r = sort_rows(rows, dest_db, compare, error_callback); + if (r!=0) { + return r; + } r = extend_fileset(bl, fs, &sfile, &sidx); if (r!=0) return r; for (size_t i=0; in_rows; i++) { - DBT skey = {.data = rows->rows[i].data, .size=rows->rows[i].klen}; - DBT sval = {.data = rows->rows[i].data + rows->rows[i].klen, .size=rows->rows[i].vlen}; + DBT skey = {.data = rows->data + rows->rows[i].off, .size=rows->rows[i].klen}; + DBT sval = {.data = rows->data + rows->rows[i].off + rows->rows[i].klen, .size=rows->rows[i].vlen}; r = loader_write_row(&skey, &sval, sfile, sidx, &soffset, bl); if (r!=0) return r; } - r = fclose(sfile); if (r!=0) return errno; - r = fclose(sidx); if (r!=0) return errno; + r = brtloader_fi_close(&bl->file_infos, sfile); if (r!=0) return r; + r = brtloader_fi_close(&bl->file_infos, sidx); if (r!=0) return r; return 0; } -static int merge_some_files (FILE *dest_data, FILE *dest_idx, int n_sources, FILE *srcs_data[/*n_sources*/], FILE *srcs_idx[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare) +static int merge_some_files (FIDX dest_data, FIDX dest_idx, int n_sources, FIDX srcs_data[/*n_sources*/], FIDX srcs_idx[/*n_sources*/], BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback) /* Effect: Given an array of FILE*'s each containing sorted, merge the data and write it to dest. All the files remain open after the merge. * This merge is performed in one pass, so don't pass too many files in. If you need a tree of merges do it elsewhere. * Modifies: May modify the arrays of files (but if modified, it must be a permutation so the caller can use that array to close everything.) @@ -411,17 +618,19 @@ static int merge_some_files (FILE *dest_data, FILE *dest_idx, int n_sources, FIL */ { // We'll use a really stupid heap: O(n) time per pop instead of O(log n), because we need to get this working soon. ??? - FILE *datas[n_sources]; - FILE *idxs [n_sources]; + FIDX datas[n_sources]; + FIDX idxs [n_sources]; DBT keys[n_sources]; DBT vals[n_sources]; u_int64_t dataoff[n_sources]; DBT zero = {.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0}; + for (int i=0; i0) { int mini=0; for (int j=1; j0) { + int compare_result = compare(dest_db, &keys[mini], &keys[j]); + if (compare_result==0) { + if (error_callback->error_callback) { + error_callback->error_callback(error_callback->db, error_callback->which_db, + DB_KEYEXIST, + &keys[mini], &vals[mini], + error_callback->extra + ); + for (int i=0; i0) { mini=j; } } @@ -440,7 +664,7 @@ static int merge_some_files (FILE *dest_data, FILE *dest_idx, int n_sources, FIL { int r = loader_read_row(datas[mini], &keys[mini], &vals[mini], bl); if (r!=0) { - if (feof(datas[mini])) { + if (feof(bl_fidx2file(bl, datas[mini]))) { toku_free(keys[mini].data); toku_free(vals[mini].data); datas[mini] = datas[n_sources-1]; @@ -449,7 +673,7 @@ static int merge_some_files (FILE *dest_data, FILE *dest_idx, int n_sources, FIL vals[mini] = vals[n_sources-1]; n_sources--; } else { - r = ferror(datas[mini]); + r = ferror(bl_fidx2file(bl, datas[mini])); assert(r!=0); return r; } @@ -465,49 +689,59 @@ static int int_min (int a, int b) else return b; } -int merge_files (struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare) +int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func compare, struct error_callback_s *error_callback) /* Effect: Given a fileset, merge all the files into one file. At the end the fileset will have one file in it. * All the other files will be closed and unlinked. * Return value: 0 on success, otherwise an error number. + * On error *fs will contain no open files. All the files (including any temporary files) will be closed and unlinked. + * (however the fs will still need to be deallocated.) */ { + int r = 0; while (fs->n_temp_files!=1) { assert(fs->n_temp_files>0); - struct fileset next_file_set; - init_fileset(&next_file_set); + struct merge_fileset next_file_set; + init_merge_fileset(&next_file_set); while (fs->n_temp_files>0) { // grab some files and merge them. const int mergelimit = 256; int n_to_merge = int_min(mergelimit, fs->n_temp_files); - FILE **MALLOC_N(n_to_merge, datafiles); - FILE **MALLOC_N(n_to_merge, idxfiles); + FIDX *MALLOC_N(n_to_merge, datafiles); + FIDX *MALLOC_N(n_to_merge, idxfiles); + for (int i=0; in_temp_files -1 -i; - datafiles[i] = fopen(fs->temp_data_names[idx], "r"); if (datafiles[i]==NULL) return errno; - idxfiles[i] = fopen(fs->temp_idx_names [idx], "r"); if (idxfiles[i]==NULL) return errno; + datafiles[i] = fs->data_fidxs[idx]; + idxfiles [i] = fs->idx_fidxs[idx]; + r = brtloader_fi_reopen(&bl->file_infos, datafiles[i], "r"); if (r) goto error; + r = brtloader_fi_reopen(&bl->file_infos, idxfiles[i], "r"); if (r) goto error; } - FILE *merged_data, *merged_idx; - int r; - r = extend_fileset(bl, &next_file_set, &merged_data, &merged_idx); if (r!=0) return r; - r = merge_some_files(merged_data, merged_idx, n_to_merge, datafiles, idxfiles, bl, dest_db, compare); if (r!=0) return r; + FIDX merged_data, merged_idx; + r = extend_fileset(bl, &next_file_set, &merged_data, &merged_idx); if (r!=0) goto error; + r = merge_some_files(merged_data, merged_idx, n_to_merge, datafiles, idxfiles, bl, dest_db, compare, error_callback); if (r!=0) goto error; for (int i=0; in_temp_files -1 -i; - r = fclose(datafiles[i]); if (r!=0) return errno; - r = fclose(idxfiles[i]); if (r!=0) return errno; - r = unlink(fs->temp_data_names[idx]); if (r!=0) return errno; - r = unlink(fs->temp_idx_names [idx]); if (r!=0) return errno; - toku_free(fs->temp_data_names[idx]); - toku_free(fs->temp_idx_names[idx]); + r = brtloader_fi_close(&bl->file_infos, datafiles[i]); if (r!=0) goto error; + r = brtloader_fi_close(&bl->file_infos, idxfiles[i]); if (r!=0) goto error; + r = brtloader_fi_unlink(&bl->file_infos, datafiles[i]); if (r!=0) goto error; + r = brtloader_fi_unlink(&bl->file_infos, idxfiles[i]); if (r!=0) goto error; } fs->n_temp_files -= n_to_merge; - r = fclose(merged_data); assert(r==0); - r = fclose(merged_idx); assert(r==0); + r = brtloader_fi_close(&bl->file_infos, merged_data); assert(r==0); + r = brtloader_fi_close(&bl->file_infos, merged_idx); assert(r==0); toku_free(datafiles); toku_free(idxfiles); + if (0) { + error: + toku_free(fs->data_fidxs); + toku_free(fs->idx_fidxs); + toku_free(datafiles); + toku_free(idxfiles); + return r; + } } assert(fs->n_temp_files==0); - toku_free(fs->temp_data_names); - toku_free(fs->temp_idx_names); + toku_free(fs->data_fidxs); + toku_free(fs->idx_fidxs); *fs = next_file_set; } return 0; @@ -517,26 +751,35 @@ static int loader_do_i (BRTLOADER bl, DB *dest_db, brt_compare_func compare, const struct descriptor *descriptor, - const char *new_fname) + const char *new_fname, + int which_db, + void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), + void *error_callback_extra + ) /* Effect: Handle the file creating for one particular DB in the bulk loader. */ { - int r = fseek(bl->fprimary_rows, 0, SEEK_SET); - assert(r==0); + int r = fseek(bl_fidx2file(bl, bl->fprimary_rows), 0, SEEK_SET); + if (r!=0) return errno; DBT pkey={.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0}; DBT pval=pkey; DBT skey=pkey; DBT sval=pkey; + struct merge_fileset fs; + init_merge_fileset(&fs); struct rowset rows; - init_rowset(&rows); - struct fileset fs; - init_fileset(&fs); + r = init_rowset(&rows); + if (r!=0) return r; + struct error_callback_s ec = {.error_callback = error_callback, + .db = dest_db, + .which_db = which_db, + .extra = error_callback_extra}; while (0==(r=loader_read_row(bl->fprimary_rows, &pkey, &pval, bl))) { r = bl->generate_row_for_put(dest_db, bl->src_db, &skey, &sval, &pkey, &pval, NULL); assert(r==0); if (row_wont_fit(&rows, skey.size + sval.size)) { - r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare); - if (r!=0) return r; + r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec); + if (r!=0) goto error; reset_rows(&rows); } add_row(&rows, &skey, &sval); @@ -554,64 +797,84 @@ static int loader_do_i (BRTLOADER bl, sval.flags = DB_DBT_REALLOC; } } - toku_free (pkey.data); - toku_free (pval.data); - //Clean up memory in skey, sval - if (skey.data) toku_free(skey.data); - if (sval.data) toku_free(sval.data); - if (rows.n_rows > 0) { - r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare); - if (r!=0) return r; + { // clean up this stuff early, to save memory + toku_free(skey.data); + toku_free(sval.data); + toku_free (pkey.data); + toku_free (pval.data); + skey.data = sval.data = pkey.data = pval.data = NULL; // set to NULL so that the final cleanup won't free them again. } - toku_free(rows.data); - toku_free(rows.rows); - r = merge_files(&fs, bl, dest_db, compare); - if (r!=0) return r; + + if (rows.n_rows > 0) { + r = sort_and_write_rows(&rows, &fs, bl, dest_db, compare, &ec); + if (r!=0) goto error; + } + { + // clean up this stuff early, to save memory + toku_free(rows.data); + toku_free(rows.rows); + rows.data = NULL; //set to NULL so the final cleanup won't free them again. + rows.rows = NULL; + } + r = merge_files(&fs, bl, dest_db, compare, &ec); + if (r!=0) goto error; // Now it's down to one file. Need to write the data out. The file is in fs. mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO; int fd = open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); assert(fd>=0); assert(fs.n_temp_files==1); - FILE *inf = fopen(fs.temp_data_names[0], "r"); - r = write_file_to_dbfile(fd, inf, bl, descriptor); - assert(r==0); + r = brtloader_fi_reopen(&bl->file_infos, fs.data_fidxs[0], "r"); + if (r) goto error; + r = write_file_to_dbfile(fd, fs.data_fidxs[0], bl, descriptor); + if (r) goto error; + r = fsync(fd); + if (r) { r=errno; goto error; } r = close(fd); - assert(r==0); - r = fclose(inf); - assert(r==0); + if (r) { r=errno; goto error; } + r = brtloader_fi_close(&bl->file_infos, fs.data_fidxs[0]); + if (r) goto error; + r = brtloader_fi_unlink(&bl->file_infos, fs.data_fidxs[0]); + if (r) goto error; + r = brtloader_fi_unlink(&bl->file_infos, fs.idx_fidxs[0]); - toku_free(fs.temp_data_names[0]); - toku_free(fs.temp_idx_names[0]); - toku_free(fs.temp_data_names); - toku_free(fs.temp_idx_names); - - return 0; + error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. + // if we get here we need to free up the merge_fileset and the rowset, as well as the keys + toku_free(rows.data); + toku_free(rows.rows); + toku_free(fs.data_fidxs); + toku_free(fs.idx_fidxs); + toku_free(skey.data); + toku_free(sval.data); + toku_free (pkey.data); + toku_free (pval.data); + return r; } -int toku_brt_loader_close (BRTLOADER bl) +int toku_brt_loader_close (BRTLOADER bl, + void (*error_callback)(DB *, int i, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra + ) /* Effect: Close the bulk loader. * Return all the file descriptors in the array fds. */ { + int result = 0; for (int i=0; iN; i++) { char * fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]); - - int r = loader_do_i(bl, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd); + result = loader_do_i(bl, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, i, error_callback, error_callback_extra); toku_free(fname_in_cwd); - if (r!=0) return r; + if (result!=0) goto error; toku_free((void*)bl->new_fnames_in_env[i]); bl->new_fnames_in_env[i] = NULL; } - toku_free(bl->dbs); - toku_free(bl->descriptors); - toku_free(bl->new_fnames_in_env); - toku_free(bl->bt_compare_funs); - toku_free((void*)bl->temp_file_template); - { int r = fclose(bl->fprimary_rows); assert (r==0); } - toku_free(bl->fprimary_rows_name); - { int r = fclose(bl->fprimary_idx); assert (r==0); } - toku_free(bl->fprimary_idx_name); - return 0; + result = brtloader_fi_close (&bl->file_infos, bl->fprimary_rows); if (result) goto error; + result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_rows); if (result) goto error; + result = brtloader_fi_close (&bl->file_infos, bl->fprimary_idx); if (result) goto error; + result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_idx); if (result) goto error; + assert(bl->file_infos.n_files_open == 0); + assert(bl->file_infos.n_files_extant == 0); + error: + brtloader_destroy(bl, result!=0); + return result; } struct dbuf { @@ -672,7 +935,7 @@ struct leaf_buf { int nkeys_p, ndata_p, dsize_p, n_in_buf_p; }; -const int nodesize = 1<<22; +const int nodesize = 1<<15; struct translation { int64_t off, size; @@ -721,7 +984,7 @@ static struct leaf_buf *start_leaf (struct dbout *out, const struct descriptor * dbuf_init(&lbuf->dbuf); int height=0; int flags=0; - int layout_version=11; + int layout_version=BRT_LAYOUT_VERSION; putbuf_bytes(&lbuf->dbuf, "tokuleaf", 8); putbuf_int32(&lbuf->dbuf, layout_version); putbuf_int32(&lbuf->dbuf, layout_version); // layout_version original @@ -895,7 +1158,7 @@ static void write_header (struct dbout *out, long long translation_location_on_d .nodesize = nodesize, .root = root_blocknum_on_disk, .flags = 0, - .layout_version_original = 11 + .layout_version_original = BRT_LAYOUT_VERSION }; unsigned int size = toku_serialize_brt_header_size (&h); struct wbuf wbuf; @@ -932,7 +1195,7 @@ static void allocate_node (struct subtrees_info *sts, int64_t b, const struct su sts->n_subtrees++; } -static int read_some_pivots (FILE *pivots_file, int n_to_read, BRTLOADER bl, +static int read_some_pivots (FIDX pivots_file, int n_to_read, BRTLOADER bl, /*out*/ DBT pivots[/*n_to_read*/]) // pivots is an array to be filled in. The pivots array is uninitialized. { @@ -945,8 +1208,8 @@ static int read_some_pivots (FILE *pivots_file, int n_to_read, BRTLOADER bl, } static int setup_nonleaf_block (int n_children, - struct subtrees_info *subtrees, FILE *pivots_file, int64_t first_child_offset_in_subtrees, - struct subtrees_info *next_subtrees, FILE *next_pivots_file, + struct subtrees_info *subtrees, FIDX pivots_file, int64_t first_child_offset_in_subtrees, + struct subtrees_info *next_subtrees, FIDX next_pivots_file, struct dbout *out, BRTLOADER bl, /*out*/int64_t *blocknum, /*out*/struct subtree_info **subtrees_info_p, @@ -1068,7 +1331,7 @@ int write_nonleaf_node (struct dbout *out, int64_t blocknum_of_new_node, int n_c } static -int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) { +int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const struct descriptor *descriptor) { int height=1; // Watch out for the case where we saved the last pivot but didn't write any more nodes out. @@ -1083,13 +1346,12 @@ int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct // 2) We put the 15 pivots and 16 blocks into an non-leaf node. // 3) We put the 16th pivot into the next pivots file. { - int r = fseek(pivots_file, 0, SEEK_SET); + int r = fseek(bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET); if (r!=0) { assert(errno!=0); return errno; } } - FILE *next_pivots_file; - char *next_pivots_name; - brtloader_open_temp_file (bl, &next_pivots_file, &next_pivots_name); + FIDX next_pivots_file; + brtloader_open_temp_file (bl, &next_pivots_file); struct subtrees_info next_sts = {.n_subtrees = 0, .n_subtrees_limit = 1}; @@ -1103,7 +1365,7 @@ int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct int64_t blocknum_of_new_node; struct subtree_info *subtree_info; int r = setup_nonleaf_block (n_per_block, - sts, pivots_file, n_subtrees_used, + sts, pivots_fidx, n_subtrees_used, &next_sts, next_pivots_file, out, bl, &blocknum_of_new_node, &subtree_info, &pivots); @@ -1122,7 +1384,7 @@ int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct int64_t blocknum_of_new_node; struct subtree_info *subtree_info; int r = setup_nonleaf_block(n_first, - sts, pivots_file, n_subtrees_used, + sts, pivots_fidx, n_subtrees_used, &next_sts, next_pivots_file, out, bl, &blocknum_of_new_node, &subtree_info, &pivots); @@ -1138,7 +1400,7 @@ int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct int64_t blocknum_of_new_node; struct subtree_info *subtree_info; int r = setup_nonleaf_block(n_blocks_left, - sts, pivots_file, n_subtrees_used, + sts, pivots_fidx, n_subtrees_used, &next_sts, next_pivots_file, out, bl, &blocknum_of_new_node, &subtree_info, &pivots); @@ -1149,28 +1411,25 @@ int write_nonleaves (BRTLOADER bl, FILE *pivots_file, char *pivots_fname, struct } assert(n_subtrees_used == sts->n_subtrees); // Now set things up for the next iteration. - int r = fclose(pivots_file); assert(r==0); - pivots_file = next_pivots_file; - r = unlink(pivots_fname); assert(r==0); - toku_free(pivots_fname); - pivots_fname = next_pivots_name; + int r = brtloader_fi_close(&bl->file_infos, pivots_fidx); assert(r==0); + r = brtloader_fi_unlink(&bl->file_infos, pivots_fidx); assert(r==0); + pivots_fidx = next_pivots_file; toku_free(sts->subtrees); *sts = next_sts; height++; } - { int r = fclose(pivots_file); assert(r==0); } - toku_free(pivots_fname); + { int r = brtloader_fi_close (&bl->file_infos, pivots_fidx); assert(r==0); } + { int r = brtloader_fi_unlink(&bl->file_infos, pivots_fidx); assert(r==0); } return 0; } -int write_file_to_dbfile (int outfile, FILE *infile, BRTLOADER bl, const struct descriptor *descriptor) { +int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor) { // The pivots file will contain all the pivot strings (in the form ) // The pivots_fname is the name of the pivots file. // Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree. int64_t n_pivots=0; // number of pivots in pivots_file - FILE *pivots_file; // the file - char *pivots_fname; // the filename - brtloader_open_temp_file (bl, &pivots_file, &pivots_fname); + FIDX pivots_file; // the file + brtloader_open_temp_file (bl, &pivots_file); // The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot. struct subtrees_info sts = {.next_free_block = 3, @@ -1227,7 +1486,7 @@ int write_file_to_dbfile (int outfile, FILE *infile, BRTLOADER bl, const struct finish_leafnode(&out, lbuf); { - int r = write_nonleaves(bl, pivots_file, pivots_fname, &out, &sts, descriptor); + int r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor); assert(r==0); } diff --git a/newbrt/brtloader.h b/newbrt/brtloader.h index 414f1958172..eb0d0d0c177 100644 --- a/newbrt/brtloader.h +++ b/newbrt/brtloader.h @@ -16,6 +16,10 @@ int toku_brt_loader_open (BRTLOADER *bl, brt_compare_func bt_compare_functions[/*N*/], const char *temp_file_template); int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val); -int toku_brt_loader_close (BRTLOADER bl); +int toku_brt_loader_close (BRTLOADER bl, + void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), + void *extra); + +void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)); #endif // BRTLOADER_H diff --git a/newbrt/tests/brtloader-test.c b/newbrt/tests/brtloader-test.c index 39cbe57d97e..897c0ea1597 100644 --- a/newbrt/tests/brtloader-test.c +++ b/newbrt/tests/brtloader-test.c @@ -22,41 +22,72 @@ static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) { return qsort_compare_ints(akey->data, bkey->data); } -static void test_merge_internal (int a[], int na, int b[], int nb) { +static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) { + fprintf(stderr, "error in test"); + abort(); +} +BOOL founddup; +static void expect_dups_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) { + founddup=TRUE; +} + +static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) { + int *MALLOC_N(na+nb, ab); // the combined array a and b + for (int i=0; ib[j]); - j++; - } else { - assert(0); + struct error_callback_s cb; + if (dups) { + cb.error_callback = expect_dups_cb; + founddup=FALSE; + } else { + cb.error_callback = err_cb; + } + struct rowset rs = {.data=(char*)ab}; + merge(cr, ar, na, br, nb, dest_db, compare_ints, &cb, &rs); + if (dups) { + assert(founddup); + } else { + // verify the merge + int i=0; + int j=0; + for (int k=0; kb[j]); + j++; + } else { + assert(0); + } } } } toku_free(cr); toku_free(ar); toku_free(br); + toku_free(ab); } /* Test the basic merger. */ @@ -64,38 +95,41 @@ static void test_merge (void) { { int avals[]={1,2,3,4,5}; int *bvals = NULL; //icc won't let us use a zero-sized array explicitly or by [] = {} construction. - test_merge_internal(avals, 5, bvals, 0); - test_merge_internal(bvals, 0, avals, 5); + test_merge_internal(avals, 5, bvals, 0, FALSE); + test_merge_internal(bvals, 0, avals, 5, FALSE); } { int avals[]={1,3,5,7}; int bvals[]={2,4}; - test_merge_internal(avals, 4, bvals, 2); - test_merge_internal(bvals, 2, avals, 4); + test_merge_internal(avals, 4, bvals, 2, FALSE); + test_merge_internal(bvals, 2, avals, 4, FALSE); } { int avals[]={1,2,3,5,6,7}; int bvals[]={2,4,5,6,8}; - test_merge_internal(avals, 6, bvals, 5); - test_merge_internal(bvals, 5, avals, 6); + test_merge_internal(avals, 6, bvals, 5, TRUE); + test_merge_internal(bvals, 5, avals, 6, TRUE); } } static void test_internal_mergesort_row_array (int a[], int n) { struct row *MALLOC_N(n, ar); for (int i=0; i= fs.n_temp_files); destroy_rowset(&aset); destroy_rowset(&bset); - for (int i=0; i<2; i++) assert(fs.temp_data_names[i] != NULL && fs.temp_idx_names[i] != NULL); + for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1 && fs.idx_fidxs[i].idx != -1); - r = merge_files(&fs, &bl, dest_db, compare_ints); CKERR(r); + r = merge_files(&fs, &bl, dest_db, compare_ints, &cb); CKERR(r); assert(fs.n_temp_files==1); - FILE *inf = fopen(fs.temp_data_names[0], "r"); + FIDX inf = fs.data_fidxs[0]; + r = brtloader_fi_reopen(&bl.file_infos, inf, "r"); + CKERR(r); char *name = toku_strdup(template); int fd = mkstemp(name); fprintf(stderr, "Final data in %s\n", name); @@ -231,18 +280,15 @@ static void test_merge_files (char *template) { struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}}; r = write_file_to_dbfile(fd, inf, &bl, &desc); CKERR(r); - r = fclose(inf); + r = brtloader_fi_close(&bl.file_infos, inf); CKERR(r); - r = unlink(fs.temp_data_names[0]); + r = brtloader_fi_unlink(&bl.file_infos, fs.data_fidxs[0]); CKERR(r); - r = unlink(fs.temp_idx_names[0]); + r = brtloader_fi_unlink(&bl.file_infos, fs.idx_fidxs[0]); CKERR(r); - - toku_free(fs.temp_data_names[0]); - toku_free(fs.temp_idx_names[0]); - toku_free(fs.temp_data_names); - toku_free(fs.temp_idx_names); + destroy_merge_fileset(&fs); + brtloader_fi_destroy(&bl.file_infos, FALSE); toku_free(name); } diff --git a/src/loader.c b/src/loader.c index 1080ed01c29..208103e60e1 100644 --- a/src/loader.c +++ b/src/loader.c @@ -38,7 +38,8 @@ struct __toku_loader_internal { uint32_t *dbt_flags; uint32_t loader_flags; void *extra; - void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *extra); + void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *extra_extra); + void *error_extra; int (*poll_func)(void *extra, float progress); char *temp_file_template; @@ -147,7 +148,7 @@ int toku_loader_create_loader(DB_ENV *env, } loader->i->ekeys = NULL; loader->i->evals = NULL; - r = ydb_load_inames (env, txn, N, dbs, new_inames_in_env); + r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env); assert(r==0); toku_brt_loader_open(&loader->i->brt_loader, loader->i->env->i->cachetable, @@ -175,9 +176,11 @@ int toku_loader_set_poll_function(DB_LOADER *loader, } int toku_loader_set_error_callback(DB_LOADER *loader, - void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)) + void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra), + void *error_extra) { loader->i->error_callback = error_cb; + loader->i->error_extra = error_extra; return 0; } @@ -230,31 +233,36 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) int toku_loader_close(DB_LOADER *loader) { + int r=0; if ( loader->i->err_errno != 0 ) { if ( loader->i->error_callback != NULL ) { - loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, NULL); + loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); } toku_free(loader->i->err_key.data); toku_free(loader->i->err_val.data); } if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { - toku_brt_loader_close(loader->i->brt_loader); + r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra); + if (r!=0) goto cleanup_and_return_r; for (int i=0; ii->N; i++) { toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. - int r = toku_dictionary_redirect(loader->i->inames_in_env[i], - loader->i->dbs[i]->i->brt, - db_txn_struct_i(loader->i->txn)->tokutxn); + r = toku_dictionary_redirect(loader->i->inames_in_env[i], + loader->i->dbs[i]->i->brt, + db_txn_struct_i(loader->i->txn)->tokutxn); assert(r==0); toku_ydb_unlock(); - toku_free(loader->i->inames_in_env[i]); } + cleanup_and_return_r: + for (int i=0; ii->N; i++) { + toku_free(loader->i->inames_in_env[i]); + } toku_free(loader->i->inames_in_env); toku_free(loader->i->brt_loader); // TODO: release table locks - } - if (loader->i->loader_flags & LOADER_USE_PUTS) { + } else { + // (loader->i->loader_flags & LOADER_USE_PUTS); int num_dbts = loader->i->N; for (int i=0; ii->ekeys && @@ -274,7 +282,7 @@ int toku_loader_close(DB_LOADER *loader) toku_free(loader->i->temp_file_template); toku_free(loader->i); toku_free(loader); - return 0; + return r; } int toku_loader_abort(DB_LOADER *loader) diff --git a/src/loader.h b/src/loader.h index 5a6a44d2cbc..d36d922aef2 100644 --- a/src/loader.h +++ b/src/loader.h @@ -10,7 +10,7 @@ */ int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t db_flags[N], uint32_t dbt_flags[N], uint32_t loader_flags, void *extra); -int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); +int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra), void *extra); int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val); int toku_loader_close(DB_LOADER *loader); diff --git a/src/tests/Makefile b/src/tests/Makefile index 9b150fdbe80..4ab5ee1b806 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -128,6 +128,7 @@ BDB_DONTRUN_TESTS = \ recover-delboth-after-checkpoint \ loader-reference-test \ loader-stress-test \ + loader-dup-test \ recover-lsn-filter-multiple \ recover-put-multiple-fdelete-all \ recover-put-multiple-fdelete-some \ diff --git a/src/tests/loader-dup-test.c b/src/tests/loader-dup-test.c new file mode 100644 index 00000000000..70c6d263a15 --- /dev/null +++ b/src/tests/loader-dup-test.c @@ -0,0 +1,378 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved." +#ident "$Id$" + +#include "test.h" +#include "toku_pthread.h" +#include +#include + +DB_ENV *env; +enum {MAX_NAME=128}; +enum {MAX_DBS=256}; +int NUM_DBS=5; +int NUM_ROWS=100000; +int CHECK_RESULTS=0; +int USE_PUTS=0; +enum {MAGIC=311}; + +// +// Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS +// + +// a is the bit-wise permute table. For DB[i], permute bits as described in a[i] using 'twiddle32' +// inv is the inverse bit-wise permute of a[]. To get the original value from a twiddled value, twiddle32 (again) with inv[] +int a[MAX_DBS][32]; +int inv[MAX_DBS][32]; + +#if defined(__cilkplusplus) || defined (__cplusplus) +extern "C" { +#endif + +// rotate right and left functions +static inline unsigned int rotr32(const unsigned int x, const unsigned int num) { + const unsigned int n = num % 32; + return (x >> n) | ( x << (32 - n)); +} +static inline unsigned int rotl32(const unsigned int x, const unsigned int num) { + const unsigned int n = num % 32; + return (x << n) | ( x >> (32 - n)); +} + +static void generate_permute_tables(void) { + int i, j, tmp; + for(int db=0;db> i ) & 1) << a[db][i]; + } + return b; +} + +// permute bits of x based on inverse permute table bitmap +static unsigned int inv_twiddle32(unsigned int x, int db) +{ + unsigned int b = 0; + for(int i=0;i<32;i++) { + b |= (( x >> i ) & 1) << inv[db][i]; + } + return b; +} + +// generate val from key, index +static unsigned int generate_val(int key, int i) { + return rotl32((key + MAGIC), i); +} +static unsigned int pkey_for_val(int key, int i) { + return rotr32(key, i) - MAGIC; +} + +// There is no handlerton in this test, so this function is a local replacement +// for the handlerton's generate_row_for_put(). +static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) { + + src_db = src_db; + extra = extra; + + uint32_t which = *(uint32_t*)dest_db->app_private; + + if ( which == 0 ) { + if (dest_key->flags==DB_DBT_REALLOC) { + if (dest_key->data) toku_free(dest_key->data); + dest_key->flags = 0; + dest_key->ulen = 0; + } + if (dest_val->flags==DB_DBT_REALLOC) { + if (dest_val->data) toku_free(dest_val->data); + dest_val->flags = 0; + dest_val->ulen = 0; + } + dbt_init(dest_key, src_key->data, src_key->size); + dbt_init(dest_val, src_val->data, src_val->size); + } + else { + assert(dest_key->flags==DB_DBT_REALLOC); + if (dest_key->ulen < sizeof(unsigned int)) { + dest_key->data = toku_xrealloc(dest_key->data, sizeof(unsigned int)); + dest_key->ulen = sizeof(unsigned int); + } + assert(dest_val->flags==DB_DBT_REALLOC); + if (dest_val->ulen < sizeof(unsigned int)) { + dest_val->data = toku_xrealloc(dest_val->data, sizeof(unsigned int)); + dest_val->ulen = sizeof(unsigned int); + } + unsigned int *new_key = (unsigned int *)dest_key->data; + unsigned int *new_val = (unsigned int *)dest_val->data; + + *new_key = twiddle32(*(unsigned int*)src_key->data, which); + *new_val = generate_val(*(unsigned int*)src_key->data, which); + + dest_key->size = sizeof(unsigned int); + dest_val->size = sizeof(unsigned int); + //data is already set above + } + +// printf("dest_key.data = %d\n", *(int*)dest_key->data); +// printf("dest_val.data = %d\n", *(int*)dest_val->data); + + return 0; +} + +#if defined(__cilkplusplus) || defined(__cplusplus) +} // extern "C" +#endif + +static void check_results(DB **dbs) +{ + for(int j=0;jtxn_begin(env, NULL, &txn, 0); + CKERR(r); + + DBC *cursor; + r = dbs[j]->cursor(dbs[j], txn, &cursor, 0); + CKERR(r); + for(int i=0;ic_get(cursor, &key, &val, DB_NEXT); + CKERR(r); + k = *(unsigned int*)key.data; + pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); + v = *(unsigned int*)val.data; + // test that we have the expected keys and values + assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); +// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j)); + } + {printf("."); fflush(stdout);} + r = cursor->c_close(cursor); + CKERR(r); + r = txn->commit(txn, 0); + CKERR(r); + } + printf("\nCheck OK\n"); +} + +struct error_extra { + int bad_i; + int error_count; +}; + +static void error_callback (DB *db, int which_db, int err, DBT *key, DBT *val, void *extra) { + assert(db); + assert(extra); + assert(err==DB_KEYEXIST); + assert(which_db>=0); + assert(key->size==4); + assert(which_db==0); + struct error_extra *e =(struct error_extra *)extra; + assert(e->bad_i == *(int*)key->data); + val=val; + assert(e->error_count==0); + e->error_count++; +} + +static void test_loader(DB **dbs) +{ + int r; + DB_TXN *txn; + DB_LOADER *loader; + uint32_t db_flags[MAX_DBS]; + uint32_t dbt_flags[MAX_DBS]; + for(int i=0;itxn_begin(env, NULL, &txn, 0); + CKERR(r); + r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL); + CKERR(r); + struct error_extra error_extra = {.error_count=0}; + r = loader->set_error_callback(loader, error_callback, (void*)&error_extra); + CKERR(r); + r = loader->set_poll_function(loader, NULL); + CKERR(r); + + // using loader->put, put values into DB + DBT key, val; + unsigned int k, v; + { // put a duplicate row in. + int i = NUM_ROWS; + k = i; + v = generate_val(i, 0); + dbt_init(&key, &k, sizeof(unsigned int)); + dbt_init(&val, &v, sizeof(unsigned int)); + r = loader->put(loader, &key, &val); + CKERR(r); + if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } + error_extra.bad_i = i; + } + for(int i=1;i<=NUM_ROWS;i++) { + k = i; + v = generate_val(i, 0); + dbt_init(&key, &k, sizeof(unsigned int)); + dbt_init(&val, &v, sizeof(unsigned int)); + r = loader->put(loader, &key, &val); + CKERR(r); + if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } + } + if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);} + + // close the loader + if (verbose) { printf("closing"); fflush(stdout); } + r = loader->close(loader); + if (verbose) { printf(" done\n"); } + assert(r==DB_KEYEXIST); + assert(error_extra.error_count==1); + + r = txn->commit(txn, 0); + CKERR(r); + + // verify the DBs + if ( CHECK_RESULTS ) { + check_results(dbs); + } +} + + +static void run_test(void) +{ + int r; + r = system("rm -rf " ENVDIR); CKERR(r); + r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); + + r = db_env_create(&env, 0); CKERR(r); + r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r); + r = env->set_default_dup_compare(env, uint_dbt_cmp); CKERR(r); + r = env->set_generate_row_callback_for_put(env, put_multiple_generate); + CKERR(r); + int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; + r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); + env->set_errfile(env, stderr); + //Disable auto-checkpointing + r = env->checkpointing_set_period(env, 0); CKERR(r); + + DBT desc; + dbt_init(&desc, "foo", sizeof("foo")); + char name[MAX_NAME*2]; + + DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS); + assert(dbs != NULL); + int idx[MAX_DBS]; + for(int i=0;iset_descriptor(dbs[i], 1, &desc, abort_on_upgrade); CKERR(r); + dbs[i]->app_private = &idx[i]; + snprintf(name, sizeof(name), "db_%04x", i); + r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); + } + + generate_permute_tables(); + +// printf("running test_loader()\n"); + // -------------------------- // + test_loader(dbs); + // -------------------------- // +// printf("done test_loader()\n"); + + for(int i=0;iclose(dbs[i], 0); CKERR(r); + dbs[i] = NULL; + } + r = env->close(env, 0); CKERR(r); + toku_free(dbs); +} + +// ------------ infrastructure ---------- +static void do_args(int argc, char * const argv[]); + +int num_rows_set = FALSE; + +int test_main(int argc, char * const *argv) { + do_args(argc, argv); + if (num_rows_set) + run_test(); + else { + int sizes[]={1,4000000,-1}; + for (int i=0; sizes[i]>=0; i++) { + if (verbose) printf("Doing %d\n", sizes[i]); + NUM_ROWS = sizes[i]; + run_test(); + } + } + return 0; +} + +static void do_args(int argc, char * const argv[]) { + int resultcode; + char *cmd = argv[0]; + argc--; argv++; + while (argc>0) { + if (strcmp(argv[0], "-v")==0) { + verbose++; + } else if (strcmp(argv[0],"-q")==0) { + verbose--; + if (verbose<0) verbose=0; + } else if (strcmp(argv[0], "-h")==0) { + resultcode=0; + do_usage: + fprintf(stderr, "Usage: -h -c -d -r \n%s\n", cmd); + exit(resultcode); + } else if (strcmp(argv[0], "-d")==0) { + argc--; argv++; + NUM_DBS = atoi(argv[0]); + if ( NUM_DBS > MAX_DBS ) { + fprintf(stderr, "max value for -d field is %d\n", MAX_DBS); + resultcode=1; + goto do_usage; + } + } else if (strcmp(argv[0], "-v")==0) { + verbose++; + } else if (strcmp(argv[0],"-q")==0) { + verbose--; + if (verbose<0) verbose=0; + } else if (strcmp(argv[0], "-r")==0) { + argc--; argv++; + NUM_ROWS = atoi(argv[0]); + num_rows_set = TRUE; + } else if (strcmp(argv[0], "-c")==0) { + CHECK_RESULTS = 1; + } else if (strcmp(argv[0], "-p")==0) { + USE_PUTS = 1; + } else { + fprintf(stderr, "Unknown arg: %s\n", argv[0]); + resultcode=1; + goto do_usage; + } + argc--; + argv++; + } +} diff --git a/src/tests/loader-reference-test.c b/src/tests/loader-reference-test.c index 03ea14e3b6f..bc801627430 100644 --- a/src/tests/loader-reference-test.c +++ b/src/tests/loader-reference-test.c @@ -54,7 +54,7 @@ static void test_loader(DB **dbs) CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL); CKERR(r); - r = loader->set_error_callback(loader, NULL); + r = loader->set_error_callback(loader, NULL, NULL); CKERR(r); r = loader->set_poll_function(loader, NULL); CKERR(r); @@ -108,7 +108,8 @@ static void run_test(void) r = env->set_default_dup_compare(env, int64_dbt_cmp); CKERR(r); r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r); - int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG; +// int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG; + int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG; r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); env->set_errfile(env, stderr); //Disable auto-checkpointing diff --git a/src/tests/loader-stress-test.c b/src/tests/loader-stress-test.c index 26362dd20c5..7d541e26337 100644 --- a/src/tests/loader-stress-test.c +++ b/src/tests/loader-stress-test.c @@ -195,7 +195,7 @@ static void test_loader(DB **dbs) CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL); CKERR(r); - r = loader->set_error_callback(loader, NULL); + r = loader->set_error_callback(loader, NULL, NULL); CKERR(r); r = loader->set_poll_function(loader, NULL); CKERR(r); @@ -241,7 +241,8 @@ static void run_test(void) r = env->set_default_dup_compare(env, uint_dbt_cmp); CKERR(r); r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r); - int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; +// int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; + int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); env->set_errfile(env, stderr); //Disable auto-checkpointing diff --git a/src/ydb.c b/src/ydb.c index 1f634509ebe..389205013e7 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -1404,7 +1404,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) { engstat->enospc_threads_blocked = enospc_threads_blocked; engstat->enospc_total = enospc_total; } - } return r; } @@ -5377,6 +5376,14 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname return rval; } +int +locked_ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N]) { + toku_ydb_lock(); + int r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env); + toku_ydb_unlock(); + return r; +} + // TODO 2216: Patch out this (dangerous) function when loader is working and // we don't need to test the low-level redirect anymore. // for use by test programs only, just a wrapper around brt call: diff --git a/src/ydb_load.h b/src/ydb_load.h index 7e7e89ad053..e02d84e42bd 100644 --- a/src/ydb_load.h +++ b/src/ydb_load.h @@ -25,5 +25,12 @@ int ydb_load_inames(DB_ENV * env, DB * dbs[/*N*/], /*out*/ char * new_inames_in_env[N]); +// Wrapper to ydb_load_inames if you are not holding the ydb lock. +int locked_ydb_load_inames(DB_ENV * env, + DB_TXN * txn, + int N, + DB * dbs[/*N*/], + /*out*/ char * new_inames_in_env[N]); + #endif