mirror of
https://github.com/MariaDB/server.git
synced 2025-02-02 12:01:42 +01:00
fix the loader error callback and the pqueue-test closes[t:2576]
git-svn-id: file:///svn/toku/tokudb@19913 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
9cab06f7fb
commit
0612028bcf
6 changed files with 95 additions and 101 deletions
|
@ -71,14 +71,15 @@ struct poll_callback_s {
|
|||
brt_loader_poll_func poll_function;
|
||||
void *poll_extra;
|
||||
};
|
||||
typedef struct poll_callback_s *brtloader_poll_callback;
|
||||
|
||||
int brt_loader_init_poll_callback(BRTLOADER);
|
||||
int brt_loader_init_poll_callback(brtloader_poll_callback);
|
||||
|
||||
void brt_loader_destroy_poll_callback(BRTLOADER);
|
||||
void brt_loader_destroy_poll_callback(brtloader_poll_callback);
|
||||
|
||||
void brt_loader_set_poll_function(BRTLOADER, brt_loader_poll_func poll_function, void *poll_extra);
|
||||
void brt_loader_set_poll_function(brtloader_poll_callback, brt_loader_poll_func poll_function, void *poll_extra);
|
||||
|
||||
int brt_loader_call_poll_function(BRTLOADER, float progress);
|
||||
int brt_loader_call_poll_function(brtloader_poll_callback, float progress);
|
||||
|
||||
struct error_callback_s {
|
||||
brt_loader_error_func error_callback;
|
||||
|
@ -90,24 +91,22 @@ struct error_callback_s {
|
|||
DBT key;
|
||||
DBT val;
|
||||
toku_pthread_mutex_t mutex;
|
||||
|
||||
int (*set_error_and_callback)(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
|
||||
BRTLOADER bl;
|
||||
};
|
||||
typedef struct error_callback_s *brtloader_error_callback;
|
||||
|
||||
int brt_loader_init_error_callback(BRTLOADER);
|
||||
int brt_loader_init_error_callback(brtloader_error_callback);
|
||||
|
||||
void brt_loader_destroy_error_callback(BRTLOADER);
|
||||
void brt_loader_destroy_error_callback(brtloader_error_callback);
|
||||
|
||||
int brt_loader_get_error(BRTLOADER);
|
||||
int brt_loader_get_error(brtloader_error_callback);
|
||||
|
||||
void brt_loader_set_error_function(BRTLOADER, brt_loader_error_func error_function, void *extra);
|
||||
void brt_loader_set_error_function(brtloader_error_callback, brt_loader_error_func error_function, void *extra);
|
||||
|
||||
int brt_loader_set_error(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
|
||||
int brt_loader_set_error(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
|
||||
|
||||
int brt_loader_call_error_function(BRTLOADER);
|
||||
int brt_loader_call_error_function(brtloader_error_callback);
|
||||
|
||||
int brt_loader_set_error_and_callback(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
|
||||
int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
|
||||
|
||||
struct brtloader_s {
|
||||
int panic;
|
||||
|
|
|
@ -246,8 +246,8 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
|
|||
toku_free(bl->fractal_queues);
|
||||
toku_free(bl->fractal_threads_live);
|
||||
|
||||
brt_loader_destroy_error_callback(bl);
|
||||
brt_loader_destroy_poll_callback(bl);
|
||||
brt_loader_destroy_error_callback(&bl->error_callback);
|
||||
brt_loader_destroy_poll_callback(&bl->poll_callback);
|
||||
}
|
||||
|
||||
static void *extractor_thread (void*);
|
||||
|
@ -326,8 +326,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
|
|||
init_merge_fileset(&bl->fs[i]);
|
||||
}
|
||||
|
||||
brt_loader_init_error_callback(bl);
|
||||
brt_loader_init_poll_callback(bl);
|
||||
brt_loader_init_error_callback(&bl->error_callback);
|
||||
brt_loader_init_poll_callback(&bl->poll_callback);
|
||||
|
||||
{ int r = init_rowset(&bl->primary_rowset); if (r!=0) return r; }
|
||||
{ int r = queue_create(&bl->primary_rowset_queue, 1); if (r!=0) return r; }
|
||||
|
@ -809,7 +809,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
|
|||
* Return value: 0 on success, an error number otherwise.
|
||||
*/
|
||||
{
|
||||
if (bl->panic || brt_loader_get_error(bl))
|
||||
if (bl->panic || brt_loader_get_error(&bl->error_callback))
|
||||
return EINVAL; // previous panic
|
||||
bl->n_rows++;
|
||||
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
|
||||
|
@ -848,7 +848,7 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
|
|||
if (compare_result==0) {
|
||||
if (bl->error_callback.error_callback) {
|
||||
DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
|
||||
brt_loader_set_error(bl, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
|
||||
brt_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
|
||||
}
|
||||
return DB_KEYEXIST;
|
||||
} else if (compare_result<0) {
|
||||
|
@ -894,7 +894,7 @@ static int binary_search (int *location,
|
|||
if (compare_result==0) {
|
||||
if (bl->error_callback.error_callback) {
|
||||
DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen, a[a2].vlen);
|
||||
brt_loader_set_error(bl, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
|
||||
brt_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
|
||||
}
|
||||
return DB_KEYEXIST;
|
||||
} else if (compare_result<0) {
|
||||
|
@ -1082,7 +1082,7 @@ static int update_progress (int N,
|
|||
bl->progress+=N;
|
||||
|
||||
//printf(" %20s: %d ", message, bl->progress);
|
||||
int r = brt_loader_call_poll_function(bl, (float)bl->progress/(float)PROGRESS_MAX);
|
||||
int r = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
|
||||
toku_pthread_mutex_unlock(&update_progress_lock);
|
||||
return r;
|
||||
}
|
||||
|
@ -1956,15 +1956,15 @@ int toku_brt_loader_close (BRTLOADER bl,
|
|||
|
||||
//printf("Closing\n");
|
||||
|
||||
brt_loader_set_error_function(bl, error_function, error_extra);
|
||||
brt_loader_set_error_function(&bl->error_callback, error_function, error_extra);
|
||||
|
||||
brt_loader_set_poll_function(bl, poll_function, poll_extra);
|
||||
brt_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
|
||||
|
||||
r = finish_extractor(bl);
|
||||
assert(r==0); // !!! should check this error code and cleanup if needed.
|
||||
|
||||
// check for an error during extraction
|
||||
r = brt_loader_call_error_function(bl);
|
||||
r = brt_loader_call_error_function(&bl->error_callback);
|
||||
if (r) {
|
||||
brtloader_destroy(bl, TRUE);
|
||||
return r;
|
||||
|
@ -2398,36 +2398,34 @@ int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, con
|
|||
}
|
||||
#endif
|
||||
|
||||
int brt_loader_init_error_callback(BRTLOADER bl) {
|
||||
memset(&bl->error_callback, 0, sizeof bl->error_callback);
|
||||
bl->error_callback.set_error_and_callback = brt_loader_set_error_and_callback;
|
||||
bl->error_callback.bl = bl;
|
||||
int r = toku_pthread_mutex_init(&bl->error_callback.mutex, NULL); assert(r == 0);
|
||||
int brt_loader_init_error_callback(brtloader_error_callback loader_error) {
|
||||
memset(loader_error, 0, sizeof *loader_error);
|
||||
int r = toku_pthread_mutex_init(&loader_error->mutex, NULL); assert(r == 0);
|
||||
return r;
|
||||
}
|
||||
|
||||
void brt_loader_destroy_error_callback(BRTLOADER bl) {
|
||||
int r = toku_pthread_mutex_destroy(&bl->error_callback.mutex); assert(r == 0);
|
||||
toku_free(bl->error_callback.key.data);
|
||||
toku_free(bl->error_callback.val.data);
|
||||
memset(&bl->error_callback, 0, sizeof bl->error_callback);
|
||||
void brt_loader_destroy_error_callback(brtloader_error_callback loader_error) {
|
||||
int r = toku_pthread_mutex_destroy(&loader_error->mutex); assert(r == 0);
|
||||
toku_free(loader_error->key.data);
|
||||
toku_free(loader_error->val.data);
|
||||
memset(loader_error, 0, sizeof *loader_error);
|
||||
}
|
||||
|
||||
int brt_loader_get_error(BRTLOADER bl) {
|
||||
return bl->error_callback.error;
|
||||
int brt_loader_get_error(brtloader_error_callback loader_error) {
|
||||
return loader_error->error;
|
||||
}
|
||||
|
||||
void brt_loader_set_error_function(BRTLOADER bl, brt_loader_error_func error_function, void *error_extra) {
|
||||
bl->error_callback.error_callback = error_function;
|
||||
bl->error_callback.extra = error_extra;
|
||||
void brt_loader_set_error_function(brtloader_error_callback loader_error, brt_loader_error_func error_function, void *error_extra) {
|
||||
loader_error->error_callback = error_function;
|
||||
loader_error->extra = error_extra;
|
||||
}
|
||||
|
||||
static void error_callback_lock(BRTLOADER bl) {
|
||||
int r = toku_pthread_mutex_lock(&bl->error_callback.mutex); assert(r == 0);
|
||||
static void error_callback_lock(brtloader_error_callback loader_error) {
|
||||
int r = toku_pthread_mutex_lock(&loader_error->mutex); assert(r == 0);
|
||||
}
|
||||
|
||||
static void error_callback_unlock(BRTLOADER bl) {
|
||||
int r = toku_pthread_mutex_unlock(&bl->error_callback.mutex); assert(r == 0);
|
||||
static void error_callback_unlock(brtloader_error_callback loader_error) {
|
||||
int r = toku_pthread_mutex_unlock(&loader_error->mutex); assert(r == 0);
|
||||
}
|
||||
|
||||
static void copy_dbt(DBT *dest, DBT *src) {
|
||||
|
@ -2438,63 +2436,63 @@ static void copy_dbt(DBT *dest, DBT *src) {
|
|||
}
|
||||
}
|
||||
|
||||
int brt_loader_set_error(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
|
||||
int brt_loader_set_error(brtloader_error_callback loader_error, int error, DB *db, int which_db, DBT *key, DBT *val) {
|
||||
int r = 0;
|
||||
error_callback_lock(bl);
|
||||
if (bl->error_callback.error) { // there can be only one
|
||||
error_callback_lock(loader_error);
|
||||
if (loader_error->error) { // there can be only one
|
||||
r = EEXIST;
|
||||
} else {
|
||||
bl->error_callback.error = error; // set the error
|
||||
bl->error_callback.db = db;
|
||||
bl->error_callback.which_db = which_db;
|
||||
copy_dbt(&bl->error_callback.key, key); // copy the data
|
||||
copy_dbt(&bl->error_callback.val, val);
|
||||
loader_error->error = error; // set the error
|
||||
loader_error->db = db;
|
||||
loader_error->which_db = which_db;
|
||||
copy_dbt(&loader_error->key, key); // copy the data
|
||||
copy_dbt(&loader_error->val, val);
|
||||
}
|
||||
error_callback_unlock(bl);
|
||||
error_callback_unlock(loader_error);
|
||||
return r;
|
||||
}
|
||||
|
||||
int brt_loader_call_error_function(BRTLOADER bl) {
|
||||
int brt_loader_call_error_function(brtloader_error_callback loader_error) {
|
||||
int r;
|
||||
error_callback_lock(bl);
|
||||
r = bl->error_callback.error;
|
||||
if (r && !bl->error_callback.did_callback) {
|
||||
bl->error_callback.did_callback = 1;
|
||||
bl->error_callback.error_callback(bl->error_callback.db,
|
||||
bl->error_callback.which_db,
|
||||
bl->error_callback.error,
|
||||
&bl->error_callback.key,
|
||||
&bl->error_callback.val,
|
||||
bl->error_callback.extra);
|
||||
error_callback_lock(loader_error);
|
||||
r = loader_error->error;
|
||||
if (r && !loader_error->did_callback) {
|
||||
loader_error->did_callback = 1;
|
||||
loader_error->error_callback(loader_error->db,
|
||||
loader_error->which_db,
|
||||
loader_error->error,
|
||||
&loader_error->key,
|
||||
&loader_error->val,
|
||||
loader_error->extra);
|
||||
}
|
||||
error_callback_unlock(bl); return r;
|
||||
error_callback_unlock(loader_error); return r;
|
||||
}
|
||||
|
||||
int brt_loader_set_error_and_callback(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
|
||||
int r = brt_loader_set_error(bl, error, db, which_db, key, val);
|
||||
int brt_loader_set_error_and_callback(brtloader_error_callback loader_error, int error, DB *db, int which_db, DBT *key, DBT *val) {
|
||||
int r = brt_loader_set_error(loader_error, error, db, which_db, key, val);
|
||||
if (r == 0)
|
||||
r = brt_loader_call_error_function(bl);
|
||||
r = brt_loader_call_error_function(loader_error);
|
||||
return r;
|
||||
}
|
||||
|
||||
int brt_loader_init_poll_callback(BRTLOADER bl) {
|
||||
memset(&bl->poll_callback, 0, sizeof bl->poll_callback);
|
||||
int brt_loader_init_poll_callback(brtloader_poll_callback p) {
|
||||
memset(p, 0, sizeof *p);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void brt_loader_destroy_poll_callback(BRTLOADER bl) {
|
||||
memset(&bl->poll_callback, 0, sizeof bl->poll_callback);
|
||||
void brt_loader_destroy_poll_callback(brtloader_poll_callback p) {
|
||||
memset(p, 0, sizeof *p);
|
||||
}
|
||||
|
||||
void brt_loader_set_poll_function(BRTLOADER bl, brt_loader_poll_func poll_function, void *poll_extra) {
|
||||
bl->poll_callback.poll_function = poll_function;
|
||||
bl->poll_callback.poll_extra = poll_extra;
|
||||
void brt_loader_set_poll_function(brtloader_poll_callback p, brt_loader_poll_func poll_function, void *poll_extra) {
|
||||
p->poll_function = poll_function;
|
||||
p->poll_extra = poll_extra;
|
||||
};
|
||||
|
||||
int brt_loader_call_poll_function(BRTLOADER bl, float progress) {
|
||||
int brt_loader_call_poll_function(brtloader_poll_callback p, float progress) {
|
||||
int r = 0;
|
||||
if (bl->poll_callback.poll_function)
|
||||
r = bl->poll_callback.poll_function(bl->poll_callback.poll_extra, progress);
|
||||
if (p->poll_function)
|
||||
r = p->poll_function(p->poll_extra, progress);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,8 +55,8 @@ static int pqueue_compare(pqueue_t *q, DBT *next_key, DBT *next_val, DBT *curr_k
|
|||
int r = q->compare(q->db, next_key, curr_key);
|
||||
if ( r == 0 ) { // duplicate key : next_key == curr_key
|
||||
q->dup_error = 1;
|
||||
if (q->error_callback->set_error_and_callback)
|
||||
q->error_callback->set_error_and_callback(q->error_callback->bl, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
|
||||
if (q->error_callback)
|
||||
brt_loader_set_error_and_callback(q->error_callback, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
|
||||
}
|
||||
return ( r > -1 );
|
||||
}
|
||||
|
|
|
@ -102,8 +102,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
|
|||
|
||||
toku_brt_loader_set_n_rows(&bl, n);
|
||||
|
||||
brt_loader_init_error_callback(&bl);
|
||||
brt_loader_set_error_function(&bl, err_cb, NULL);
|
||||
brt_loader_init_error_callback(&bl.error_callback);
|
||||
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
|
||||
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
|
||||
destroy_rowset(&aset);
|
||||
|
||||
|
@ -163,7 +163,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
|
|||
// walk a cursor through the dbfile and verify the rows
|
||||
verify_dbfile(n, output_name);
|
||||
|
||||
brt_loader_destroy_error_callback(&bl);
|
||||
brt_loader_destroy_error_callback(&bl.error_callback);
|
||||
}
|
||||
|
||||
/* Test to see if we can open temporary files. */
|
||||
|
|
|
@ -64,11 +64,11 @@ static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) {
|
|||
struct row *MALLOC_N(na+nb, cr);
|
||||
DB *dest_db = NULL;
|
||||
struct brtloader_s bl;
|
||||
brt_loader_init_error_callback(&bl);
|
||||
brt_loader_set_error_function(&bl, dups ? expect_dups_cb : err_cb, NULL);
|
||||
brt_loader_init_error_callback(&bl.error_callback);
|
||||
brt_loader_set_error_function(&bl.error_callback, dups ? expect_dups_cb : err_cb, NULL);
|
||||
struct rowset rs = {.data=(char*)ab};
|
||||
merge_row_arrays_base(cr, ar, na, br, nb, 0, dest_db, compare_ints, &bl, &rs);
|
||||
brt_loader_call_error_function(&bl);
|
||||
brt_loader_call_error_function(&bl.error_callback);
|
||||
if (dups) {
|
||||
assert(founddup);
|
||||
} else {
|
||||
|
@ -95,7 +95,7 @@ static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) {
|
|||
toku_free(ar);
|
||||
toku_free(br);
|
||||
toku_free(ab);
|
||||
brt_loader_destroy_error_callback(&bl);
|
||||
brt_loader_destroy_error_callback(&bl.error_callback);
|
||||
}
|
||||
|
||||
/* Test the basic merger. */
|
||||
|
@ -295,7 +295,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
|
|||
fill_rowset(&bset, b_keys, b_vals, 4);
|
||||
toku_brt_loader_set_n_rows(&bl, 6+3);
|
||||
|
||||
brt_loader_set_error_function(&bl, err_cb, NULL);
|
||||
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
|
||||
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
|
||||
r = brt_loader_sort_and_write_rows(&bset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
|
||||
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
|
||||
|
|
|
@ -35,11 +35,6 @@ static void err_cb(DB *db, int which_db, int err, DBT *key, DBT *val, void *extr
|
|||
if (verbose) printf("err_cb : key <%d> val <%d>\n", *(int *)key->data, *(int *)val->data);
|
||||
}
|
||||
|
||||
static int err_cb_set_error_and_callback(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
|
||||
bl->error_callback.error_callback(db, which_db, error, key, val, bl->error_callback.extra);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int run_test(void)
|
||||
{
|
||||
const int n_sources=10;
|
||||
|
@ -49,13 +44,9 @@ static int run_test(void)
|
|||
DB *dest_db = NULL;
|
||||
brt_compare_func compare = test_compare;
|
||||
int r;
|
||||
struct error_callback_s error_callback = {
|
||||
.error_callback = err_cb,
|
||||
.extra = NULL,
|
||||
.db = NULL,
|
||||
.which_db = 0,
|
||||
.set_error_and_callback = err_cb_set_error_and_callback
|
||||
};
|
||||
struct error_callback_s error_callback;
|
||||
brt_loader_init_error_callback(&error_callback);
|
||||
brt_loader_set_error_function(&error_callback, err_cb, NULL);
|
||||
|
||||
r = pqueue_init(&pq, n_sources, 0, dest_db, compare, &error_callback);
|
||||
if (r) return r;
|
||||
|
@ -84,7 +75,9 @@ static int run_test(void)
|
|||
for (int i=0; i<n_sources; i++) {
|
||||
r = pqueue_pop(pq, &node); assert(r==0);
|
||||
if (verbose) printf("%d : %d\n", i, *(int*)(node->key->data));
|
||||
if ( *(int*)(node->key->data) != i ) { if (verbose) printf("FAIL\n"); return -1; }
|
||||
if ( *(int*)(node->key->data) != i ) {
|
||||
if (verbose) printf("FAIL\n"); return -1;
|
||||
}
|
||||
}
|
||||
pqueue_free(pq);
|
||||
if (verbose) printf("test1 : PASS\n");
|
||||
|
@ -172,8 +165,11 @@ found_duplicate6:
|
|||
if ( found_dup != 6 ) { printf("FAIL\n"); return -1; }
|
||||
if (verbose) printf("test3 : PASS\n");
|
||||
pqueue_free(pq);
|
||||
brt_loader_destroy_error_callback(&error_callback);
|
||||
|
||||
// test 4 - find duplicate when inserting
|
||||
brt_loader_init_error_callback(&error_callback);
|
||||
brt_loader_set_error_function(&error_callback, err_cb, NULL);
|
||||
r = pqueue_init(&pq, 10, 0, dest_db, compare, &error_callback); if (r) return r;
|
||||
|
||||
found_dup = -1;
|
||||
|
@ -211,6 +207,7 @@ found_duplicate0:
|
|||
if (verbose) printf("PASS\n");
|
||||
pqueue_free(pq);
|
||||
toku_free(pq_nodes);
|
||||
brt_loader_destroy_error_callback(&error_callback);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue