From f39252351425e004d039fcc4a07f59eeafa83cab Mon Sep 17 00:00:00 2001 From: Zardosht Kasheff Date: Tue, 16 Apr 2013 23:59:46 -0400 Subject: [PATCH] [t:3436], [t:3782], merge ydb level bulk fetch to main line git-svn-id: file:///svn/toku/tokudb@33770 c7de825b-a66e-492c-adef-691d508d4ae1 --- buildheader/db.h_4_1 | 1 + buildheader/db.h_4_3 | 1 + buildheader/db.h_4_4 | 1 + buildheader/db.h_4_5 | 1 + buildheader/db.h_4_6 | 1 + buildheader/make_db_h.c | 2 + buildheader/tdb.h | 1 + db-benchmark-test/scanscan.c | 5 +- include/db.h | 1 + newbrt/brt-internal.h | 6 - newbrt/brt-test-helpers.c | 2 +- newbrt/brt-verify.c | 2 +- newbrt/brt.c | 444 +++++++++++---------------------- newbrt/cachetable.c | 6 +- newbrt/logger.c | 2 +- newbrt/omt.c | 165 +----------- newbrt/omt.h | 105 +------- newbrt/recover.c | 8 +- newbrt/roll.c | 4 +- newbrt/rollback.c | 24 +- newbrt/tests/omt-cursor-test.c | 118 --------- newbrt/tests/omt-test.c | 219 +--------------- newbrt/txn.c | 18 +- newbrt/ule.c | 6 +- src/lock_tree/locktree.c | 6 +- src/range_tree/log_nooverlap.c | 16 +- src/tests/test_bulk_fetch.c | 153 ++++++++++++ src/ydb.c | 28 +-- 28 files changed, 382 insertions(+), 964 deletions(-) delete mode 100644 newbrt/tests/omt-cursor-test.c create mode 100644 src/tests/test_bulk_fetch.c diff --git a/buildheader/db.h_4_1 b/buildheader/db.h_4_1 index a47a6b93017..05bc39481d7 100644 --- a/buildheader/db.h_4_1 +++ b/buildheader/db.h_4_1 @@ -290,6 +290,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/buildheader/db.h_4_3 b/buildheader/db.h_4_3 index 91a107fca67..ea2e9761e2b 100644 --- a/buildheader/db.h_4_3 +++ b/buildheader/db.h_4_3 @@ -292,6 +292,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/buildheader/db.h_4_4 b/buildheader/db.h_4_4 index 2eb2714ad1e..b75ad826a2a 100644 --- a/buildheader/db.h_4_4 +++ b/buildheader/db.h_4_4 @@ -292,6 +292,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/buildheader/db.h_4_5 b/buildheader/db.h_4_5 index afc02a0adcb..d19daaf48d6 100644 --- a/buildheader/db.h_4_5 +++ b/buildheader/db.h_4_5 @@ -292,6 +292,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/buildheader/db.h_4_6 b/buildheader/db.h_4_6 index 85dc991fbf9..b8540fbb44c 100644 --- a/buildheader/db.h_4_6 +++ b/buildheader/db.h_4_6 @@ -293,6 +293,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/buildheader/make_db_h.c b/buildheader/make_db_h.c index 682e5b4b94e..86a2af300b1 100644 --- a/buildheader/make_db_h.c +++ b/buildheader/make_db_h.c @@ -75,6 +75,7 @@ enum { TOKUDB_UPGRADE_FAILURE = -100011, TOKUDB_TRY_AGAIN = -100012, TOKUDB_NEEDS_REPAIR = -100013, + TOKUDB_CURSOR_CONTINUE = -100014, }; static void print_defines (void) { @@ -226,6 +227,7 @@ static void print_defines (void) { dodefine(TOKUDB_UPGRADE_FAILURE); dodefine(TOKUDB_TRY_AGAIN); dodefine(TOKUDB_NEEDS_REPAIR); + dodefine(TOKUDB_CURSOR_CONTINUE); /* LOADER flags */ printf("/* LOADER flags */\n"); diff --git a/buildheader/tdb.h b/buildheader/tdb.h index a2b9cb61a01..c2de761bb9f 100644 --- a/buildheader/tdb.h +++ b/buildheader/tdb.h @@ -293,6 +293,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/db-benchmark-test/scanscan.c b/db-benchmark-test/scanscan.c index 8927617e1c8..b691f21eadd 100644 --- a/db-benchmark-test/scanscan.c +++ b/db-benchmark-test/scanscan.c @@ -283,7 +283,8 @@ static int counttotalbytes (DBT const *key, DBT const *data, void *extrav) { printf("%s:%d %"PRIu64" %"PRIu64"\n", __FUNCTION__, __LINE__, k, expect_key); expect_key = k + 1; } - return 0; + return TOKUDB_CURSOR_CONTINUE; + //return 0; } static void scanscan_lwc (void) { @@ -306,7 +307,7 @@ static void scanscan_lwc (void) { rowcounter++; if (limitcount>0 && rowcounter>=limitcount) break; } - assert(r==DB_NOTFOUND); + //assert(r==DB_NOTFOUND); r = dbc->c_close(dbc); assert(r==0); double thistime = gettime(); double tdiff = thistime-prevtime; diff --git a/include/db.h b/include/db.h index a2b9cb61a01..c2de761bb9f 100644 --- a/include/db.h +++ b/include/db.h @@ -293,6 +293,7 @@ typedef enum { #define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_NEEDS_REPAIR -100013 +#define TOKUDB_CURSOR_CONTINUE -100014 /* LOADER flags */ #define LOADER_USE_PUTS 1 typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); diff --git a/newbrt/brt-internal.h b/newbrt/brt-internal.h index 4e89808b375..6bcc1599cf5 100644 --- a/newbrt/brt-internal.h +++ b/newbrt/brt-internal.h @@ -429,7 +429,6 @@ struct brt_header { unsigned int flags; DESCRIPTOR_S descriptor; - u_int64_t root_put_counter; // the generation number of the brt BLOCK_TABLE blocktable; // If a transaction created this BRT, which one? @@ -549,8 +548,6 @@ struct brt_cursor_leaf_info_to_be { // Values to be used to pin a leaf for shortcut searches struct brt_cursor_leaf_info { - BLOCKNUM blocknumber; - u_int32_t fullhash; struct brt_cursor_leaf_info_to_be to_be; }; @@ -558,13 +555,10 @@ struct brt_cursor_leaf_info { struct brt_cursor { struct toku_list cursors_link; BRT brt; - BOOL current_in_omt; BOOL prefetching; DBT key, val; // The key-value pair that the cursor currently points to DBT range_lock_left_key, range_lock_right_key; BOOL left_is_neg_infty, right_is_pos_infty; - OMTCURSOR omtcursor; - u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor? TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor? BOOL is_snapshot_read; // true if query is read_committed, false otherwise BOOL is_leaf_mode; diff --git a/newbrt/brt-test-helpers.c b/newbrt/brt-test-helpers.c index d3233e74919..19c8fb4e0d2 100644 --- a/newbrt/brt-test-helpers.c +++ b/newbrt/brt-test-helpers.c @@ -142,7 +142,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke struct cmd_leafval_heaviside_extra be = {brt, &keydbt}; - r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL); + r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx); if (r==0) { diff --git a/newbrt/brt-verify.c b/newbrt/brt-verify.c index 6531c1de8e0..11acf8ffb0e 100644 --- a/newbrt/brt-verify.c +++ b/newbrt/brt-verify.c @@ -83,7 +83,7 @@ verify_msg_in_child_buffer(BRT brt, int type, MSN msn, bytevec key, ITEMLEN keyl static LEAFENTRY get_ith_leafentry (BASEMENTNODE bn, int i) { OMTVALUE le_v; - int r = toku_omt_fetch(bn->buffer, i, &le_v, NULL); + int r = toku_omt_fetch(bn->buffer, i, &le_v); invariant(r == 0); // this is a bad failure if it happens. return (LEAFENTRY)le_v; } diff --git a/newbrt/brt.c b/newbrt/brt.c index a978759a5ae..bd491fb3e6f 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -143,14 +143,6 @@ toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn) { h->root_that_created_or_locked_when_empty = rootid; } -static void brt_cursor_invalidate(BRT_CURSOR brtcursor); - -// We invalidate all the OMTCURSORS any time we push into the root of the BRT for that OMT. -// We keep a counter on each brt header, but if the brt header is evicted from the cachetable -// then we lose that counter. So we also keep a global counter. -// An alternative would be to keep only the global counter. But that would invalidate all OMTCURSORS -// even from unrelated BRTs. This way we only invalidate an OMTCURSOR if -static u_int64_t global_root_put_counter = 0; enum reactivity { RE_STABLE, RE_FUSIBLE, RE_FISSIBLE }; @@ -509,7 +501,7 @@ toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bo static LEAFENTRY fetch_from_buf (OMT omt, u_int32_t idx) { OMTVALUE v = 0; - int r = toku_omt_fetch(omt, idx, &v, NULL); + int r = toku_omt_fetch(omt, idx, &v); assert_zero(r); return (LEAFENTRY)v; } @@ -1167,7 +1159,7 @@ brtleaf_disk_size(BRTNODE node) for (j=0; j < n_leafentries; j++) { OMTVALUE v; LEAFENTRY curr_le = NULL; - int r = toku_omt_fetch(curr_buffer, j, &v, NULL); + int r = toku_omt_fetch(curr_buffer, j, &v); curr_le = v; assert_zero(r); retval += leafentry_disksize(curr_le); @@ -1195,7 +1187,7 @@ brtleaf_get_split_loc( for (j=0; j < n_leafentries; j++) { LEAFENTRY curr_le = NULL; OMTVALUE v; - int r = toku_omt_fetch(curr_buffer, j, &v, NULL); + int r = toku_omt_fetch(curr_buffer, j, &v); curr_le = v; assert_zero(r); size_so_far += leafentry_disksize(curr_le); @@ -1376,7 +1368,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, if (splitk) { memset(splitk, 0, sizeof *splitk); OMTVALUE lev = 0; - int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev, NULL); + int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev); assert_zero(r); // that fetch should have worked. LEAFENTRY le=lev; splitk->size = le_keylen(le); @@ -1903,7 +1895,7 @@ brt_leaf_put_cmd ( *made_change = 1; if (doing_seqinsert) { idx = toku_omt_size(bn->buffer); - r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav, NULL); + r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav); if (r != 0) goto fz; storeddata = storeddatav; int cmp = toku_cmd_leafval_heaviside(storeddata, &be); @@ -1912,7 +1904,7 @@ brt_leaf_put_cmd ( } else { fz: r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, - &storeddatav, &idx, NULL); + &storeddatav, &idx); } if (r==DB_NOTFOUND) { storeddata = 0; @@ -1944,7 +1936,7 @@ brt_leaf_put_cmd ( // Apply to all the matches r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, - &storeddatav, &idx, NULL); + &storeddatav, &idx); if (r == DB_NOTFOUND) break; assert(r==0); storeddata=storeddatav; @@ -1969,7 +1961,7 @@ brt_leaf_put_cmd ( assert(idx <= num_leafentries_after); if (idx == num_leafentries_after) break; //Reached the end of the leaf - r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); + r = toku_omt_fetch(bn->buffer, idx, &storeddatav); assert_zero(r); } storeddata=storeddatav; @@ -1995,7 +1987,7 @@ brt_leaf_put_cmd ( // Apply to all leafentries omt_size = toku_omt_size(bn->buffer); for (u_int32_t idx = 0; idx < omt_size; ) { - r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); + r = toku_omt_fetch(bn->buffer, idx, &storeddatav); assert_zero(r); storeddata=storeddatav; int deleted = 0; @@ -2022,7 +2014,7 @@ brt_leaf_put_cmd ( // Apply to all leafentries if txn is represented omt_size = toku_omt_size(bn->buffer); for (u_int32_t idx = 0; idx < omt_size; ) { - r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); + r = toku_omt_fetch(bn->buffer, idx, &storeddatav); assert_zero(r); storeddata=storeddatav; int deleted = 0; @@ -2047,7 +2039,7 @@ brt_leaf_put_cmd ( case BRT_UPDATE: { u_int32_t idx; r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, - &storeddatav, &idx, NULL); + &storeddatav, &idx); if (r==DB_NOTFOUND) { r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change, workdone); } else if (r==0) { @@ -2061,7 +2053,7 @@ brt_leaf_put_cmd ( u_int32_t idx = 0; u_int32_t num_leafentries_before; while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) { - r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); + r = toku_omt_fetch(bn->buffer, idx, &storeddatav); assert(r==0); storeddata=storeddatav; r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, workdone); @@ -3156,7 +3148,6 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd) //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); assert(brt->h); - brt->h->root_put_counter = global_root_put_counter++; u_int32_t fullhash; rootp = toku_calculate_root_offset_pointer(brt, &fullhash); @@ -3721,8 +3712,6 @@ brt_init_header_partial (BRT t, TOKUTXN txn) { compute_and_fill_remembered_hash(t); - t->h->root_put_counter = global_root_put_counter++; - BLOCKNUM root = t->h->root; if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; } //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); @@ -3815,7 +3804,6 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN max_acceptabl } if (r!=0) return r; h->cf = cf; - h->root_put_counter = global_root_put_counter++; toku_cachefile_set_userdata(cf, (void*)h, brtheader_log_fassociate_during_checkpoint, @@ -4154,7 +4142,6 @@ brt_redirect_cursors (BRT brt_to, BRT brt_from) { while (!toku_list_empty(&brt_from->cursors)) { struct toku_list * c_list = toku_list_head(&brt_from->cursors); BRT_CURSOR c = toku_list_struct(c_list, struct brt_cursor, cursors_link); - brt_cursor_invalidate(c); toku_list_remove(&c->cursors_link); @@ -4799,12 +4786,10 @@ int toku_brt_flush (BRT brt) { static inline void brt_cursor_cleanup_dbts(BRT_CURSOR c) { - if (!c->current_in_omt) { - if (c->key.data) toku_free(c->key.data); - if (c->val.data) toku_free(c->val.data); - memset(&c->key, 0, sizeof(c->key)); - memset(&c->val, 0, sizeof(c->val)); - } + if (c->key.data) toku_free(c->key.data); + if (c->val.data) toku_free(c->val.data); + memset(&c->key, 0, sizeof(c->key)); + memset(&c->val, 0, sizeof(c->val)); } // @@ -4834,14 +4819,13 @@ int does_txn_read_entry(TXNID id, TOKUTXN context) { return rval; } -static inline int brt_cursor_extract_key_and_val( +static inline void brt_cursor_extract_key_and_val( LEAFENTRY le, BRT_CURSOR cursor, u_int32_t *keylen, void **key, u_int32_t *vallen, void **val) { - int r = 0; if (toku_brt_cursor_is_leaf_mode(cursor)) { *key = le_key_and_len(le, keylen); *val = le; @@ -4859,53 +4843,6 @@ static inline int brt_cursor_extract_key_and_val( *key = le_key_and_len(le, keylen); *val = le_latest_val_and_len(le, vallen); } - return r; -} - -static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) { - OMTVALUE le = 0; - int r = toku_omt_cursor_current(c->omtcursor, &le); - assert_zero(r); - r = brt_cursor_extract_key_and_val(le, - c, - &key->size, - &key->data, - &val->size, - &val->data); - assert_zero(r); -} - -// When an omt cursor is invalidated, this is the brt-level function -// that is called. This function is only called by the omt logic. -// This callback is called when either (a) the brt logic invalidates one -// cursor (see brt_cursor_invalidate()) or (b) when the omt logic invalidates -// all the cursors for an omt. -static void -brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) { - BRT_CURSOR cursor = extra; - - //TODO: #1378 assert that this thread owns omt lock in brtcursor - - if (cursor->current_in_omt) { - DBT key,val; - load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val)); - cursor->key.data = toku_memdup(key.data, key.size); - cursor->val.data = toku_memdup(val.data, val.size); - cursor->key.size = key.size; - cursor->val.size = val.size; - //TODO: Find some way to deal with ENOMEM here. - //Until then, just assert that the memdups worked. - assert(cursor->key.data && cursor->val.data); - cursor->current_in_omt = FALSE; - } -} - -// Called at start of every slow query, and only from slow queries. -// When all cursors are invalidated (from writer thread, or insert/delete), -// this function is not used. -static void -brt_cursor_invalidate(BRT_CURSOR brtcursor) { - toku_omt_cursor_invalidate(brtcursor->omtcursor); // will call brt_cursor_invalidate_callback() } int toku_brt_cursor ( @@ -4929,7 +4866,6 @@ int toku_brt_cursor ( return ENOMEM; memset(cursor, 0, sizeof(*cursor)); cursor->brt = brt; - cursor->current_in_omt = FALSE; cursor->prefetching = FALSE; toku_init_dbt(&cursor->range_lock_left_key); toku_init_dbt(&cursor->range_lock_right_key); @@ -4940,11 +4876,6 @@ int toku_brt_cursor ( cursor->is_leaf_mode = FALSE; cursor->ttxn = ttxn; toku_list_push(&brt->cursors, &cursor->cursors_link); - int r = toku_omt_cursor_create(&cursor->omtcursor); - assert_zero(r); - toku_omt_cursor_set_invalidate_callback(cursor->omtcursor, - brt_cursor_invalidate_callback, cursor); - cursor->root_put_counter=0; *cursorptr = cursor; return 0; } @@ -4986,19 +4917,9 @@ toku_brt_cursor_set_range_lock(BRT_CURSOR cursor, const DBT *left, const DBT *ri } } -// Called during cursor destruction -// It is the same as brt_cursor_invalidate, except that -// we make sure the callback function is never called. -static void -brt_cursor_invalidate_no_callback(BRT_CURSOR brtcursor) { - toku_omt_cursor_set_invalidate_callback(brtcursor->omtcursor, NULL, NULL); - toku_omt_cursor_invalidate(brtcursor->omtcursor); // will NOT call brt_cursor_invalidate_callback() -} - //TODO: #1378 When we split the ydb lock, touching cursor->cursors_link //is not thread safe. int toku_brt_cursor_close(BRT_CURSOR cursor) { - brt_cursor_invalidate_no_callback(cursor); brt_cursor_cleanup_dbts(cursor); if (cursor->range_lock_left_key.data) { toku_free(cursor->range_lock_left_key.data); @@ -5009,7 +4930,6 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) { toku_destroy_dbt(&cursor->range_lock_right_key); } toku_list_remove(&cursor->cursors_link); - toku_omt_cursor_destroy(&cursor->omtcursor); toku_free_n(cursor, sizeof *cursor); return 0; } @@ -5026,7 +4946,7 @@ static inline BOOL brt_cursor_prefetching(BRT_CURSOR cursor) { static BOOL brt_cursor_not_set(BRT_CURSOR cursor) { assert((cursor->key.data==NULL) == (cursor->val.data==NULL)); - return (BOOL)(!cursor->current_in_omt && cursor->key.data == NULL); + return (BOOL)(cursor->key.data == NULL); } static int @@ -5056,30 +4976,6 @@ heaviside_from_search_t (OMTVALUE lev, void *extra) { } -// This is the only function that associates a brt cursor (and its contained -// omt cursor) with a brt node (and its associated omt). This is different -// from older code because the old code associated the omt cursor with the -// omt when the search found a match. In this new design, the omt cursor -// will not be associated with the omt until after the application-level -// callback accepts the search result. -// The lock is necessary because we don't want two threads modifying -// the omt's list of cursors simultaneously. -// Note, this is only place in brt code that calls toku_omt_cursor_set_index(). -// Requires: cursor->omtcursor is valid -static inline void -brt_cursor_update(BRT_CURSOR brtcursor) { - //Free old version if it is using local memory. - OMTCURSOR omtcursor = brtcursor->omtcursor; - if (!brtcursor->current_in_omt) { - brt_cursor_cleanup_dbts(brtcursor); - brtcursor->current_in_omt = TRUE; - toku_omt_cursor_associate(brtcursor->leaf_info.to_be.omt, omtcursor); - //no longer touching linked list, and - //only one thread can touch cursor at a time, protected by ydb lock - } - toku_omt_cursor_set_index(omtcursor, brtcursor->leaf_info.to_be.index); -} - // // Returns true if the value that is to be read is empty. // @@ -5402,6 +5298,20 @@ exit: VERIFY_NODE(t, node); } + +static int +brt_cursor_shortcut ( + BRT_CURSOR cursor, + int direction, + BRT_GET_CALLBACK_FUNCTION getf, + void *getf_v, + u_int32_t *keylen, + void **key, + u_int32_t *vallen, + void **val + ); + + // This is a bottom layer of the search functions. static int brt_search_basement_node( @@ -5410,9 +5320,8 @@ brt_search_basement_node( BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, - BLOCKNUM thisnodename, - u_int32_t fullhash, - BRT_CURSOR brtcursor + BRT_CURSOR brtcursor, + BOOL can_bulk_fetch ) { assert(bn->max_dsn_applied.dsn >= MIN_DSN.dsn); @@ -5432,7 +5341,7 @@ brt_search_basement_node( heaviside_from_search_t, search, direction, - &datav, &idx, NULL); + &datav, &idx); if (r!=0) return r; LEAFENTRY le = datav; @@ -5454,7 +5363,7 @@ brt_search_basement_node( default: assert(FALSE); } - r = toku_omt_fetch(bn->buffer, idx, &datav, NULL); + r = toku_omt_fetch(bn->buffer, idx, &datav); assert_zero(r); // we just validated the index le = datav; if (!is_le_val_del(le,brtcursor)) goto got_a_good_value; @@ -5467,35 +5376,42 @@ got_a_good_value: u_int32_t vallen; void *val; - r = brt_cursor_extract_key_and_val(le, - brtcursor, - &keylen, - &key, - &vallen, - &val); + brt_cursor_extract_key_and_val(le, + brtcursor, + &keylen, + &key, + &vallen, + &val + ); - assert(brtcursor->current_in_omt == FALSE); - if (r==0) { - r = getf(keylen, key, vallen, val, getf_v); - } - if (r==0) { - // Leave the omtcursor alone above (pass NULL to omt_find/fetch) - // This prevents the omt from calling associate(), which would - // require a lock to keep the list of cursors safe when the omt - // is used by the brt. (We don't want to impose the locking requirement - // on the omt for non-brt uses.) - // - // Instead, all associating of omtcursors with omts (for leaf nodes) - // is done in brt_cursor_update. + r = getf(keylen, key, vallen, val, getf_v); + if (r==0 || r == TOKUDB_CURSOR_CONTINUE) { brtcursor->leaf_info.to_be.omt = bn->buffer; brtcursor->leaf_info.to_be.index = idx; - brtcursor->leaf_info.fullhash = fullhash; - brtcursor->leaf_info.blocknumber = thisnodename; - brt_cursor_update(brtcursor); + + if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) { + r = brt_cursor_shortcut( + brtcursor, + direction, + getf, + getf_v, + &keylen, + &key, + &vallen, + &val + ); + } + + brt_cursor_cleanup_dbts(brtcursor); + brtcursor->key.data = toku_memdup(key, keylen); + brtcursor->val.data = toku_memdup(val, vallen); + brtcursor->key.size = keylen; + brtcursor->val.size = vallen; //The search was successful. Prefetching can continue. *doprefetch = TRUE; } } + if (r == TOKUDB_CURSOR_CONTINUE) r = 0; return r; } @@ -5511,7 +5427,8 @@ brt_search_node ( BRT_CURSOR brtcursor, UNLOCKERS unlockers, ANCESTORS, - struct pivot_bounds const * const bounds + struct pivot_bounds const * const bounds, + BOOL can_bulk_fetch ); // the number of nodes to prefetch @@ -5591,7 +5508,7 @@ unlock_brtnode_fun (void *v) { /* search in a node's child */ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers, - ANCESTORS ancestors, struct pivot_bounds const * const bounds) + ANCESTORS ancestors, struct pivot_bounds const * const bounds, BOOL can_bulk_fetch) // Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned). { struct ancestors next_ancestors = {node, childnum, ancestors}; @@ -5624,7 +5541,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_ struct unlock_brtnode_extra unlock_extra = {brt,childnode}; struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers}; - int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds); + int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch); if (r!=TOKUDB_TRY_AGAIN) { // Even if r is reactive, we want to handle the maybe reactive child. @@ -5714,7 +5631,8 @@ brt_search_node( BRT_CURSOR brtcursor, UNLOCKERS unlockers, ANCESTORS ancestors, - struct pivot_bounds const * const bounds + struct pivot_bounds const * const bounds, + BOOL can_bulk_fetch ) { int r = 0; // assert that we got a valid child_to_search @@ -5749,7 +5667,8 @@ brt_search_node( brtcursor, unlockers, ancestors, - &next_bounds + &next_bounds, + can_bulk_fetch ); } else { @@ -5759,9 +5678,8 @@ brt_search_node( getf, getf_v, doprefetch, - node->thisnodename, - node->fullhash, - brtcursor + brtcursor, + can_bulk_fetch ); } if (r == 0) return r; //Success @@ -5799,7 +5717,7 @@ brt_search_node( } static int -toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, u_int64_t *root_put_counter) +toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, BOOL can_bulk_fetch) // Effect: Perform a search. Associate cursor with a leaf if possible. // All searches are performed through this function. { @@ -5809,8 +5727,6 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, assert(brt->h); - *root_put_counter = brt->h->root_put_counter; - u_int32_t fullhash; CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash); @@ -5861,7 +5777,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, { BOOL doprefetch = FALSE; //static int counter = 0; counter++; - r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds); + r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds, can_bulk_fetch); if (r==TOKUDB_TRY_AGAIN) { // there are two cases where we get TOKUDB_TRY_AGAIN // case 1 is when some later call to toku_pin_brtnode returned @@ -5913,10 +5829,9 @@ struct brt_cursor_search_struct { /* search for the first kv pair that matches the search object */ static int -brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) +brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL can_bulk_fetch) { - brt_cursor_invalidate(cursor); - int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, &cursor->root_put_counter); + int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, can_bulk_fetch); return r; } @@ -5946,7 +5861,6 @@ brt_cursor_current_getf(ITEMLEN keylen, bytevec key, } else { BRT_CURSOR cursor = bcss->cursor; DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero - //Safe to access cursor->key/val because current_in_omt is FALSE if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) { r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); // This was once DB_KEYEMPTY if (r==0) r = TOKUDB_FOUND_BUT_REJECTED; @@ -5963,14 +5877,12 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get if (brt_cursor_not_set(cursor)) return EINVAL; if (op == DB_CURRENT) { - brt_cursor_invalidate(cursor); struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, 0}; brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, cursor->brt); - int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, &cursor->root_put_counter); + int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, FALSE); brt_search_finish(&search); return r; } - brt_cursor_invalidate(cursor); return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval); } @@ -5988,7 +5900,7 @@ toku_brt_flatten(BRT brt, TOKUTXN ttxn) int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE); if (r!=0) return r; brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, tmp_cursor->brt); - r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL); + r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL, FALSE); brt_search_finish(&search); if (r==DB_NOTFOUND) r = 0; { @@ -6003,7 +5915,7 @@ int toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, cursor->brt); - int r = brt_cursor_search(cursor, &search, getf, getf_v); + int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE); brt_search_finish(&search); return r; } @@ -6012,8 +5924,8 @@ int toku_brt_cursor_last(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, cursor->brt); - int r = brt_cursor_search(cursor, &search, getf, getf_v); - brt_search_finish(&search);; + int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE); + brt_search_finish(&search); return r; } @@ -6022,113 +5934,66 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x) { return compare_k_x(brt, search->k, x) < 0; /* return min xy: kv < xy */ } -static int -brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { - int r; - OMTCURSOR omtcursor = cursor->omtcursor; - OMT omt = toku_omt_cursor_get_omt(omtcursor); - u_int64_t h_put_counter = cursor->brt->h->root_put_counter; - u_int64_t c_put_counter = cursor->root_put_counter; - BOOL found = FALSE; - - //Verify that no messages have been inserted - //since the last time the cursor's pointer was set. - //Also verify the omt cursor is still valid. - //(Necessary to recheck after the maybe_get_and_pin) - if (c_put_counter==h_put_counter && toku_omt_cursor_is_valid(cursor->omtcursor)) { - u_int32_t index = 0; - r = toku_omt_cursor_current_index(omtcursor, &index); - assert_zero(r); - - //Starting with the prev, find the first real (non-provdel) leafentry. - while (index != limit) { - OMTVALUE le = NULL; - index += direction; - r = toku_omt_fetch(omt, index, &le, NULL); - assert_zero(r); - - if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) { - u_int32_t keylen; - void *key; - u_int32_t vallen; - void *val; - - r = brt_cursor_extract_key_and_val(le, - cursor, - &keylen, - &key, - &vallen, - &val); - - if (r==0) { - r = getf(keylen, key, vallen, val, getf_v); - } - if (r==0) { - //Update cursor. - cursor->leaf_info.to_be.index = index; - brt_cursor_update(cursor); - found = TRUE; - } - break; - } - } - if (r==0 && !found) r = DB_NOTFOUND; - } - else r = EINVAL; - - return r; -} static int -brt_cursor_maybe_get_and_pin_leaf(BRT_CURSOR brtcursor, BRTNODE* leafp) { - void *leafv; - int r = toku_cachetable_maybe_get_and_pin_clean(brtcursor->brt->cf, - brtcursor->leaf_info.blocknumber, - brtcursor->leaf_info.fullhash, - &leafv); - if (r == 0) { - *leafp = leafv; - } - return r; -} - -static void -brt_cursor_unpin_leaf(BRT_CURSOR brtcursor, BRTNODE leaf) { - toku_unpin_brtnode(brtcursor->brt, leaf); -} - - -static int -brt_cursor_next_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) -// Effect: If possible, increment the cursor and return the key-value pair -// (i.e., the next one from what the cursor pointed to before.) -// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases. +brt_cursor_shortcut ( + BRT_CURSOR cursor, + int direction, + BRT_GET_CALLBACK_FUNCTION getf, + void *getf_v, + u_int32_t *keylen, + void **key, + u_int32_t *vallen, + void **val + ) { - int r; - if (toku_omt_cursor_is_valid(cursor->omtcursor)) { - BRTNODE leaf; - r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf); - if (r == 0) { - u_int32_t limit = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor)) - 1; - r = brt_cursor_shortcut(cursor, 1, limit, getf, getf_v); - brt_cursor_unpin_leaf(cursor, leaf); + int r = 0; + u_int32_t index = cursor->leaf_info.to_be.index; + OMT omt = cursor->leaf_info.to_be.omt; + // if we are searching towards the end, limit is last element + // if we are searching towards the beginning, limit is the first element + u_int32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0; + + //Starting with the prev, find the first real (non-provdel) leafentry. + while (index != limit) { + OMTVALUE le = NULL; + index += direction; + r = toku_omt_fetch(omt, index, &le); + assert_zero(r); + + if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) { + + brt_cursor_extract_key_and_val( + le, + cursor, + keylen, + key, + vallen, + val + ); + + r = getf(*keylen, *key, *vallen, *val, getf_v); + if (r==0 || r == TOKUDB_CURSOR_CONTINUE) { + //Update cursor. + cursor->leaf_info.to_be.index = index; + } + if (r== TOKUDB_CURSOR_CONTINUE) { + continue; + } + else { + break; + } } } - else r = EINVAL; return r; } int toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { - int r; - if (brt_cursor_next_shortcut(cursor, getf, getf_v)==0) { - r = 0; - } else { - brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, cursor->brt); - r = brt_cursor_search(cursor, &search, getf, getf_v); - brt_search_finish(&search); - } + brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, cursor->brt); + int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE); + brt_search_finish(&search); if (r == 0) brt_cursor_set_prefetching(cursor); return r; } @@ -6158,33 +6023,11 @@ brt_cursor_search_eq_k_x_getf(ITEMLEN keylen, bytevec key, static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { - brt_cursor_invalidate(cursor); struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, search}; - int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, &cursor->root_put_counter); + int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, FALSE); return r; } -static int -brt_cursor_prev_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) -// Effect: If possible, decrement the cursor and return the key-value pair -// (i.e., the previous one from what the cursor pointed to before.) -// That is, do DB_PREV on DUP databases, and do DB_PREV_NODUP on NODUP databases. -{ - int r; - if (toku_omt_cursor_is_valid(cursor->omtcursor)) { - BRTNODE leaf; - r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf); - if (r == 0) { - r = brt_cursor_shortcut(cursor, -1, 0, getf, getf_v); - brt_cursor_unpin_leaf(cursor, leaf); - } - } - else r = EINVAL; - return r; -} - - - static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) { BRT brt = search->context; return compare_k_x(brt, search->k, x) > 0; /* return max xy: kv > xy */ @@ -6193,10 +6036,8 @@ static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) { int toku_brt_cursor_prev(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { - if (brt_cursor_prev_shortcut(cursor, getf, getf_v)==0) - return 0; brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, cursor->brt); - int r = brt_cursor_search(cursor, &search, getf, getf_v); + int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE); brt_search_finish(&search); return r; } @@ -6219,7 +6060,7 @@ int toku_brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, cursor->brt); - int r = brt_cursor_search(cursor, &search, getf, getf_v); + int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE); brt_search_finish(&search); return r; } @@ -6233,7 +6074,7 @@ int toku_brt_cursor_set_range_reverse(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) { brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range_reverse, BRT_SEARCH_RIGHT, key, cursor->brt); - int r = brt_cursor_search(cursor, &search, getf, getf_v); + int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE); brt_search_finish(&search); return r; } @@ -6284,7 +6125,6 @@ toku_brt_cursor_peek(BRT_CURSOR cursor, const DBT **pkey, const DBT **pval) // Requires: The caller must be in the context of a // BRT_GET_(STRADDLE_)CALLBACK_FUNCTION { - if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val); *pkey = &cursor->key; *pval = &cursor->val; } @@ -6355,12 +6195,6 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) { r = toku_brt_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL); } if (r == 0) { - //We need to have access to the (key,val) that the cursor points to. - //By invalidating the cursor we guarantee we have a local copy. - // - //If we try to use the omtcursor, there exists a race condition - //(node could be evicted), but maybe_get_and_pin() prevents delete. - brt_cursor_invalidate(cursor); r = toku_brt_delete(cursor->brt, &cursor->key, txn); } } @@ -6417,7 +6251,7 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, if (BP_STATE(node,i) == PT_AVAIL) { struct cmd_leafval_heaviside_extra be = {brt, key}; u_int32_t idx; - int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx, NULL); + int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx); *less += idx; *greater += toku_omt_size(BLB_BUFFER(node, i))-idx; if (r==0) { @@ -6543,7 +6377,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_ if (0) for (int j=0; j=0 // If there is such an i and h(V_i,extra)==0 then set *index=i and return 0. // If there is such an i and h(V_i,extra)>0 then set *index=i and return DB_NOTFOUND. // If there is no such i then set *index=toku_omt_size(V) and return DB_NOTFOUND. // Requires: index!=NULL -int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx, OMTCURSOR c); +int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx); // Effect: // If direction >0 then find the smallest i such that h(V_i,extra)>0. // If direction <0 then find the largest i such that h(V_i,extra)<0. @@ -422,98 +415,6 @@ void toku_omt_clear(OMT omt); unsigned long toku_omt_memory_size (OMT omt); // Effect: Return the size (in bytes) of the omt, as it resides in main memory. Don't include any of the OMTVALUES. -int toku_omt_cursor_create (OMTCURSOR *p); -// Effect: Create an OMTCURSOR. Stores it in *p. The OMTCURSOR is -// initially invalid. -// Requires: p != NULL -// Returns: -// 0 success -// ENOMEM out of memory (and doesn't modify *omtp) -// Performance: constant time. - -void toku_omt_cursor_destroy (OMTCURSOR *p); -// Effect: Invalidates *p (if it is valid) and frees any memory -// associated with *p. -// Also sets *p=NULL. -// Rationale: The usage is to do something like -// toku_omt_cursor_destroy(&c); -// and now c will have a NULL pointer instead of a dangling freed pointer. -// Rationale: Returns no values since free() cannot fail. - -int toku_omt_cursor_is_valid (OMTCURSOR c); -// Effect: returns 0 iff c is invalid. -// Performance: time=O(1) - -int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v); -// Effect: Increment c's offset, and find and store the value in v. -// Requires: v != NULL -// Returns -// 0 success -// EINVAL if the offset goes out of range or c is invalid. -// On nonzero return, *v is unchanged and c is invalidated. -// Performance: time=O(log N) worst case, expected time=O(1) for a randomly -// chosen initial position. - -int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *idx); -// Effect: Stores c's offset in *index. -// Requires: index != NULL -// Returns -// 0 success -// EINVAL c is invalid -// On nonzero return, *index is unchanged and c is unchanged. -// Performance: time=O(1) - -OMT toku_omt_cursor_get_omt(OMTCURSOR c); -// Effect: returns the associated omt or NULL if not associated. -// Performance: time=O(1) - -int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v); -// Effect: Store in v the value pointed by c's abstract offset -// Requires: v != NULL -// Returns -// 0 success -// EINVAL if c is invalid -// On non-zero return, *v is unchanged -// Performance: O(1) time - -int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v); -// Effect: Decrement c's offset, and find and store the value in v. -// Requires: v != NULL -// Returns -// 0 success -// EINVAL if the offset goes out of range or c is invalid. -// On nonzero return, *v is unchanged and c is invalidated. -// Performance: time=O(log N) worst case, expected time=O(1) for a randomly -// chosen initial position. - - -void toku_omt_cursor_invalidate (OMTCURSOR c); -// Effect: Invalidate c. (This does not mean that c is destroyed or -// that its memory is freed.) -// If c is valid, the invalidate callback function (if any) will be called -// before invalidating c. - -void toku_omt_cursor_set_invalidate_callback(OMTCURSOR c, void (*f)(OMTCURSOR,void*), void* extra); -// Effect: -// Saves function 'f' to be called whenever the cursor is invalidated. -// 'extra' is passed as an additional parameter to f. -// Requires: -// The lifetime of the 'extra' parameter must continue at least till the cursor -// is destroyed. - -void toku_omt_cursor_associate(OMT omt, OMTCURSOR c); -// Effect: -// Associates an omtcursor with an omt. -// Requires: -// The omtcursor is not associated with any other omt. -// Requires: -// toku_omt_associate must be called when the omt-lock is held -// Rationale: -// This is used by brt_cursors for omts representing leaf nodes. -// These omts are touched by multiple threads, and therefore require locks -// for modifying the list of omtcursors. -// We do not want to grab two locks (one for omt, and one for the old -// associated omt). #if defined(__cplusplus) || defined(__cilkplusplus) }; diff --git a/newbrt/recover.c b/newbrt/recover.c index 57e654d96ef..d9521a757c9 100644 --- a/newbrt/recover.c +++ b/newbrt/recover.c @@ -99,7 +99,7 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc if (n == 0) break; OMTVALUE v; - r = toku_omt_fetch(fmap->filenums, n-1, &v, NULL); + r = toku_omt_fetch(fmap->filenums, n-1, &v); assert(r == 0); r = toku_omt_delete_at(fmap->filenums, n-1); assert(r == 0); @@ -150,7 +150,7 @@ static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char * static void file_map_remove(struct file_map *fmap, FILENUM fnum) { OMTVALUE v; u_int32_t idx; - int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL); + int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx); if (r == 0) { struct file_map_tuple *tuple = v; r = toku_omt_delete_at(fmap->filenums, idx); @@ -162,7 +162,7 @@ static void file_map_remove(struct file_map *fmap, FILENUM fnum) { // Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND) static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) { OMTVALUE v; u_int32_t idx; - int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL); + int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx); if (r == 0) { struct file_map_tuple *tuple = v; assert(tuple->filenum.fileid == fnum.fileid); @@ -1142,7 +1142,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) { if (n_live_txns == 0) break; OMTVALUE v; - r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v, NULL); + r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v); if (r != 0) break; TOKUTXN txn = (TOKUTXN) v; diff --git a/newbrt/roll.c b/newbrt/roll.c index 6c4347a7d89..fc2a7384b89 100644 --- a/newbrt/roll.c +++ b/newbrt/roll.c @@ -152,7 +152,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key (void)toku_cachefile_get_and_pin_fd(cf); if (!toku_cachefile_is_dev_null_unlocked(cf)) { OMTVALUE brtv=NULL; - r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL); + r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL); assert(r==0); BRT brt = brtv; @@ -561,7 +561,7 @@ toku_rollback_change_fdescriptor(FILENUM filenum, fd = toku_cachefile_get_and_pin_fd(cf); if (!toku_cachefile_is_dev_null_unlocked(cf)) { OMTVALUE brtv=NULL; - r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL); + r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL); assert(r==0); BRT brt = brtv; DESCRIPTOR_S d; diff --git a/newbrt/rollback.c b/newbrt/rollback.c index 9254e72d0ea..02a0644bb60 100644 --- a/newbrt/rollback.c +++ b/newbrt/rollback.c @@ -176,7 +176,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi int r; OMT reverse = txn->logger->live_list_reverse; - r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL); + r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx); invariant(r==0); pair = pairv; invariant(pair->xid1 == *live_xid); //sanity check @@ -188,7 +188,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi OMT snapshot = txn->logger->snapshot_txnids; BOOL should_delete = TRUE; // find the youngest txn in snapshot that is older than xid - r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx, NULL); + r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx); if (r==0) { //There is an older txn olderxid = olderv; @@ -237,7 +237,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { //Remove txn from list (omt) of live transactions OMTVALUE txnagain; u_int32_t idx; - r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx, NULL); + r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx); assert(r==0); assert(txn==txnagain); r = toku_omt_delete_at(txn->logger->live_txns, idx); @@ -248,7 +248,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { OMTVALUE txnagain; u_int32_t idx; //Remove txn from list of live root txns - r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx, NULL); + r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx); assert(r==0); assert(txn==txnagain); r = toku_omt_delete_at(txn->logger->live_root_txns, idx); @@ -266,7 +266,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { u_int32_t idx; OMTVALUE v; //Free memory used for snapshot_txnids - r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx, NULL); + r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx); invariant(r==0); TXNID *xid = v; invariant(*xid == txn->txnid64); @@ -280,7 +280,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { invariant(toku_omt_size(txn->live_root_txn_list) > 0); OMTVALUE v; //store a single array of txnids - r = toku_omt_fetch(txn->live_root_txn_list, 0, &v, NULL); + r = toku_omt_fetch(txn->live_root_txn_list, 0, &v); invariant(r==0); toku_free(v); toku_omt_destroy(&txn->live_root_txn_list); @@ -293,7 +293,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { TOKULOGGER logger = txn->logger; OMTVALUE oldest_txnv; - r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv, NULL); + r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv); if (r==0) { TOKUTXN oldest_txn = oldest_txnv; assert(oldest_txn != txn); // We just removed it @@ -638,7 +638,7 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) { OMTVALUE txnv; u_int32_t index; // Does brt already know about transaction txn? - int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL); + int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index); if (r==0) { // It's already there. assert((TOKUTXN)txnv==txn); @@ -669,7 +669,7 @@ static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) { int r; r = toku_txn_note_brt(txn, info->live); //Add new brt. assert(r==0); - r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index, NULL); + r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index); assert(r==0); assert((void*)zombie_again==info->zombie); r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt. @@ -703,7 +703,7 @@ static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) { BRT brt = brtv; OMTVALUE brtv_again=NULL; u_int32_t index; - int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index, NULL); + int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index); assert(r==0); assert((void*)brtv_again==brtv); r = toku_omt_delete_at(txn->open_brts, index); @@ -723,7 +723,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) { TOKUTXN txn = txnv; OMTVALUE txnv_again=NULL; u_int32_t index; - int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL); + int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index); assert(r==0); assert((void*)txnv_again==txnv); r = toku_omt_delete_at(brt->txns, index); @@ -760,7 +760,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) { struct tokutxn fake_txn; fake_txn.txnid64 = xid; u_int32_t index; OMTVALUE txnv; - int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index, NULL); + int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index); if (r == 0) *txnptr = txnv; return r; } diff --git a/newbrt/tests/omt-cursor-test.c b/newbrt/tests/omt-cursor-test.c deleted file mode 100644 index 99f0a677189..00000000000 --- a/newbrt/tests/omt-cursor-test.c +++ /dev/null @@ -1,118 +0,0 @@ -#ident "$Id$" -#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved." -#include -#include - -#include "test.h" -#include - -#include "omt.h" -#include "memory.h" - -#include "brttypes.h" - -#include -#include - -enum { N=10 }; -struct value { int x; } vs[N]; -OMTVALUE ps[N]; - -#define V(x) ((struct value *)(x)) - -static void callback (OMTCURSOR c, void *extra) { - if (verbose) printf("%s:%d %p %p\n", __FUNCTION__, __LINE__, c, extra); - OMTVALUE v = NULL; - int r = toku_omt_cursor_current(c, &v); - if (verbose) printf("%s:%d %d\n", __FUNCTION__, __LINE__, r); -} - -static void test (void) { - OMT o; - OMTCURSOR curs, curs2, curs3; - int i, r; - OMTVALUE v; - for (i=0; ix==5); - r = toku_omt_cursor_next(curs, &v); - assert(r==0 && V(v)->x==6); - r = toku_omt_cursor_prev(curs, &v); - assert(r==0 && V(v)->x==5); - toku_omt_cursor_destroy(&curs); - toku_omt_destroy(&o); - - // Create two cursors, destroy omt first - r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0); - r = toku_omt_cursor_create(&curs); assert(r==0); - r = toku_omt_fetch(o, 5, &v, curs); assert(r==0); - r = toku_omt_cursor_create(&curs2); assert(r==0); - r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0); - r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6); - toku_omt_destroy(&o); - toku_omt_cursor_destroy(&curs); - toku_omt_cursor_destroy(&curs2); - - // Create two cursors, destroy them first - r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0); - r = toku_omt_cursor_create(&curs); assert(r==0); - r = toku_omt_fetch(o, 5, &v, curs); assert(r==0); - r = toku_omt_cursor_create(&curs2); assert(r==0); - r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0); - r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6); - r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3); - toku_omt_cursor_destroy(&curs); - r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2); - toku_omt_cursor_destroy(&curs2); - toku_omt_destroy(&o); - - // Create three cursors, destroy them first - r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0); - r = toku_omt_cursor_create(&curs); assert(r==0); - r = toku_omt_fetch(o, 5, &v, curs); assert(r==0); - r = toku_omt_cursor_create(&curs2); assert(r==0); - r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0); - r = toku_omt_cursor_create(&curs3); assert(r==0); - r = toku_omt_fetch(o, 9, &v, curs3); assert(r==0); - r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6); - r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3); - r = toku_omt_cursor_next(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3)); - toku_omt_cursor_destroy(&curs); - r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2); - r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==1); - r = toku_omt_fetch(o, 1, &v, curs3); assert(r==0); - r = toku_omt_cursor_prev(curs3, &v); assert(r==0 && V(v)->x==0); - r = toku_omt_cursor_prev(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3)); - toku_omt_cursor_destroy(&curs2); - toku_omt_destroy(&o); - toku_omt_cursor_destroy(&curs3); - - // ticket 1622, invalidate recursion - r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0); - r = toku_omt_cursor_create(&curs); assert(r==0); - toku_omt_cursor_set_invalidate_callback(curs, callback, 0); - r = toku_omt_fetch(o, 9, &v, curs); assert(r==0); - r = toku_omt_cursor_next(curs, &v); - toku_omt_destroy(&o); - toku_omt_cursor_destroy(&curs); -} - -int -test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) { - test(); - return 0; -} diff --git a/newbrt/tests/omt-test.c b/newbrt/tests/omt-test.c index cf330338934..f0e15659f5d 100644 --- a/newbrt/tests/omt-test.c +++ b/newbrt/tests/omt-test.c @@ -264,94 +264,30 @@ test_create_from_sorted_array_size (enum create_type create_choice, enum close_w static void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { u_int32_t i; - int j; int r; TESTVALUE v = (TESTVALUE)&i; TESTVALUE oldv = v; - OMTCURSOR c; - r = toku_omt_cursor_create(&c); - CKERR(r); - assert(len == toku_omt_size(omtree)); for (i = 0; i < len; i++) { assert(oldv!=val[i]); v = NULL; - r = toku_omt_fetch(omtree, i, &v, NULL); + r = toku_omt_fetch(omtree, i, &v); CKERR(r); assert(v != NULL); assert(v != oldv); assert(v == val[i]); assert(V(v)->number == V(val[i])->number); v = oldv; - r = toku_omt_fetch(omtree, i, &v, c); - CKERR(r); - assert(v != NULL); - assert(v != oldv); - assert(v == val[i]); - assert(V(v)->number == V(val[i])->number); - assert(toku_omt_cursor_is_valid(c)); - - v = oldv; - r = toku_omt_cursor_current(c, &v); - CKERR(r); - assert(v != NULL); - assert(v != oldv); - assert(v == val[i]); - assert(V(v)->number == V(val[i])->number); - assert(toku_omt_cursor_is_valid(c)); - - v = oldv; - j = i + 1; - while ((r = toku_omt_cursor_next(c, &v)) == 0) { - assert(toku_omt_cursor_is_valid(c)); - assert(v != NULL); - assert(v != oldv); - assert(v == val[j]); - assert(V(v)->number == V(val[j])->number); - j++; - v = oldv; - } - CKERR2(r, EINVAL); - assert(j == (int) len); - - assert(oldv!=val[i]); - v = NULL; - r = toku_omt_fetch(omtree, i, &v, c); - CKERR(r); - assert(v != NULL); - assert(v != oldv); - assert(v == val[i]); - assert(V(v)->number == V(val[i])->number); - - v = oldv; - j = i - 1; - while ((r = toku_omt_cursor_prev(c, &v)) == 0) { - assert(toku_omt_cursor_is_valid(c)); - assert(v != NULL); - assert(v != oldv); - assert(v == val[j]); - assert(V(v)->number == V(val[j])->number); - j--; - v = oldv; - } - CKERR2(r, EINVAL); - assert(j == -1); - } for (i = len; i < len*2; i++) { v = oldv; - r = toku_omt_fetch(omtree, i, &v, NULL); + r = toku_omt_fetch(omtree, i, &v); CKERR2(r, EINVAL); assert(v == oldv); - v = NULL; - r = toku_omt_fetch(omtree, i, &v, c); - CKERR2(r, EINVAL); - assert(v == NULL); } - toku_omt_cursor_destroy(&c); } static void @@ -700,69 +636,22 @@ heavy_extra (h_extra* extra, u_int32_t first_zero, u_int32_t first_pos) { static void test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*), int r_expect, BOOL idx_will_change, u_int32_t idx_expect, - u_int32_t number_expect, BOOL cursor_valid) { + u_int32_t number_expect, BOOL UU(cursor_valid)) { u_int32_t idx = UINT32_MAX; u_int32_t old_idx = idx; - TESTVALUE omt_val, omt_val_curs; - OMTCURSOR c; + TESTVALUE omt_val; int r; - BOOL found; - - r = toku_omt_cursor_create(&c); - CKERR(r); omt_val = NULL; - if (dir == 0) { - r = toku_omt_find_zero(omt, h, extra, &omt_val, &idx, c); - } - else { - r = toku_omt_find( omt, h, extra, dir, &omt_val, &idx, c); - } - CKERR2(r, r_expect); - if (idx_will_change) { - assert(idx == idx_expect); - } - else { - assert(idx == old_idx); - } - if (r == DB_NOTFOUND) { - assert(omt_val == NULL); - found = FALSE; - } - else { - assert(V(omt_val)->number == number_expect); - found = TRUE; - } - - assert(!cursor_valid == !toku_omt_cursor_is_valid(c)); - if (cursor_valid) { - TESTVALUE tmp; - assert(idx_will_change); - omt_val_curs = NULL; - r = toku_omt_cursor_current(c, &omt_val_curs); - CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - r = toku_omt_fetch(omt, idx, &tmp, NULL); - CKERR(r); - if (found) assert(tmp==omt_val); - assert(omt_val_curs != NULL); - assert(omt_val_curs == tmp); - assert(V(omt_val_curs)->number == V(tmp)->number); - if (found) assert(V(omt_val_curs)->number==number_expect); - } - - toku_omt_cursor_invalidate(c); - assert(!toku_omt_cursor_is_valid(c)); - toku_omt_cursor_destroy(&c); /* Verify we can pass NULL value. */ omt_val = NULL; idx = old_idx; if (dir == 0) { - r = toku_omt_find_zero(omt, h, extra, NULL, &idx, NULL); + r = toku_omt_find_zero(omt, h, extra, NULL, &idx); } else { - r = toku_omt_find( omt, h, extra, dir, NULL, &idx, NULL); + r = toku_omt_find( omt, h, extra, dir, NULL, &idx); } CKERR2(r, r_expect); if (idx_will_change) { @@ -777,10 +666,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*), omt_val = NULL; idx = old_idx; if (dir == 0) { - r = toku_omt_find_zero(omt, h, extra, &omt_val, 0, NULL); + r = toku_omt_find_zero(omt, h, extra, &omt_val, 0); } else { - r = toku_omt_find( omt, h, extra, dir, &omt_val, 0, NULL); + r = toku_omt_find( omt, h, extra, dir, &omt_val, 0); } CKERR2(r, r_expect); assert(idx == old_idx); @@ -795,10 +684,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*), omt_val = NULL; idx = old_idx; if (dir == 0) { - r = toku_omt_find_zero(omt, h, extra, NULL, 0, NULL); + r = toku_omt_find_zero(omt, h, extra, NULL, 0); } else { - r = toku_omt_find( omt, h, extra, dir, NULL, 0, NULL); + r = toku_omt_find( omt, h, extra, dir, NULL, 0); } CKERR2(r, r_expect); assert(idx == old_idx); @@ -879,100 +768,12 @@ test_find (enum create_type create_choice, enum close_when_done do_close) { test_close(do_close); } -static void -invalidate_callback_null (OMTCURSOR c, void *extra) { - assert(c && !extra); -} - -static void -invalidate_callback_inc (OMTCURSOR c, void *extra) { - assert(c); - int *num = extra; - (*num)++; -} - -static void -test_invalidate (enum create_type create_choice, BOOL set_callback, BOOL invalidate_callback) { - init_identity_values(random_seed, 100); - test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE); - - OMTCURSOR c; - int invalidate_count = 0; - - int r = toku_omt_cursor_create(&c); - if (set_callback || invalidate_callback) { - toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_inc, &invalidate_count); - } - if (invalidate_callback) { - toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_null, NULL); - } - OMTVALUE val; - r = toku_omt_fetch(omt, 0, &val, c); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - assert(invalidate_count==0); - r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL); - assert(!toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==1); - else assert(invalidate_count==0); - r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL); - assert(!toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==1); - else assert(invalidate_count==0); - - r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==1); - else assert(invalidate_count==0); - - r = toku_omt_cursor_prev(c, &val); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==1); - else assert(invalidate_count==0); - r = toku_omt_cursor_next(c, &val); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==1); - else assert(invalidate_count==0); - r = toku_omt_cursor_next(c, &val); CKERR2(r, EINVAL); - assert(!toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==2); - else assert(invalidate_count==0); - - r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==2); - else assert(invalidate_count==0); - - test_close(CLOSE_WHEN_DONE); - assert(!toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==3); - else assert(invalidate_count==0); - - init_identity_values(random_seed, 100); - test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE); - r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r); - assert(toku_omt_cursor_is_valid(c)); - if (set_callback && !invalidate_callback) assert(invalidate_count==3); - else assert(invalidate_count==0); - - toku_omt_cursor_destroy(&c); - if (set_callback && !invalidate_callback) assert(invalidate_count==4); - else assert(invalidate_count==0); - - test_close(CLOSE_WHEN_DONE); - if (set_callback && !invalidate_callback) assert(invalidate_count==4); - else assert(invalidate_count==0); -} - static void runtests_create_choice (enum create_type create_choice) { test_create_array(create_choice, TEST_SORTED); test_create_array(create_choice, TEST_RANDOM); test_create_array(create_choice, TEST_IDENTITY); test_find( create_choice, CLOSE_WHEN_DONE); - test_invalidate( create_choice, FALSE, FALSE); - test_invalidate( create_choice, FALSE, TRUE); - test_invalidate( create_choice, TRUE, FALSE); - test_invalidate( create_choice, TRUE, TRUE); } int diff --git a/newbrt/txn.c b/newbrt/txn.c index 5d4196d15bb..5db3f8e71af 100644 --- a/newbrt/txn.c +++ b/newbrt/txn.c @@ -114,7 +114,7 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v int r; OMT reverse = txn->logger->live_list_reverse; - r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL); + r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx); if (r==0) { pair = pairv; invariant(pair->xid1 == *live_xid); //sanity check @@ -488,7 +488,7 @@ TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) { invariant(toku_omt_size(omt)>0); OMTVALUE v; int r; - r = toku_omt_fetch(omt, 0, &v, NULL); + r = toku_omt_fetch(omt, 0, &v); assert_zero(r); TXNID *xidp = v; return *xidp; @@ -509,7 +509,7 @@ BOOL toku_is_txn_in_live_root_txn_list(TOKUTXN txn, TXNID xid) { OMTVALUE txnidpv; uint32_t index; BOOL retval = FALSE; - int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index, NULL); + int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index); if (r==0) { TXNID *txnidp = txnidpv; invariant(*txnidp == xid); @@ -536,19 +536,19 @@ verify_snapshot_system(TOKULOGGER logger) { //set up arrays for easier access for (i = 0; i < num_snapshot_txnids; i++) { OMTVALUE v; - r = toku_omt_fetch(logger->snapshot_txnids, i, &v, NULL); + r = toku_omt_fetch(logger->snapshot_txnids, i, &v); assert_zero(r); snapshot_txnids[i] = *(TXNID*)v; } for (i = 0; i < num_live_txns; i++) { OMTVALUE v; - r = toku_omt_fetch(logger->live_txns, i, &v, NULL); + r = toku_omt_fetch(logger->live_txns, i, &v); assert_zero(r); live_txns[i] = v; } for (i = 0; i < num_live_list_reverse; i++) { OMTVALUE v; - r = toku_omt_fetch(logger->live_list_reverse, i, &v, NULL); + r = toku_omt_fetch(logger->live_list_reverse, i, &v); assert_zero(r); live_list_reverse[i] = v; } @@ -566,7 +566,7 @@ verify_snapshot_system(TOKULOGGER logger) { { for (j = 0; j < num_live_root_txn_list; j++) { OMTVALUE v; - r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v, NULL); + r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v); assert_zero(r); live_root_txn_list[j] = *(TXNID*)v; } @@ -595,7 +595,7 @@ verify_snapshot_system(TOKULOGGER logger) { OMTVALUE v2; r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, - &pair->xid2, &v2, &index, NULL); + &pair->xid2, &v2, &index); assert_zero(r); } for (j = 0; j < num_live_txns; j++) { @@ -621,7 +621,7 @@ verify_snapshot_system(TOKULOGGER logger) { OMTVALUE v2; r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, - &txn->txnid64, &v2, &index, NULL); + &txn->txnid64, &v2, &index); invariant(r==0 || r==DB_NOTFOUND); invariant((r==0) == (expect!=0)); } diff --git a/newbrt/ule.c b/newbrt/ule.c index 0a6240325d8..8065042303f 100644 --- a/newbrt/ule.c +++ b/newbrt/ule.c @@ -137,7 +137,7 @@ get_next_older_txnid(TXNID xc, OMT omt) { OMTVALUE v; uint32_t idx; TXNID rval; - r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx, NULL); + r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx); if (r==0) { xid = v; invariant(*xid < xc); //sanity check @@ -157,7 +157,7 @@ toku_get_youngest_live_list_txnid_for(TXNID xc, OMT live_list_reverse) { uint32_t idx; TXNID rval; int r; - r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx, NULL); + r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx); if (r==0) { pair = pairv; invariant(pair->xid1 == xc); //sanity check @@ -226,7 +226,7 @@ garbage_collection(ULE ule, OMT snapshot_xids, OMT live_list_reverse) { { u_int32_t idx; OMTVALUE txnagain; - int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx, NULL); + int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx); invariant(r==0); //make sure that the txn you are claiming is live is actually live } // diff --git a/src/lock_tree/locktree.c b/src/lock_tree/locktree.c index 57ba5217bdb..3b1ccc654ba 100644 --- a/src/lock_tree/locktree.c +++ b/src/lock_tree/locktree.c @@ -1710,7 +1710,7 @@ static int lt_do_escalation(toku_lock_tree* lt) { OMTVALUE dbv; assert(toku_omt_size(lt->dbs) > 0); // there is at least one db associated with this locktree - r = toku_omt_fetch(lt->dbs, 0, &dbv, NULL); + r = toku_omt_fetch(lt->dbs, 0, &dbv); assert(r == 0); db = dbv; lt_set_comparison_functions(lt, db); @@ -2143,7 +2143,7 @@ static void lt_add_db(toku_lock_tree* tree, DB *db) { int r; OMTVALUE get_dbv = NULL; uint32_t index; - r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL); + r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index); assert(r == DB_NOTFOUND); r = toku_omt_insert_at(tree->dbs, db, index); assert_zero(r); @@ -2155,7 +2155,7 @@ static void lt_remove_db(toku_lock_tree* tree, DB *db) { int r; OMTVALUE get_dbv = NULL; uint32_t index; - r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL); + r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index); assert_zero(r); assert(db == get_dbv); r = toku_omt_delete_at(tree->dbs, index); diff --git a/src/range_tree/log_nooverlap.c b/src/range_tree/log_nooverlap.c index a454e77ea06..7db72146f4a 100644 --- a/src/range_tree/log_nooverlap.c +++ b/src/range_tree/log_nooverlap.c @@ -157,7 +157,7 @@ toku_rt_find(toku_range_tree* tree, toku_interval* query, u_int32_t k, extra.end_cmp = tree->end_cmp; extra.query = *query; - r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost, NULL); + r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost); if (r == DB_NOTFOUND) { /* Nothing overlaps. */ *numfound = 0; @@ -199,7 +199,7 @@ toku_rt_insert(toku_range_tree* tree, toku_range* range) { extra.end_cmp = tree->end_cmp; extra.query = range->ends; - r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index, NULL); + r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index); if (r == 0) { r = EDOM; goto cleanup; } @@ -233,7 +233,7 @@ toku_rt_delete(toku_range_tree* tree, toku_range* range) { extra.end_cmp = tree->end_cmp; extra.query = range->ends; - r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index, NULL); + r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index); if (r != 0) { r = EDOM; goto cleanup; } @@ -270,7 +270,7 @@ rt_neightbor(toku_range_tree* tree, toku_point* point, extra.query.right = point; assert(direction==1 || direction==-1); - r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index, NULL); + r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index); if (r == DB_NOTFOUND) { *wasfound = FALSE; r = 0; @@ -349,14 +349,14 @@ toku_rt_verify(toku_range_tree *tree) { for (u_int32_t i = 0; i < tree->numelements; i++) { // assert left <= right OMTVALUE omtv; - r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL); + r = toku_omt_fetch(tree->i.omt, i, &omtv); assert_zero(r); toku_range *v = (toku_range *) omtv; assert(tree->end_cmp(v->ends.left, v->ends.right) <= 0); // assert ranges are sorted if (i < tree->numelements-1) { OMTVALUE omtvnext; - r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext, NULL); + r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext); assert_zero(r); toku_range *vnext = (toku_range *) omtvnext; assert(tree->end_cmp(v->ends.right, vnext->ends.left) < 0); @@ -365,11 +365,11 @@ toku_rt_verify(toku_range_tree *tree) { // verify no overlaps for (u_int32_t i = 1; i < tree->numelements; i++) { OMTVALUE omtvprev; - r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev, NULL); + r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev); assert_zero(r); toku_range *vprev = (toku_range *) omtvprev; OMTVALUE omtv; - r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL); + r = toku_omt_fetch(tree->i.omt, i, &omtv); assert_zero(r); toku_range *v = (toku_range *) omtv; assert(!toku__rt_overlap(tree, &vprev->ends, &v->ends)); diff --git a/src/tests/test_bulk_fetch.c b/src/tests/test_bulk_fetch.c new file mode 100644 index 00000000000..f99b48060d6 --- /dev/null +++ b/src/tests/test_bulk_fetch.c @@ -0,0 +1,153 @@ +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." +#include "test.h" + +#include +#include + +#include +#include +#include +#include + + +static void +verify_val(DBT const *a, DBT const *b, void *c) { + assert(a->size == sizeof(int)); + assert(b->size == sizeof(int)); + int* expected = (int *)c; + assert(*expected == *(int *)a->data); + assert(*expected == *(int *)b->data); +} + +static int +verify_fwd_fast(DBT const *a, DBT const *b, void *c) { + verify_val(a,b,c); + int* expected = (int *)c; + *expected = *expected + 1; + return TOKUDB_CURSOR_CONTINUE; +} + +static int +verify_fwd_slow(DBT const *a, DBT const *b, void *c) { + verify_val(a,b,c); + int* expected = (int *)c; + *expected = *expected + 1; + return 0; +} + +static int +verify_bwd_fast(DBT const *a, DBT const *b, void *c) { + verify_val(a,b,c); + int* expected = (int *)c; + *expected = *expected - 1; + return TOKUDB_CURSOR_CONTINUE; +} + +static int +verify_bwd_slow(DBT const *a, DBT const *b, void *c) { + verify_val(a,b,c); + int* expected = (int *)c; + *expected = *expected - 1; + return 0; +} + + + +static void +test_bulk_fetch (int n) { + if (verbose) printf("test_rand_insert:%d \n", n); + + DB_TXN * const null_txn = 0; + const char * const fname = "test.bulk_fetch.brt"; + int r; + + r = system("rm -rf " ENVDIR); + CKERR(r); + r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); + + /* create the dup database file */ + DB_ENV *env; + r = db_env_create(&env, 0); assert(r == 0); + r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r); + r = env->open(env, ENVDIR, DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL, 0); assert(r == 0); + + DB *db; + r = db_create(&db, env, 0); + assert(r == 0); + r = db->set_flags(db, 0); + assert(r == 0); + r = db->set_pagesize(db, 4096); + assert(r == 0); + r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); + assert(r == 0); + + int keys[n]; + int i; + for (i=0; iput(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0); + assert(r == 0); + } + + // + // data inserted, now verify that using TOKUDB_CURSOR_CONTINUE in the callback works + // + DBC* cursor; + + // verify fast + r = db->cursor(db, NULL, &cursor, 0); + CKERR(r); + int expected = 0; + while (r != DB_NOTFOUND) { + r = cursor->c_getf_next(cursor, 0, verify_fwd_fast, &expected); + assert(r==0 || r==DB_NOTFOUND); + } + r = cursor->c_close(cursor); CKERR(r); + + // verify slow + r = db->cursor(db, NULL, &cursor, 0); + CKERR(r); + expected = 0; + while (r != DB_NOTFOUND) { + r = cursor->c_getf_next(cursor, 0, verify_fwd_slow, &expected); + assert(r==0 || r==DB_NOTFOUND); + } + r = cursor->c_close(cursor); CKERR(r); + + // now do backwards + r = db->cursor(db, NULL, &cursor, 0); + CKERR(r); + expected = n-1; + while (r != DB_NOTFOUND) { + r = cursor->c_getf_prev(cursor, 0, verify_bwd_fast, &expected); + assert(r==0 || r==DB_NOTFOUND); + } + r = cursor->c_close(cursor); CKERR(r); + + // verify slow + r = db->cursor(db, NULL, &cursor, 0); + CKERR(r); + expected = n-1; + while (r != DB_NOTFOUND) { + r = cursor->c_getf_prev(cursor, 0, verify_bwd_slow, &expected); + assert(r==0 || r==DB_NOTFOUND); + } + r = cursor->c_close(cursor); CKERR(r); + + + r = db->close(db, 0); CKERR(r); + r = env->close(env, 0); CKERR(r); +} + +int +test_main(int argc, char *const argv[]) { + parse_args(argc, argv); + test_bulk_fetch(10000); + + return 0; +} diff --git a/src/ydb.c b/src/ydb.c index 4a0949631b4..a9ff5672ca2 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -2819,7 +2819,7 @@ env_note_db_opened(DB_ENV *env, DB *db) { OMTVALUE dbv; uint32_t idx; env->i->num_open_dbs++; - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); assert(r==DB_NOTFOUND); //Must not already be there. r = toku_omt_insert_at(env->i->open_dbs, db, idx); assert(r==0); @@ -2834,7 +2834,7 @@ env_note_db_closed(DB_ENV *env, DB *db) { OMTVALUE dbv; uint32_t idx; env->i->num_open_dbs--; - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); assert(r==0); //Must already be there. assert((DB*)dbv == db); r = toku_omt_delete_at(env->i->open_dbs, idx); @@ -2850,7 +2850,7 @@ env_note_zombie_db(DB_ENV *env, DB *db) { OMTVALUE dbv; uint32_t idx; env->i->num_zombie_dbs++; - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); assert(r==DB_NOTFOUND); //Must not already be there. r = toku_omt_insert_at(env->i->open_dbs, db, idx); assert(r==0); @@ -2865,7 +2865,7 @@ env_note_zombie_db_closed(DB_ENV *env, DB *db) { OMTVALUE dbv; uint32_t idx; env->i->num_zombie_dbs--; - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); assert(r==0); //Must already be there. assert((DB*)dbv == db); r = toku_omt_delete_at(env->i->open_dbs, idx); @@ -2907,7 +2907,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { BOOL rval; OMTVALUE dbv; uint32_t idx; - r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx); if (r==0) { DB *db = dbv; assert(strcmp(dname, db->i->dname) == 0); @@ -2928,7 +2928,7 @@ env_get_zombie_db_with_dname(DB_ENV *env, const char *dname) { DB* rval; OMTVALUE dbv; uint32_t idx; - r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx, NULL); + r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx); if (r==0) { DB *db = dbv; assert(db); @@ -3461,7 +3461,7 @@ c_getf_first_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_first @@ -3512,7 +3512,7 @@ c_getf_last_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_last @@ -3567,7 +3567,7 @@ c_getf_next_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_next @@ -3622,7 +3622,7 @@ c_getf_prev_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_prev @@ -3660,7 +3660,7 @@ c_getf_current_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val //Call application-layer callback if found. if (key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_current @@ -3724,7 +3724,7 @@ c_getf_set_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, vo //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_set @@ -3779,7 +3779,7 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_set_range @@ -3835,7 +3835,7 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b //Call application-layer callback if found and locks were successfully obtained. if (r==0 && key!=NULL) { context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); - if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; + r = context->r_user_callback; } //Give brt-layer an error (if any) to return from toku_brt_cursor_set_range_reverse