[t:3436], [t:3782], merge ydb level bulk fetch to main line

git-svn-id: file:///svn/toku/tokudb@33770 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Zardosht Kasheff 2013-04-16 23:59:46 -04:00 committed by Yoni Fogel
parent 79566071b9
commit f392523514
28 changed files with 382 additions and 964 deletions

View file

@ -290,6 +290,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -292,6 +292,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -75,6 +75,7 @@ enum {
TOKUDB_UPGRADE_FAILURE = -100011, TOKUDB_UPGRADE_FAILURE = -100011,
TOKUDB_TRY_AGAIN = -100012, TOKUDB_TRY_AGAIN = -100012,
TOKUDB_NEEDS_REPAIR = -100013, TOKUDB_NEEDS_REPAIR = -100013,
TOKUDB_CURSOR_CONTINUE = -100014,
}; };
static void print_defines (void) { static void print_defines (void) {
@ -226,6 +227,7 @@ static void print_defines (void) {
dodefine(TOKUDB_UPGRADE_FAILURE); dodefine(TOKUDB_UPGRADE_FAILURE);
dodefine(TOKUDB_TRY_AGAIN); dodefine(TOKUDB_TRY_AGAIN);
dodefine(TOKUDB_NEEDS_REPAIR); dodefine(TOKUDB_NEEDS_REPAIR);
dodefine(TOKUDB_CURSOR_CONTINUE);
/* LOADER flags */ /* LOADER flags */
printf("/* LOADER flags */\n"); printf("/* LOADER flags */\n");

View file

@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -283,7 +283,8 @@ static int counttotalbytes (DBT const *key, DBT const *data, void *extrav) {
printf("%s:%d %"PRIu64" %"PRIu64"\n", __FUNCTION__, __LINE__, k, expect_key); printf("%s:%d %"PRIu64" %"PRIu64"\n", __FUNCTION__, __LINE__, k, expect_key);
expect_key = k + 1; expect_key = k + 1;
} }
return 0; return TOKUDB_CURSOR_CONTINUE;
//return 0;
} }
static void scanscan_lwc (void) { static void scanscan_lwc (void) {
@ -306,7 +307,7 @@ static void scanscan_lwc (void) {
rowcounter++; rowcounter++;
if (limitcount>0 && rowcounter>=limitcount) break; if (limitcount>0 && rowcounter>=limitcount) break;
} }
assert(r==DB_NOTFOUND); //assert(r==DB_NOTFOUND);
r = dbc->c_close(dbc); assert(r==0); r = dbc->c_close(dbc); assert(r==0);
double thistime = gettime(); double thistime = gettime();
double tdiff = thistime-prevtime; double tdiff = thistime-prevtime;

View file

@ -293,6 +293,7 @@ typedef enum {
#define TOKUDB_UPGRADE_FAILURE -100011 #define TOKUDB_UPGRADE_FAILURE -100011
#define TOKUDB_TRY_AGAIN -100012 #define TOKUDB_TRY_AGAIN -100012
#define TOKUDB_NEEDS_REPAIR -100013 #define TOKUDB_NEEDS_REPAIR -100013
#define TOKUDB_CURSOR_CONTINUE -100014
/* LOADER flags */ /* LOADER flags */
#define LOADER_USE_PUTS 1 #define LOADER_USE_PUTS 1
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val);

View file

@ -429,7 +429,6 @@ struct brt_header {
unsigned int flags; unsigned int flags;
DESCRIPTOR_S descriptor; DESCRIPTOR_S descriptor;
u_int64_t root_put_counter; // the generation number of the brt
BLOCK_TABLE blocktable; BLOCK_TABLE blocktable;
// If a transaction created this BRT, which one? // If a transaction created this BRT, which one?
@ -549,8 +548,6 @@ struct brt_cursor_leaf_info_to_be {
// Values to be used to pin a leaf for shortcut searches // Values to be used to pin a leaf for shortcut searches
struct brt_cursor_leaf_info { struct brt_cursor_leaf_info {
BLOCKNUM blocknumber;
u_int32_t fullhash;
struct brt_cursor_leaf_info_to_be to_be; struct brt_cursor_leaf_info_to_be to_be;
}; };
@ -558,13 +555,10 @@ struct brt_cursor_leaf_info {
struct brt_cursor { struct brt_cursor {
struct toku_list cursors_link; struct toku_list cursors_link;
BRT brt; BRT brt;
BOOL current_in_omt;
BOOL prefetching; BOOL prefetching;
DBT key, val; // The key-value pair that the cursor currently points to DBT key, val; // The key-value pair that the cursor currently points to
DBT range_lock_left_key, range_lock_right_key; DBT range_lock_left_key, range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty; BOOL left_is_neg_infty, right_is_pos_infty;
OMTCURSOR omtcursor;
u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor? TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor?
BOOL is_snapshot_read; // true if query is read_committed, false otherwise BOOL is_snapshot_read; // true if query is read_committed, false otherwise
BOOL is_leaf_mode; BOOL is_leaf_mode;

View file

@ -142,7 +142,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
struct cmd_leafval_heaviside_extra be = {brt, &keydbt}; struct cmd_leafval_heaviside_extra be = {brt, &keydbt};
r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL); r = toku_omt_find_zero(BLB_BUFFER(node, 0), toku_cmd_leafval_heaviside, &be, &storeddatav, &idx);
if (r==0) { if (r==0) {

View file

@ -83,7 +83,7 @@ verify_msg_in_child_buffer(BRT brt, int type, MSN msn, bytevec key, ITEMLEN keyl
static LEAFENTRY static LEAFENTRY
get_ith_leafentry (BASEMENTNODE bn, int i) { get_ith_leafentry (BASEMENTNODE bn, int i) {
OMTVALUE le_v; OMTVALUE le_v;
int r = toku_omt_fetch(bn->buffer, i, &le_v, NULL); int r = toku_omt_fetch(bn->buffer, i, &le_v);
invariant(r == 0); // this is a bad failure if it happens. invariant(r == 0); // this is a bad failure if it happens.
return (LEAFENTRY)le_v; return (LEAFENTRY)le_v;
} }

View file

@ -143,14 +143,6 @@ toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn) {
h->root_that_created_or_locked_when_empty = rootid; h->root_that_created_or_locked_when_empty = rootid;
} }
static void brt_cursor_invalidate(BRT_CURSOR brtcursor);
// We invalidate all the OMTCURSORS any time we push into the root of the BRT for that OMT.
// We keep a counter on each brt header, but if the brt header is evicted from the cachetable
// then we lose that counter. So we also keep a global counter.
// An alternative would be to keep only the global counter. But that would invalidate all OMTCURSORS
// even from unrelated BRTs. This way we only invalidate an OMTCURSOR if
static u_int64_t global_root_put_counter = 0;
enum reactivity { RE_STABLE, RE_FUSIBLE, RE_FISSIBLE }; enum reactivity { RE_STABLE, RE_FUSIBLE, RE_FISSIBLE };
@ -509,7 +501,7 @@ toku_verify_estimates (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bo
static LEAFENTRY static LEAFENTRY
fetch_from_buf (OMT omt, u_int32_t idx) { fetch_from_buf (OMT omt, u_int32_t idx) {
OMTVALUE v = 0; OMTVALUE v = 0;
int r = toku_omt_fetch(omt, idx, &v, NULL); int r = toku_omt_fetch(omt, idx, &v);
assert_zero(r); assert_zero(r);
return (LEAFENTRY)v; return (LEAFENTRY)v;
} }
@ -1167,7 +1159,7 @@ brtleaf_disk_size(BRTNODE node)
for (j=0; j < n_leafentries; j++) { for (j=0; j < n_leafentries; j++) {
OMTVALUE v; OMTVALUE v;
LEAFENTRY curr_le = NULL; LEAFENTRY curr_le = NULL;
int r = toku_omt_fetch(curr_buffer, j, &v, NULL); int r = toku_omt_fetch(curr_buffer, j, &v);
curr_le = v; curr_le = v;
assert_zero(r); assert_zero(r);
retval += leafentry_disksize(curr_le); retval += leafentry_disksize(curr_le);
@ -1195,7 +1187,7 @@ brtleaf_get_split_loc(
for (j=0; j < n_leafentries; j++) { for (j=0; j < n_leafentries; j++) {
LEAFENTRY curr_le = NULL; LEAFENTRY curr_le = NULL;
OMTVALUE v; OMTVALUE v;
int r = toku_omt_fetch(curr_buffer, j, &v, NULL); int r = toku_omt_fetch(curr_buffer, j, &v);
curr_le = v; curr_le = v;
assert_zero(r); assert_zero(r);
size_so_far += leafentry_disksize(curr_le); size_so_far += leafentry_disksize(curr_le);
@ -1376,7 +1368,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
if (splitk) { if (splitk) {
memset(splitk, 0, sizeof *splitk); memset(splitk, 0, sizeof *splitk);
OMTVALUE lev = 0; OMTVALUE lev = 0;
int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev, NULL); int r=toku_omt_fetch(BLB_BUFFER(node, split_node), toku_omt_size(BLB_BUFFER(node, split_node))-1, &lev);
assert_zero(r); // that fetch should have worked. assert_zero(r); // that fetch should have worked.
LEAFENTRY le=lev; LEAFENTRY le=lev;
splitk->size = le_keylen(le); splitk->size = le_keylen(le);
@ -1903,7 +1895,7 @@ brt_leaf_put_cmd (
*made_change = 1; *made_change = 1;
if (doing_seqinsert) { if (doing_seqinsert) {
idx = toku_omt_size(bn->buffer); idx = toku_omt_size(bn->buffer);
r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx-1, &storeddatav);
if (r != 0) goto fz; if (r != 0) goto fz;
storeddata = storeddatav; storeddata = storeddatav;
int cmp = toku_cmd_leafval_heaviside(storeddata, &be); int cmp = toku_cmd_leafval_heaviside(storeddata, &be);
@ -1912,7 +1904,7 @@ brt_leaf_put_cmd (
} else { } else {
fz: fz:
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL); &storeddatav, &idx);
} }
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
storeddata = 0; storeddata = 0;
@ -1944,7 +1936,7 @@ brt_leaf_put_cmd (
// Apply to all the matches // Apply to all the matches
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL); &storeddatav, &idx);
if (r == DB_NOTFOUND) break; if (r == DB_NOTFOUND) break;
assert(r==0); assert(r==0);
storeddata=storeddatav; storeddata=storeddatav;
@ -1969,7 +1961,7 @@ brt_leaf_put_cmd (
assert(idx <= num_leafentries_after); assert(idx <= num_leafentries_after);
if (idx == num_leafentries_after) break; //Reached the end of the leaf if (idx == num_leafentries_after) break; //Reached the end of the leaf
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r); assert_zero(r);
} }
storeddata=storeddatav; storeddata=storeddatav;
@ -1995,7 +1987,7 @@ brt_leaf_put_cmd (
// Apply to all leafentries // Apply to all leafentries
omt_size = toku_omt_size(bn->buffer); omt_size = toku_omt_size(bn->buffer);
for (u_int32_t idx = 0; idx < omt_size; ) { for (u_int32_t idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r); assert_zero(r);
storeddata=storeddatav; storeddata=storeddatav;
int deleted = 0; int deleted = 0;
@ -2022,7 +2014,7 @@ brt_leaf_put_cmd (
// Apply to all leafentries if txn is represented // Apply to all leafentries if txn is represented
omt_size = toku_omt_size(bn->buffer); omt_size = toku_omt_size(bn->buffer);
for (u_int32_t idx = 0; idx < omt_size; ) { for (u_int32_t idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r); assert_zero(r);
storeddata=storeddatav; storeddata=storeddatav;
int deleted = 0; int deleted = 0;
@ -2047,7 +2039,7 @@ brt_leaf_put_cmd (
case BRT_UPDATE: { case BRT_UPDATE: {
u_int32_t idx; u_int32_t idx;
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL); &storeddatav, &idx);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change, workdone); r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change, workdone);
} else if (r==0) { } else if (r==0) {
@ -2061,7 +2053,7 @@ brt_leaf_put_cmd (
u_int32_t idx = 0; u_int32_t idx = 0;
u_int32_t num_leafentries_before; u_int32_t num_leafentries_before;
while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) { while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert(r==0); assert(r==0);
storeddata=storeddatav; storeddata=storeddatav;
r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, workdone); r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, workdone);
@ -3156,7 +3148,6 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
assert(brt->h); assert(brt->h);
brt->h->root_put_counter = global_root_put_counter++;
u_int32_t fullhash; u_int32_t fullhash;
rootp = toku_calculate_root_offset_pointer(brt, &fullhash); rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
@ -3721,8 +3712,6 @@ brt_init_header_partial (BRT t, TOKUTXN txn) {
compute_and_fill_remembered_hash(t); compute_and_fill_remembered_hash(t);
t->h->root_put_counter = global_root_put_counter++;
BLOCKNUM root = t->h->root; BLOCKNUM root = t->h->root;
if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; } if ((r=setup_initial_brt_root_node(t, root))!=0) { return r; }
//printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0); //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
@ -3815,7 +3804,6 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN max_acceptabl
} }
if (r!=0) return r; if (r!=0) return r;
h->cf = cf; h->cf = cf;
h->root_put_counter = global_root_put_counter++;
toku_cachefile_set_userdata(cf, toku_cachefile_set_userdata(cf,
(void*)h, (void*)h,
brtheader_log_fassociate_during_checkpoint, brtheader_log_fassociate_during_checkpoint,
@ -4154,7 +4142,6 @@ brt_redirect_cursors (BRT brt_to, BRT brt_from) {
while (!toku_list_empty(&brt_from->cursors)) { while (!toku_list_empty(&brt_from->cursors)) {
struct toku_list * c_list = toku_list_head(&brt_from->cursors); struct toku_list * c_list = toku_list_head(&brt_from->cursors);
BRT_CURSOR c = toku_list_struct(c_list, struct brt_cursor, cursors_link); BRT_CURSOR c = toku_list_struct(c_list, struct brt_cursor, cursors_link);
brt_cursor_invalidate(c);
toku_list_remove(&c->cursors_link); toku_list_remove(&c->cursors_link);
@ -4799,12 +4786,10 @@ int toku_brt_flush (BRT brt) {
static inline void static inline void
brt_cursor_cleanup_dbts(BRT_CURSOR c) { brt_cursor_cleanup_dbts(BRT_CURSOR c) {
if (!c->current_in_omt) { if (c->key.data) toku_free(c->key.data);
if (c->key.data) toku_free(c->key.data); if (c->val.data) toku_free(c->val.data);
if (c->val.data) toku_free(c->val.data); memset(&c->key, 0, sizeof(c->key));
memset(&c->key, 0, sizeof(c->key)); memset(&c->val, 0, sizeof(c->val));
memset(&c->val, 0, sizeof(c->val));
}
} }
// //
@ -4834,14 +4819,13 @@ int does_txn_read_entry(TXNID id, TOKUTXN context) {
return rval; return rval;
} }
static inline int brt_cursor_extract_key_and_val( static inline void brt_cursor_extract_key_and_val(
LEAFENTRY le, LEAFENTRY le,
BRT_CURSOR cursor, BRT_CURSOR cursor,
u_int32_t *keylen, u_int32_t *keylen,
void **key, void **key,
u_int32_t *vallen, u_int32_t *vallen,
void **val) { void **val) {
int r = 0;
if (toku_brt_cursor_is_leaf_mode(cursor)) { if (toku_brt_cursor_is_leaf_mode(cursor)) {
*key = le_key_and_len(le, keylen); *key = le_key_and_len(le, keylen);
*val = le; *val = le;
@ -4859,53 +4843,6 @@ static inline int brt_cursor_extract_key_and_val(
*key = le_key_and_len(le, keylen); *key = le_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen); *val = le_latest_val_and_len(le, vallen);
} }
return r;
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert_zero(r);
r = brt_cursor_extract_key_and_val(le,
c,
&key->size,
&key->data,
&val->size,
&val->data);
assert_zero(r);
}
// When an omt cursor is invalidated, this is the brt-level function
// that is called. This function is only called by the omt logic.
// This callback is called when either (a) the brt logic invalidates one
// cursor (see brt_cursor_invalidate()) or (b) when the omt logic invalidates
// all the cursors for an omt.
static void
brt_cursor_invalidate_callback(OMTCURSOR UU(omt_c), void *extra) {
BRT_CURSOR cursor = extra;
//TODO: #1378 assert that this thread owns omt lock in brtcursor
if (cursor->current_in_omt) {
DBT key,val;
load_dbts_from_omt(cursor, toku_init_dbt(&key), toku_init_dbt(&val));
cursor->key.data = toku_memdup(key.data, key.size);
cursor->val.data = toku_memdup(val.data, val.size);
cursor->key.size = key.size;
cursor->val.size = val.size;
//TODO: Find some way to deal with ENOMEM here.
//Until then, just assert that the memdups worked.
assert(cursor->key.data && cursor->val.data);
cursor->current_in_omt = FALSE;
}
}
// Called at start of every slow query, and only from slow queries.
// When all cursors are invalidated (from writer thread, or insert/delete),
// this function is not used.
static void
brt_cursor_invalidate(BRT_CURSOR brtcursor) {
toku_omt_cursor_invalidate(brtcursor->omtcursor); // will call brt_cursor_invalidate_callback()
} }
int toku_brt_cursor ( int toku_brt_cursor (
@ -4929,7 +4866,6 @@ int toku_brt_cursor (
return ENOMEM; return ENOMEM;
memset(cursor, 0, sizeof(*cursor)); memset(cursor, 0, sizeof(*cursor));
cursor->brt = brt; cursor->brt = brt;
cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE; cursor->prefetching = FALSE;
toku_init_dbt(&cursor->range_lock_left_key); toku_init_dbt(&cursor->range_lock_left_key);
toku_init_dbt(&cursor->range_lock_right_key); toku_init_dbt(&cursor->range_lock_right_key);
@ -4940,11 +4876,6 @@ int toku_brt_cursor (
cursor->is_leaf_mode = FALSE; cursor->is_leaf_mode = FALSE;
cursor->ttxn = ttxn; cursor->ttxn = ttxn;
toku_list_push(&brt->cursors, &cursor->cursors_link); toku_list_push(&brt->cursors, &cursor->cursors_link);
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert_zero(r);
toku_omt_cursor_set_invalidate_callback(cursor->omtcursor,
brt_cursor_invalidate_callback, cursor);
cursor->root_put_counter=0;
*cursorptr = cursor; *cursorptr = cursor;
return 0; return 0;
} }
@ -4986,19 +4917,9 @@ toku_brt_cursor_set_range_lock(BRT_CURSOR cursor, const DBT *left, const DBT *ri
} }
} }
// Called during cursor destruction
// It is the same as brt_cursor_invalidate, except that
// we make sure the callback function is never called.
static void
brt_cursor_invalidate_no_callback(BRT_CURSOR brtcursor) {
toku_omt_cursor_set_invalidate_callback(brtcursor->omtcursor, NULL, NULL);
toku_omt_cursor_invalidate(brtcursor->omtcursor); // will NOT call brt_cursor_invalidate_callback()
}
//TODO: #1378 When we split the ydb lock, touching cursor->cursors_link //TODO: #1378 When we split the ydb lock, touching cursor->cursors_link
//is not thread safe. //is not thread safe.
int toku_brt_cursor_close(BRT_CURSOR cursor) { int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_invalidate_no_callback(cursor);
brt_cursor_cleanup_dbts(cursor); brt_cursor_cleanup_dbts(cursor);
if (cursor->range_lock_left_key.data) { if (cursor->range_lock_left_key.data) {
toku_free(cursor->range_lock_left_key.data); toku_free(cursor->range_lock_left_key.data);
@ -5009,7 +4930,6 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
toku_destroy_dbt(&cursor->range_lock_right_key); toku_destroy_dbt(&cursor->range_lock_right_key);
} }
toku_list_remove(&cursor->cursors_link); toku_list_remove(&cursor->cursors_link);
toku_omt_cursor_destroy(&cursor->omtcursor);
toku_free_n(cursor, sizeof *cursor); toku_free_n(cursor, sizeof *cursor);
return 0; return 0;
} }
@ -5026,7 +4946,7 @@ static inline BOOL brt_cursor_prefetching(BRT_CURSOR cursor) {
static BOOL static BOOL
brt_cursor_not_set(BRT_CURSOR cursor) { brt_cursor_not_set(BRT_CURSOR cursor) {
assert((cursor->key.data==NULL) == (cursor->val.data==NULL)); assert((cursor->key.data==NULL) == (cursor->val.data==NULL));
return (BOOL)(!cursor->current_in_omt && cursor->key.data == NULL); return (BOOL)(cursor->key.data == NULL);
} }
static int static int
@ -5056,30 +4976,6 @@ heaviside_from_search_t (OMTVALUE lev, void *extra) {
} }
// This is the only function that associates a brt cursor (and its contained
// omt cursor) with a brt node (and its associated omt). This is different
// from older code because the old code associated the omt cursor with the
// omt when the search found a match. In this new design, the omt cursor
// will not be associated with the omt until after the application-level
// callback accepts the search result.
// The lock is necessary because we don't want two threads modifying
// the omt's list of cursors simultaneously.
// Note, this is only place in brt code that calls toku_omt_cursor_set_index().
// Requires: cursor->omtcursor is valid
static inline void
brt_cursor_update(BRT_CURSOR brtcursor) {
//Free old version if it is using local memory.
OMTCURSOR omtcursor = brtcursor->omtcursor;
if (!brtcursor->current_in_omt) {
brt_cursor_cleanup_dbts(brtcursor);
brtcursor->current_in_omt = TRUE;
toku_omt_cursor_associate(brtcursor->leaf_info.to_be.omt, omtcursor);
//no longer touching linked list, and
//only one thread can touch cursor at a time, protected by ydb lock
}
toku_omt_cursor_set_index(omtcursor, brtcursor->leaf_info.to_be.index);
}
// //
// Returns true if the value that is to be read is empty. // Returns true if the value that is to be read is empty.
// //
@ -5402,6 +5298,20 @@ exit:
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
} }
static int
brt_cursor_shortcut (
BRT_CURSOR cursor,
int direction,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val
);
// This is a bottom layer of the search functions. // This is a bottom layer of the search functions.
static int static int
brt_search_basement_node( brt_search_basement_node(
@ -5410,9 +5320,8 @@ brt_search_basement_node(
BRT_GET_CALLBACK_FUNCTION getf, BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v, void *getf_v,
BOOL *doprefetch, BOOL *doprefetch,
BLOCKNUM thisnodename, BRT_CURSOR brtcursor,
u_int32_t fullhash, BOOL can_bulk_fetch
BRT_CURSOR brtcursor
) )
{ {
assert(bn->max_dsn_applied.dsn >= MIN_DSN.dsn); assert(bn->max_dsn_applied.dsn >= MIN_DSN.dsn);
@ -5432,7 +5341,7 @@ brt_search_basement_node(
heaviside_from_search_t, heaviside_from_search_t,
search, search,
direction, direction,
&datav, &idx, NULL); &datav, &idx);
if (r!=0) return r; if (r!=0) return r;
LEAFENTRY le = datav; LEAFENTRY le = datav;
@ -5454,7 +5363,7 @@ brt_search_basement_node(
default: default:
assert(FALSE); assert(FALSE);
} }
r = toku_omt_fetch(bn->buffer, idx, &datav, NULL); r = toku_omt_fetch(bn->buffer, idx, &datav);
assert_zero(r); // we just validated the index assert_zero(r); // we just validated the index
le = datav; le = datav;
if (!is_le_val_del(le,brtcursor)) goto got_a_good_value; if (!is_le_val_del(le,brtcursor)) goto got_a_good_value;
@ -5467,35 +5376,42 @@ got_a_good_value:
u_int32_t vallen; u_int32_t vallen;
void *val; void *val;
r = brt_cursor_extract_key_and_val(le, brt_cursor_extract_key_and_val(le,
brtcursor, brtcursor,
&keylen, &keylen,
&key, &key,
&vallen, &vallen,
&val); &val
);
assert(brtcursor->current_in_omt == FALSE); r = getf(keylen, key, vallen, val, getf_v);
if (r==0) { if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
// Leave the omtcursor alone above (pass NULL to omt_find/fetch)
// This prevents the omt from calling associate(), which would
// require a lock to keep the list of cursors safe when the omt
// is used by the brt. (We don't want to impose the locking requirement
// on the omt for non-brt uses.)
//
// Instead, all associating of omtcursors with omts (for leaf nodes)
// is done in brt_cursor_update.
brtcursor->leaf_info.to_be.omt = bn->buffer; brtcursor->leaf_info.to_be.omt = bn->buffer;
brtcursor->leaf_info.to_be.index = idx; brtcursor->leaf_info.to_be.index = idx;
brtcursor->leaf_info.fullhash = fullhash;
brtcursor->leaf_info.blocknumber = thisnodename; if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
brt_cursor_update(brtcursor); r = brt_cursor_shortcut(
brtcursor,
direction,
getf,
getf_v,
&keylen,
&key,
&vallen,
&val
);
}
brt_cursor_cleanup_dbts(brtcursor);
brtcursor->key.data = toku_memdup(key, keylen);
brtcursor->val.data = toku_memdup(val, vallen);
brtcursor->key.size = keylen;
brtcursor->val.size = vallen;
//The search was successful. Prefetching can continue. //The search was successful. Prefetching can continue.
*doprefetch = TRUE; *doprefetch = TRUE;
} }
} }
if (r == TOKUDB_CURSOR_CONTINUE) r = 0;
return r; return r;
} }
@ -5511,7 +5427,8 @@ brt_search_node (
BRT_CURSOR brtcursor, BRT_CURSOR brtcursor,
UNLOCKERS unlockers, UNLOCKERS unlockers,
ANCESTORS, ANCESTORS,
struct pivot_bounds const * const bounds struct pivot_bounds const * const bounds,
BOOL can_bulk_fetch
); );
// the number of nodes to prefetch // the number of nodes to prefetch
@ -5591,7 +5508,7 @@ unlock_brtnode_fun (void *v) {
/* search in a node's child */ /* search in a node's child */
static int static int
brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers, brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds) ANCESTORS ancestors, struct pivot_bounds const * const bounds, BOOL can_bulk_fetch)
// Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned). // Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned).
{ {
struct ancestors next_ancestors = {node, childnum, ancestors}; struct ancestors next_ancestors = {node, childnum, ancestors};
@ -5624,7 +5541,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
struct unlock_brtnode_extra unlock_extra = {brt,childnode}; struct unlock_brtnode_extra unlock_extra = {brt,childnode};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers}; struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds); int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
if (r!=TOKUDB_TRY_AGAIN) { if (r!=TOKUDB_TRY_AGAIN) {
// Even if r is reactive, we want to handle the maybe reactive child. // Even if r is reactive, we want to handle the maybe reactive child.
@ -5714,7 +5631,8 @@ brt_search_node(
BRT_CURSOR brtcursor, BRT_CURSOR brtcursor,
UNLOCKERS unlockers, UNLOCKERS unlockers,
ANCESTORS ancestors, ANCESTORS ancestors,
struct pivot_bounds const * const bounds struct pivot_bounds const * const bounds,
BOOL can_bulk_fetch
) )
{ int r = 0; { int r = 0;
// assert that we got a valid child_to_search // assert that we got a valid child_to_search
@ -5749,7 +5667,8 @@ brt_search_node(
brtcursor, brtcursor,
unlockers, unlockers,
ancestors, ancestors,
&next_bounds &next_bounds,
can_bulk_fetch
); );
} }
else { else {
@ -5759,9 +5678,8 @@ brt_search_node(
getf, getf,
getf_v, getf_v,
doprefetch, doprefetch,
node->thisnodename, brtcursor,
node->fullhash, can_bulk_fetch
brtcursor
); );
} }
if (r == 0) return r; //Success if (r == 0) return r; //Success
@ -5799,7 +5717,7 @@ brt_search_node(
} }
static int static int
toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, u_int64_t *root_put_counter) toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BRT_CURSOR brtcursor, BOOL can_bulk_fetch)
// Effect: Perform a search. Associate cursor with a leaf if possible. // Effect: Perform a search. Associate cursor with a leaf if possible.
// All searches are performed through this function. // All searches are performed through this function.
{ {
@ -5809,8 +5727,6 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
assert(brt->h); assert(brt->h);
*root_put_counter = brt->h->root_put_counter;
u_int32_t fullhash; u_int32_t fullhash;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt, &fullhash);
@ -5861,7 +5777,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
{ {
BOOL doprefetch = FALSE; BOOL doprefetch = FALSE;
//static int counter = 0; counter++; //static int counter = 0; counter++;
r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds); r = brt_search_node(brt, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, brtcursor, &unlockers, (ANCESTORS)NULL, &infinite_bounds, can_bulk_fetch);
if (r==TOKUDB_TRY_AGAIN) { if (r==TOKUDB_TRY_AGAIN) {
// there are two cases where we get TOKUDB_TRY_AGAIN // there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_brtnode returned // case 1 is when some later call to toku_pin_brtnode returned
@ -5913,10 +5829,9 @@ struct brt_cursor_search_struct {
/* search for the first kv pair that matches the search object */ /* search for the first kv pair that matches the search object */
static int static int
brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL can_bulk_fetch)
{ {
brt_cursor_invalidate(cursor); int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, can_bulk_fetch);
int r = toku_brt_search(cursor->brt, search, getf, getf_v, cursor, &cursor->root_put_counter);
return r; return r;
} }
@ -5946,7 +5861,6 @@ brt_cursor_current_getf(ITEMLEN keylen, bytevec key,
} else { } else {
BRT_CURSOR cursor = bcss->cursor; BRT_CURSOR cursor = bcss->cursor;
DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero DBT newkey = {.size=keylen, .data=(void*)key}; // initializes other fields to zero
//Safe to access cursor->key/val because current_in_omt is FALSE
if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) { if (compare_k_x(cursor->brt, &cursor->key, &newkey) != 0) {
r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); // This was once DB_KEYEMPTY r = bcss->getf(0, NULL, 0, NULL, bcss->getf_v); // This was once DB_KEYEMPTY
if (r==0) r = TOKUDB_FOUND_BUT_REJECTED; if (r==0) r = TOKUDB_FOUND_BUT_REJECTED;
@ -5963,14 +5877,12 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get
if (brt_cursor_not_set(cursor)) if (brt_cursor_not_set(cursor))
return EINVAL; return EINVAL;
if (op == DB_CURRENT) { if (op == DB_CURRENT) {
brt_cursor_invalidate(cursor);
struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, 0}; struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, 0};
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, &search, brt_cursor_current_getf, &bcss, cursor, FALSE);
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
brt_cursor_invalidate(cursor);
return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval); return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval);
} }
@ -5988,7 +5900,7 @@ toku_brt_flatten(BRT brt, TOKUTXN ttxn)
int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE); int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE);
if (r!=0) return r; if (r!=0) return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, tmp_cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, tmp_cursor->brt);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL); r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL, FALSE);
brt_search_finish(&search); brt_search_finish(&search);
if (r==DB_NOTFOUND) r = 0; if (r==DB_NOTFOUND) r = 0;
{ {
@ -6003,7 +5915,7 @@ int
toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v); int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
@ -6012,8 +5924,8 @@ int
toku_brt_cursor_last(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_last(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v); int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search);; brt_search_finish(&search);
return r; return r;
} }
@ -6022,113 +5934,66 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x) {
return compare_k_x(brt, search->k, x) < 0; /* return min xy: kv < xy */ return compare_k_x(brt, search->k, x) < 0; /* return min xy: kv < xy */
} }
static int
brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) {
int r;
OMTCURSOR omtcursor = cursor->omtcursor;
OMT omt = toku_omt_cursor_get_omt(omtcursor);
u_int64_t h_put_counter = cursor->brt->h->root_put_counter;
u_int64_t c_put_counter = cursor->root_put_counter;
BOOL found = FALSE;
//Verify that no messages have been inserted
//since the last time the cursor's pointer was set.
//Also verify the omt cursor is still valid.
//(Necessary to recheck after the maybe_get_and_pin)
if (c_put_counter==h_put_counter && toku_omt_cursor_is_valid(cursor->omtcursor)) {
u_int32_t index = 0;
r = toku_omt_cursor_current_index(omtcursor, &index);
assert_zero(r);
//Starting with the prev, find the first real (non-provdel) leafentry.
while (index != limit) {
OMTVALUE le = NULL;
index += direction;
r = toku_omt_fetch(omt, index, &le, NULL);
assert_zero(r);
if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {
u_int32_t keylen;
void *key;
u_int32_t vallen;
void *val;
r = brt_cursor_extract_key_and_val(le,
cursor,
&keylen,
&key,
&vallen,
&val);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
//Update cursor.
cursor->leaf_info.to_be.index = index;
brt_cursor_update(cursor);
found = TRUE;
}
break;
}
}
if (r==0 && !found) r = DB_NOTFOUND;
}
else r = EINVAL;
return r;
}
static int static int
brt_cursor_maybe_get_and_pin_leaf(BRT_CURSOR brtcursor, BRTNODE* leafp) { brt_cursor_shortcut (
void *leafv; BRT_CURSOR cursor,
int r = toku_cachetable_maybe_get_and_pin_clean(brtcursor->brt->cf, int direction,
brtcursor->leaf_info.blocknumber, BRT_GET_CALLBACK_FUNCTION getf,
brtcursor->leaf_info.fullhash, void *getf_v,
&leafv); u_int32_t *keylen,
if (r == 0) { void **key,
*leafp = leafv; u_int32_t *vallen,
} void **val
return r; )
}
static void
brt_cursor_unpin_leaf(BRT_CURSOR brtcursor, BRTNODE leaf) {
toku_unpin_brtnode(brtcursor->brt, leaf);
}
static int
brt_cursor_next_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
// Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.)
// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases.
{ {
int r; int r = 0;
if (toku_omt_cursor_is_valid(cursor->omtcursor)) { u_int32_t index = cursor->leaf_info.to_be.index;
BRTNODE leaf; OMT omt = cursor->leaf_info.to_be.omt;
r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf); // if we are searching towards the end, limit is last element
if (r == 0) { // if we are searching towards the beginning, limit is the first element
u_int32_t limit = toku_omt_size(toku_omt_cursor_get_omt(cursor->omtcursor)) - 1; u_int32_t limit = (direction > 0) ? (toku_omt_size(omt) - 1) : 0;
r = brt_cursor_shortcut(cursor, 1, limit, getf, getf_v);
brt_cursor_unpin_leaf(cursor, leaf); //Starting with the prev, find the first real (non-provdel) leafentry.
while (index != limit) {
OMTVALUE le = NULL;
index += direction;
r = toku_omt_fetch(omt, index, &le);
assert_zero(r);
if (toku_brt_cursor_is_leaf_mode(cursor) || !is_le_val_del(le, cursor)) {
brt_cursor_extract_key_and_val(
le,
cursor,
keylen,
key,
vallen,
val
);
r = getf(*keylen, *key, *vallen, *val, getf_v);
if (r==0 || r == TOKUDB_CURSOR_CONTINUE) {
//Update cursor.
cursor->leaf_info.to_be.index = index;
}
if (r== TOKUDB_CURSOR_CONTINUE) {
continue;
}
else {
break;
}
} }
} }
else r = EINVAL;
return r; return r;
} }
int int
toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
int r; brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, cursor->brt);
if (brt_cursor_next_shortcut(cursor, getf, getf_v)==0) { int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE);
r = 0; brt_search_finish(&search);
} else {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, cursor->brt);
r = brt_cursor_search(cursor, &search, getf, getf_v);
brt_search_finish(&search);
}
if (r == 0) brt_cursor_set_prefetching(cursor); if (r == 0) brt_cursor_set_prefetching(cursor);
return r; return r;
} }
@ -6158,33 +6023,11 @@ brt_cursor_search_eq_k_x_getf(ITEMLEN keylen, bytevec key,
static int static int
brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
brt_cursor_invalidate(cursor);
struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, search}; struct brt_cursor_search_struct bcss = {getf, getf_v, cursor, search};
int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, brt_cursor_search_eq_k_x_getf, &bcss, cursor, FALSE);
return r; return r;
} }
static int
brt_cursor_prev_shortcut (BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
// Effect: If possible, decrement the cursor and return the key-value pair
// (i.e., the previous one from what the cursor pointed to before.)
// That is, do DB_PREV on DUP databases, and do DB_PREV_NODUP on NODUP databases.
{
int r;
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
BRTNODE leaf;
r = brt_cursor_maybe_get_and_pin_leaf(cursor, &leaf);
if (r == 0) {
r = brt_cursor_shortcut(cursor, -1, 0, getf, getf_v);
brt_cursor_unpin_leaf(cursor, leaf);
}
}
else r = EINVAL;
return r;
}
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) { static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) {
BRT brt = search->context; BRT brt = search->context;
return compare_k_x(brt, search->k, x) > 0; /* return max xy: kv > xy */ return compare_k_x(brt, search->k, x) > 0; /* return max xy: kv > xy */
@ -6193,10 +6036,8 @@ static int brt_cursor_compare_prev(brt_search_t *search, DBT *x) {
int int
toku_brt_cursor_prev(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_prev(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
if (brt_cursor_prev_shortcut(cursor, getf, getf_v)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v); int r = brt_cursor_search(cursor, &search, getf, getf_v, TRUE);
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
@ -6219,7 +6060,7 @@ int
toku_brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v); int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
@ -6233,7 +6074,7 @@ int
toku_brt_cursor_set_range_reverse(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v) toku_brt_cursor_set_range_reverse(BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{ {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range_reverse, BRT_SEARCH_RIGHT, key, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range_reverse, BRT_SEARCH_RIGHT, key, cursor->brt);
int r = brt_cursor_search(cursor, &search, getf, getf_v); int r = brt_cursor_search(cursor, &search, getf, getf_v, FALSE);
brt_search_finish(&search); brt_search_finish(&search);
return r; return r;
} }
@ -6284,7 +6125,6 @@ toku_brt_cursor_peek(BRT_CURSOR cursor, const DBT **pkey, const DBT **pval)
// Requires: The caller must be in the context of a // Requires: The caller must be in the context of a
// BRT_GET_(STRADDLE_)CALLBACK_FUNCTION // BRT_GET_(STRADDLE_)CALLBACK_FUNCTION
{ {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
*pkey = &cursor->key; *pkey = &cursor->key;
*pval = &cursor->val; *pval = &cursor->val;
} }
@ -6355,12 +6195,6 @@ toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
r = toku_brt_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL); r = toku_brt_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL);
} }
if (r == 0) { if (r == 0) {
//We need to have access to the (key,val) that the cursor points to.
//By invalidating the cursor we guarantee we have a local copy.
//
//If we try to use the omtcursor, there exists a race condition
//(node could be evicted), but maybe_get_and_pin() prevents delete.
brt_cursor_invalidate(cursor);
r = toku_brt_delete(cursor->brt, &cursor->key, txn); r = toku_brt_delete(cursor->brt, &cursor->key, txn);
} }
} }
@ -6417,7 +6251,7 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename,
if (BP_STATE(node,i) == PT_AVAIL) { if (BP_STATE(node,i) == PT_AVAIL) {
struct cmd_leafval_heaviside_extra be = {brt, key}; struct cmd_leafval_heaviside_extra be = {brt, key};
u_int32_t idx; u_int32_t idx;
int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx, NULL); int r = toku_omt_find_zero(BLB_BUFFER(node, i), toku_cmd_leafval_heaviside, &be, 0, &idx);
*less += idx; *less += idx;
*greater += toku_omt_size(BLB_BUFFER(node, i))-idx; *greater += toku_omt_size(BLB_BUFFER(node, i))-idx;
if (r==0) { if (r==0) {
@ -6543,7 +6377,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
if (0) if (0)
for (int j=0; j<size; j++) { for (int j=0; j<size; j++) {
OMTVALUE v = 0; OMTVALUE v = 0;
r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v, 0); r = toku_omt_fetch(BLB_BUFFER(node, i), j, &v);
assert_zero(r); assert_zero(r);
fprintf(file, " [%d]=", j); fprintf(file, " [%d]=", j);
print_leafentry(file, v); print_leafentry(file, v);

View file

@ -410,7 +410,7 @@ is_filenum_reserved(CACHETABLE ct, FILENUM filenum) {
int r; int r;
BOOL rval; BOOL rval;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, NULL, NULL); r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, NULL);
if (r==0) { if (r==0) {
FILENUM* found = v; FILENUM* found = v;
assert(found->fileid == filenum.fileid); assert(found->fileid == filenum.fileid);
@ -429,7 +429,7 @@ reserve_filenum(CACHETABLE ct, FILENUM filenum) {
assert(filenum.fileid != FILENUM_NONE.fileid); assert(filenum.fileid != FILENUM_NONE.fileid);
uint32_t index; uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL); r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index);
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
FILENUM *XMALLOC(entry); FILENUM *XMALLOC(entry);
*entry = filenum; *entry = filenum;
@ -443,7 +443,7 @@ unreserve_filenum(CACHETABLE ct, FILENUM filenum) {
int r; int r;
uint32_t index; uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, &index, NULL); r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, &v, &index);
assert(r==0); assert(r==0);
FILENUM* found = v; FILENUM* found = v;
assert(found->fileid == filenum.fileid); assert(found->fileid == filenum.fileid);

View file

@ -1192,7 +1192,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
OMTVALUE txnfound; OMTVALUE txnfound;
int rval; int rval;
int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL, NULL); int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL);
if (r==0) { if (r==0) {
TOKUTXN txn = txnfound; TOKUTXN txn = txnfound;
assert(txn->txnid64==txnid); assert(txn->txnid64==txnid);

View file

@ -48,15 +48,6 @@ struct omt {
struct omt_array a; struct omt_array a;
struct omt_tree t; struct omt_tree t;
} i; } i;
OMTCURSOR associated; // the OMTs associated with this.
};
struct omt_cursor {
OMT omt; // The omt this cursor is associated with. NULL if not present.
void (*invalidate)(OMTCURSOR, void*);
void *invalidate_extra;
u_int32_t index; // This is the state for the initial implementation
OMTCURSOR next,prev; // circular linked list of all OMTCURSORs associated with omt.
}; };
static inline int static inline int
@ -66,7 +57,6 @@ omt_create_no_array(OMT *omtp) {
result->is_array = TRUE; result->is_array = TRUE;
result->i.a.num_values = 0; result->i.a.num_values = 0;
result->i.a.start_idx = 0; result->i.a.start_idx = 0;
result->associated = NULL;
*omtp = result; *omtp = result;
return 0; return 0;
} }
@ -99,82 +89,6 @@ toku_omt_create_steal_sorted_array(OMT *omtp, OMTVALUE **valuesp, u_int32_t numv
return 0; return 0;
} }
int toku_omt_cursor_create (OMTCURSOR *omtcp) {
OMTCURSOR MALLOC(c);
if (c==NULL) return errno;
c->omt = NULL;
c->next = c->prev = NULL;
c->invalidate = NULL;
c->invalidate_extra = NULL;
*omtcp = c;
return 0;
}
OMT toku_omt_cursor_get_omt(OMTCURSOR c) {
return c->omt;
}
void toku_omt_cursor_set_invalidate_callback(OMTCURSOR c, void (*f)(OMTCURSOR,void*), void* extra) {
c->invalidate = f;
c->invalidate_extra = extra;
}
void toku_omt_cursor_invalidate (OMTCURSOR c) {
//If already invalid, do nothing.
if (c==NULL || c->omt==NULL) return;
if (c->invalidate) c->invalidate(c, c->invalidate_extra);
if (c->next == c) {
// It's the last one.
c->omt->associated = NULL;
} else {
OMTCURSOR next = c->next;
OMTCURSOR prev = c->prev;
if (c->omt->associated == c) {
c->omt->associated = next;
}
next->prev = prev;
prev->next = next;
}
c->next = c->prev = NULL;
c->omt = NULL;
}
void toku_omt_cursor_destroy (OMTCURSOR *p) {
toku_omt_cursor_invalidate(*p);
toku_free(*p);
*p = NULL;
}
static void invalidate_cursors (OMT omt) {
OMTCURSOR assoced;
while ((assoced = omt->associated)) {
toku_omt_cursor_invalidate(assoced);
}
}
static void associate (OMT omt, OMTCURSOR c)
{
if (c->omt==omt) return;
toku_omt_cursor_invalidate(c);
if (omt->associated==NULL) {
c->prev = c;
c->next = c;
omt->associated = c;
} else {
c->prev = omt->associated->prev;
c->next = omt->associated;
omt->associated->prev->next = c;
omt->associated->prev = c;
}
c->omt = omt;
}
void
toku_omt_cursor_associate(OMT omt, OMTCURSOR c) {
assert(c->omt==NULL||c->omt==omt);
associate(omt, c);
}
static inline u_int32_t nweight(OMT omt, node_idx idx) { static inline u_int32_t nweight(OMT omt, node_idx idx) {
if (idx==NODE_NULL) return 0; if (idx==NODE_NULL) return 0;
else return (omt->i.t.nodes+idx)->weight; else return (omt->i.t.nodes+idx)->weight;
@ -639,50 +553,6 @@ static inline int find_internal_plus(OMT omt, node_idx n_idx, int (*h)(OMTVALUE,
} }
} }
int toku_omt_cursor_is_valid (OMTCURSOR c) {
return c->omt!=NULL;
}
void toku_omt_cursor_set_index(OMTCURSOR c, u_int32_t index) {
assert(c->omt);
c->index = index;
}
int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
int r = toku_omt_fetch(c->omt, c->index+1, v, NULL);
if (r==0) c->index++;
else toku_omt_cursor_invalidate(c);
return r;
}
int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
if (c->index==0) {
toku_omt_cursor_invalidate(c);
return EINVAL;
}
c->index--;
int r = toku_omt_fetch(c->omt, c->index, v, NULL);
assert(r==0);
return r;
}
int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v) {
if (c->omt == NULL) return EINVAL;
int r = toku_omt_fetch(c->omt, c->index, v, NULL);
assert(r==0);
return r;
}
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index) {
if (c->omt == NULL) return EINVAL;
*index = c->index;
return 0;
}
//TODO: Put all omt API functions here. //TODO: Put all omt API functions here.
int toku_omt_create (OMT *omtp) { int toku_omt_create (OMT *omtp) {
return omt_create_internal(omtp, 2); return omt_create_internal(omtp, 2);
@ -690,7 +560,6 @@ int toku_omt_create (OMT *omtp) {
void toku_omt_destroy(OMT *omtp) { void toku_omt_destroy(OMT *omtp) {
OMT omt=*omtp; OMT omt=*omtp;
invalidate_cursors(omt);
if (omt->is_array) toku_free(omt->i.a.values); if (omt->is_array) toku_free(omt->i.a.values);
else toku_free(omt->i.t.nodes); else toku_free(omt->i.t.nodes);
toku_free(omt); toku_free(omt);
@ -713,7 +582,6 @@ int toku_omt_create_from_sorted_array(OMT *omtp, OMTVALUE *values, u_int32_t num
int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index) { int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index) {
int r; int r;
invalidate_cursors(omt);
if (index>omt_size(omt)) return EINVAL; if (index>omt_size(omt)) return EINVAL;
if ((r=maybe_resize_or_convert(omt, 1+omt_size(omt)))) return r; if ((r=maybe_resize_or_convert(omt, 1+omt_size(omt)))) return r;
if (omt->is_array && index!=omt->i.a.num_values && if (omt->is_array && index!=omt->i.a.num_values &&
@ -739,7 +607,6 @@ int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index) {
int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index) { int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index) {
if (index>=omt_size(omt)) return EINVAL; if (index>=omt_size(omt)) return EINVAL;
invalidate_cursors(omt);
if (omt->is_array) { if (omt->is_array) {
set_at_internal_array(omt, value, index); set_at_internal_array(omt, value, index);
} }
@ -752,7 +619,6 @@ int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index) {
int toku_omt_delete_at(OMT omt, u_int32_t index) { int toku_omt_delete_at(OMT omt, u_int32_t index) {
OMTVALUE v; OMTVALUE v;
int r; int r;
invalidate_cursors(omt);
if (index>=omt_size(omt)) return EINVAL; if (index>=omt_size(omt)) return EINVAL;
if ((r=maybe_resize_or_convert(omt, -1+omt_size(omt)))) return r; if ((r=maybe_resize_or_convert(omt, -1+omt_size(omt)))) return r;
if (omt->is_array && index!=0 && index!=omt->i.a.num_values-1) { if (omt->is_array && index!=0 && index!=omt->i.a.num_values-1) {
@ -772,7 +638,7 @@ int toku_omt_delete_at(OMT omt, u_int32_t index) {
return 0; return 0;
} }
int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) { int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v) {
if (i>=omt_size(V)) return EINVAL; if (i>=omt_size(V)) return EINVAL;
if (V->is_array) { if (V->is_array) {
fetch_internal_array(V, i, v); fetch_internal_array(V, i, v);
@ -780,10 +646,6 @@ int toku_omt_fetch(OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c) {
else { else {
fetch_internal(V, V->i.t.root, i, v); fetch_internal(V, V->i.t.root, i, v);
} }
if (c) {
associate(V,c);
c->index = i;
}
return 0; return 0;
} }
@ -796,7 +658,6 @@ free_item (OMTVALUE lev, u_int32_t UU(idx), void *vsi) {
void toku_omt_free_items(OMT omt) { void toku_omt_free_items(OMT omt) {
invalidate_cursors(omt);
int r = toku_omt_iterate(omt, free_item, NULL); int r = toku_omt_iterate(omt, free_item, NULL);
lazy_assert_zero(r); lazy_assert_zero(r);
} }
@ -820,9 +681,7 @@ int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v,
int r; int r;
u_int32_t idx; u_int32_t idx;
invalidate_cursors(omt); r = toku_omt_find_zero(omt, h, v, NULL, &idx);
r = toku_omt_find_zero(omt, h, v, NULL, &idx, NULL);
if (r==0) { if (r==0) {
if (index) *index = idx; if (index) *index = idx;
return DB_KEYEXIST; return DB_KEYEXIST;
@ -835,7 +694,7 @@ int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v,
return 0; return 0;
} }
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index, OMTCURSOR c) { int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
u_int32_t tmp_index; u_int32_t tmp_index;
if (index==NULL) index=&tmp_index; if (index==NULL) index=&tmp_index;
int r; int r;
@ -845,16 +704,10 @@ int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVAL
else { else {
r = find_internal_zero(V, V->i.t.root, h, extra, value, index); r = find_internal_zero(V, V->i.t.root, h, extra, value, index);
} }
if (c && r==0) {
associate(V,c);
c->index = *index;
} else {
toku_omt_cursor_invalidate(c);
}
return r; return r;
} }
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *index, OMTCURSOR c) { int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *index) {
u_int32_t tmp_index; u_int32_t tmp_index;
int r; int r;
if (index==NULL) index=&tmp_index; if (index==NULL) index=&tmp_index;
@ -875,19 +728,12 @@ int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int directi
r = find_internal_plus( V, V->i.t.root, h, extra, value, index); r = find_internal_plus( V, V->i.t.root, h, extra, value, index);
} }
} }
if (c && r==0) {
associate(V,c);
c->index=*index;
} else {
toku_omt_cursor_invalidate(c);
}
return r; return r;
} }
int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) { int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
int r; int r;
OMT newomt; OMT newomt;
invalidate_cursors(omt);
if (index>omt_size(omt)) return EINVAL; if (index>omt_size(omt)) return EINVAL;
if ((r=omt_convert_to_array(omt))) return r; if ((r=omt_convert_to_array(omt))) return r;
@ -909,8 +755,6 @@ int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) { int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
int r; int r;
OMT newomt = 0; OMT newomt = 0;
invalidate_cursors(leftomt);
invalidate_cursors(rightomt);
u_int32_t newsize = omt_size(leftomt)+omt_size(rightomt); u_int32_t newsize = omt_size(leftomt)+omt_size(rightomt);
if ((r = omt_create_internal(&newomt, newsize))) return r; if ((r = omt_create_internal(&newomt, newsize))) return r;
@ -938,7 +782,6 @@ int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
} }
void toku_omt_clear(OMT omt) { void toku_omt_clear(OMT omt) {
invalidate_cursors(omt);
if (omt->is_array) { if (omt->is_array) {
omt->i.a.start_idx = 0; omt->i.a.start_idx = 0;
omt->i.a.num_values = 0; omt->i.a.num_values = 0;

View file

@ -153,7 +153,6 @@ extern "C" {
//typedef struct value *OMTVALUE; // A slight improvement over using void*. //typedef struct value *OMTVALUE; // A slight improvement over using void*.
typedef void *OMTVALUE; typedef void *OMTVALUE;
typedef struct omt *OMT; typedef struct omt *OMT;
typedef struct omt_cursor *OMTCURSOR;
int toku_omt_create (OMT *omtp); int toku_omt_create (OMT *omtp);
@ -302,13 +301,7 @@ int toku_omt_delete_at(OMT omt, u_int32_t idx);
// Rationale: To delete an item, first find its index using toku_omt_find, then delete it. // Rationale: To delete an item, first find its index using toku_omt_find, then delete it.
// Performance: time=O(\log N) amortized. // Performance: time=O(\log N) amortized.
void toku_omt_cursor_set_index(OMTCURSOR c, u_int32_t idx); int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v);
// Effect:
// Set the abstract index.
// Requires:
// The cursor is not invalid.
int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c);
// Effect: Set *v=V_i // Effect: Set *v=V_i
// If c!=NULL then set c's abstract offset to i. // If c!=NULL then set c's abstract offset to i.
// Requires: v != NULL // Requires: v != NULL
@ -323,14 +316,14 @@ int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v, OMTCURSOR c);
// function, the function must remove c's association with the old // function, the function must remove c's association with the old
// OMT, and associate it with the new OMT. // OMT, and associate it with the new OMT.
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *idx, OMTCURSOR c); int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *idx);
// Effect: Find the smallest i such that h(V_i, extra)>=0 // Effect: Find the smallest i such that h(V_i, extra)>=0
// If there is such an i and h(V_i,extra)==0 then set *index=i and return 0. // If there is such an i and h(V_i,extra)==0 then set *index=i and return 0.
// If there is such an i and h(V_i,extra)>0 then set *index=i and return DB_NOTFOUND. // If there is such an i and h(V_i,extra)>0 then set *index=i and return DB_NOTFOUND.
// If there is no such i then set *index=toku_omt_size(V) and return DB_NOTFOUND. // If there is no such i then set *index=toku_omt_size(V) and return DB_NOTFOUND.
// Requires: index!=NULL // Requires: index!=NULL
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx, OMTCURSOR c); int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *idx);
// Effect: // Effect:
// If direction >0 then find the smallest i such that h(V_i,extra)>0. // If direction >0 then find the smallest i such that h(V_i,extra)>0.
// If direction <0 then find the largest i such that h(V_i,extra)<0. // If direction <0 then find the largest i such that h(V_i,extra)<0.
@ -422,98 +415,6 @@ void toku_omt_clear(OMT omt);
unsigned long toku_omt_memory_size (OMT omt); unsigned long toku_omt_memory_size (OMT omt);
// Effect: Return the size (in bytes) of the omt, as it resides in main memory. Don't include any of the OMTVALUES. // Effect: Return the size (in bytes) of the omt, as it resides in main memory. Don't include any of the OMTVALUES.
int toku_omt_cursor_create (OMTCURSOR *p);
// Effect: Create an OMTCURSOR. Stores it in *p. The OMTCURSOR is
// initially invalid.
// Requires: p != NULL
// Returns:
// 0 success
// ENOMEM out of memory (and doesn't modify *omtp)
// Performance: constant time.
void toku_omt_cursor_destroy (OMTCURSOR *p);
// Effect: Invalidates *p (if it is valid) and frees any memory
// associated with *p.
// Also sets *p=NULL.
// Rationale: The usage is to do something like
// toku_omt_cursor_destroy(&c);
// and now c will have a NULL pointer instead of a dangling freed pointer.
// Rationale: Returns no values since free() cannot fail.
int toku_omt_cursor_is_valid (OMTCURSOR c);
// Effect: returns 0 iff c is invalid.
// Performance: time=O(1)
int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v);
// Effect: Increment c's offset, and find and store the value in v.
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if the offset goes out of range or c is invalid.
// On nonzero return, *v is unchanged and c is invalidated.
// Performance: time=O(log N) worst case, expected time=O(1) for a randomly
// chosen initial position.
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *idx);
// Effect: Stores c's offset in *index.
// Requires: index != NULL
// Returns
// 0 success
// EINVAL c is invalid
// On nonzero return, *index is unchanged and c is unchanged.
// Performance: time=O(1)
OMT toku_omt_cursor_get_omt(OMTCURSOR c);
// Effect: returns the associated omt or NULL if not associated.
// Performance: time=O(1)
int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v);
// Effect: Store in v the value pointed by c's abstract offset
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if c is invalid
// On non-zero return, *v is unchanged
// Performance: O(1) time
int toku_omt_cursor_prev (OMTCURSOR c, OMTVALUE *v);
// Effect: Decrement c's offset, and find and store the value in v.
// Requires: v != NULL
// Returns
// 0 success
// EINVAL if the offset goes out of range or c is invalid.
// On nonzero return, *v is unchanged and c is invalidated.
// Performance: time=O(log N) worst case, expected time=O(1) for a randomly
// chosen initial position.
void toku_omt_cursor_invalidate (OMTCURSOR c);
// Effect: Invalidate c. (This does not mean that c is destroyed or
// that its memory is freed.)
// If c is valid, the invalidate callback function (if any) will be called
// before invalidating c.
void toku_omt_cursor_set_invalidate_callback(OMTCURSOR c, void (*f)(OMTCURSOR,void*), void* extra);
// Effect:
// Saves function 'f' to be called whenever the cursor is invalidated.
// 'extra' is passed as an additional parameter to f.
// Requires:
// The lifetime of the 'extra' parameter must continue at least till the cursor
// is destroyed.
void toku_omt_cursor_associate(OMT omt, OMTCURSOR c);
// Effect:
// Associates an omtcursor with an omt.
// Requires:
// The omtcursor is not associated with any other omt.
// Requires:
// toku_omt_associate must be called when the omt-lock is held
// Rationale:
// This is used by brt_cursors for omts representing leaf nodes.
// These omts are touched by multiple threads, and therefore require locks
// for modifying the list of omtcursors.
// We do not want to grab two locks (one for omt, and one for the old
// associated omt).
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; };

View file

@ -99,7 +99,7 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
if (n == 0) if (n == 0)
break; break;
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(fmap->filenums, n-1, &v, NULL); r = toku_omt_fetch(fmap->filenums, n-1, &v);
assert(r == 0); assert(r == 0);
r = toku_omt_delete_at(fmap->filenums, n-1); r = toku_omt_delete_at(fmap->filenums, n-1);
assert(r == 0); assert(r == 0);
@ -150,7 +150,7 @@ static int file_map_insert (struct file_map *fmap, FILENUM fnum, BRT brt, char *
static void file_map_remove(struct file_map *fmap, FILENUM fnum) { static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
OMTVALUE v; u_int32_t idx; OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL); int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
if (r == 0) { if (r == 0) {
struct file_map_tuple *tuple = v; struct file_map_tuple *tuple = v;
r = toku_omt_delete_at(fmap->filenums, idx); r = toku_omt_delete_at(fmap->filenums, idx);
@ -162,7 +162,7 @@ static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND) // Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) { static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
OMTVALUE v; u_int32_t idx; OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx, NULL); int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
if (r == 0) { if (r == 0) {
struct file_map_tuple *tuple = v; struct file_map_tuple *tuple = v;
assert(tuple->filenum.fileid == fnum.fileid); assert(tuple->filenum.fileid == fnum.fileid);
@ -1142,7 +1142,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
if (n_live_txns == 0) if (n_live_txns == 0)
break; break;
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v, NULL); r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1, &v);
if (r != 0) if (r != 0)
break; break;
TOKUTXN txn = (TOKUTXN) v; TOKUTXN txn = (TOKUTXN) v;

View file

@ -152,7 +152,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key
(void)toku_cachefile_get_and_pin_fd(cf); (void)toku_cachefile_get_and_pin_fd(cf);
if (!toku_cachefile_is_dev_null_unlocked(cf)) { if (!toku_cachefile_is_dev_null_unlocked(cf)) {
OMTVALUE brtv=NULL; OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL); r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL);
assert(r==0); assert(r==0);
BRT brt = brtv; BRT brt = brtv;
@ -561,7 +561,7 @@ toku_rollback_change_fdescriptor(FILENUM filenum,
fd = toku_cachefile_get_and_pin_fd(cf); fd = toku_cachefile_get_and_pin_fd(cf);
if (!toku_cachefile_is_dev_null_unlocked(cf)) { if (!toku_cachefile_is_dev_null_unlocked(cf)) {
OMTVALUE brtv=NULL; OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL); r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL);
assert(r==0); assert(r==0);
BRT brt = brtv; BRT brt = brtv;
DESCRIPTOR_S d; DESCRIPTOR_S d;

View file

@ -176,7 +176,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
int r; int r;
OMT reverse = txn->logger->live_list_reverse; OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL); r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
invariant(r==0); invariant(r==0);
pair = pairv; pair = pairv;
invariant(pair->xid1 == *live_xid); //sanity check invariant(pair->xid1 == *live_xid); //sanity check
@ -188,7 +188,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
OMT snapshot = txn->logger->snapshot_txnids; OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE; BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid // find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx, NULL); r = toku_omt_find(snapshot, toku_find_xid_by_xid, &xid, -1, &olderv, &olderidx);
if (r==0) { if (r==0) {
//There is an older txn //There is an older txn
olderxid = olderv; olderxid = olderv;
@ -237,7 +237,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
//Remove txn from list (omt) of live transactions //Remove txn from list (omt) of live transactions
OMTVALUE txnagain; OMTVALUE txnagain;
u_int32_t idx; u_int32_t idx;
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx, NULL); r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0); assert(r==0);
assert(txn==txnagain); assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_txns, idx); r = toku_omt_delete_at(txn->logger->live_txns, idx);
@ -248,7 +248,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
OMTVALUE txnagain; OMTVALUE txnagain;
u_int32_t idx; u_int32_t idx;
//Remove txn from list of live root txns //Remove txn from list of live root txns
r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx, NULL); r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx);
assert(r==0); assert(r==0);
assert(txn==txnagain); assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_root_txns, idx); r = toku_omt_delete_at(txn->logger->live_root_txns, idx);
@ -266,7 +266,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
u_int32_t idx; u_int32_t idx;
OMTVALUE v; OMTVALUE v;
//Free memory used for snapshot_txnids //Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx, NULL); r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, &txn->txnid64, &v, &idx);
invariant(r==0); invariant(r==0);
TXNID *xid = v; TXNID *xid = v;
invariant(*xid == txn->txnid64); invariant(*xid == txn->txnid64);
@ -280,7 +280,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
invariant(toku_omt_size(txn->live_root_txn_list) > 0); invariant(toku_omt_size(txn->live_root_txn_list) > 0);
OMTVALUE v; OMTVALUE v;
//store a single array of txnids //store a single array of txnids
r = toku_omt_fetch(txn->live_root_txn_list, 0, &v, NULL); r = toku_omt_fetch(txn->live_root_txn_list, 0, &v);
invariant(r==0); invariant(r==0);
toku_free(v); toku_free(v);
toku_omt_destroy(&txn->live_root_txn_list); toku_omt_destroy(&txn->live_root_txn_list);
@ -293,7 +293,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
TOKULOGGER logger = txn->logger; TOKULOGGER logger = txn->logger;
OMTVALUE oldest_txnv; OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv, NULL); r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) { if (r==0) {
TOKUTXN oldest_txn = oldest_txnv; TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it assert(oldest_txn != txn); // We just removed it
@ -638,7 +638,7 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
OMTVALUE txnv; OMTVALUE txnv;
u_int32_t index; u_int32_t index;
// Does brt already know about transaction txn? // Does brt already know about transaction txn?
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index);
if (r==0) { if (r==0) {
// It's already there. // It's already there.
assert((TOKUTXN)txnv==txn); assert((TOKUTXN)txnv==txn);
@ -669,7 +669,7 @@ static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) {
int r; int r;
r = toku_txn_note_brt(txn, info->live); //Add new brt. r = toku_txn_note_brt(txn, info->live); //Add new brt.
assert(r==0); assert(r==0);
r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index, NULL); r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index);
assert(r==0); assert(r==0);
assert((void*)zombie_again==info->zombie); assert((void*)zombie_again==info->zombie);
r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt. r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt.
@ -703,7 +703,7 @@ static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
BRT brt = brtv; BRT brt = brtv;
OMTVALUE brtv_again=NULL; OMTVALUE brtv_again=NULL;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index, NULL); int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index);
assert(r==0); assert(r==0);
assert((void*)brtv_again==brtv); assert((void*)brtv_again==brtv);
r = toku_omt_delete_at(txn->open_brts, index); r = toku_omt_delete_at(txn->open_brts, index);
@ -723,7 +723,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
TOKUTXN txn = txnv; TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL; OMTVALUE txnv_again=NULL;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0); assert(r==0);
assert((void*)txnv_again==txnv); assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index); r = toku_omt_delete_at(brt->txns, index);
@ -760,7 +760,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
struct tokutxn fake_txn; fake_txn.txnid64 = xid; struct tokutxn fake_txn; fake_txn.txnid64 = xid;
u_int32_t index; u_int32_t index;
OMTVALUE txnv; OMTVALUE txnv;
int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index);
if (r == 0) *txnptr = txnv; if (r == 0) *txnptr = txnv;
return r; return r;
} }

View file

@ -1,118 +0,0 @@
#ident "$Id$"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include <errno.h>
#include "test.h"
#include <stdio.h>
#include "omt.h"
#include "memory.h"
#include "brttypes.h"
#include <stdlib.h>
#include <string.h>
enum { N=10 };
struct value { int x; } vs[N];
OMTVALUE ps[N];
#define V(x) ((struct value *)(x))
static void callback (OMTCURSOR c, void *extra) {
if (verbose) printf("%s:%d %p %p\n", __FUNCTION__, __LINE__, c, extra);
OMTVALUE v = NULL;
int r = toku_omt_cursor_current(c, &v);
if (verbose) printf("%s:%d %d\n", __FUNCTION__, __LINE__, r);
}
static void test (void) {
OMT o;
OMTCURSOR curs, curs2, curs3;
int i, r;
OMTVALUE v;
for (i=0; i<N; i++) {
vs[i].x=i;
ps[i]=&vs[i];
}
// destroy the omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
// destroy the cursor first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
assert(V(v)->x==5);
r = toku_omt_cursor_next(curs, &v);
assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs, &v);
assert(r==0 && V(v)->x==5);
toku_omt_cursor_destroy(&curs);
toku_omt_destroy(&o);
// Create two cursors, destroy omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
toku_omt_cursor_destroy(&curs2);
// Create two cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3);
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2);
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
// Create three cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_create(&curs3); assert(r==0);
r = toku_omt_fetch(o, 9, &v, curs3); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && V(v)->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==3);
r = toku_omt_cursor_next(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==2);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && V(v)->x==1);
r = toku_omt_fetch(o, 1, &v, curs3); assert(r==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r==0 && V(v)->x==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs3);
// ticket 1622, invalidate recursion
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
toku_omt_cursor_set_invalidate_callback(curs, callback, 0);
r = toku_omt_fetch(o, 9, &v, curs); assert(r==0);
r = toku_omt_cursor_next(curs, &v);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
}
int
test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) {
test();
return 0;
}

View file

@ -264,94 +264,30 @@ test_create_from_sorted_array_size (enum create_type create_choice, enum close_w
static void static void
test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
u_int32_t i; u_int32_t i;
int j;
int r; int r;
TESTVALUE v = (TESTVALUE)&i; TESTVALUE v = (TESTVALUE)&i;
TESTVALUE oldv = v; TESTVALUE oldv = v;
OMTCURSOR c;
r = toku_omt_cursor_create(&c);
CKERR(r);
assert(len == toku_omt_size(omtree)); assert(len == toku_omt_size(omtree));
for (i = 0; i < len; i++) { for (i = 0; i < len; i++) {
assert(oldv!=val[i]); assert(oldv!=val[i]);
v = NULL; v = NULL;
r = toku_omt_fetch(omtree, i, &v, NULL); r = toku_omt_fetch(omtree, i, &v);
CKERR(r); CKERR(r);
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
assert(v == val[i]); assert(v == val[i]);
assert(V(v)->number == V(val[i])->number); assert(V(v)->number == V(val[i])->number);
v = oldv; v = oldv;
r = toku_omt_fetch(omtree, i, &v, c);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
assert(toku_omt_cursor_is_valid(c));
v = oldv;
r = toku_omt_cursor_current(c, &v);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
assert(toku_omt_cursor_is_valid(c));
v = oldv;
j = i + 1;
while ((r = toku_omt_cursor_next(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c));
assert(v != NULL);
assert(v != oldv);
assert(v == val[j]);
assert(V(v)->number == V(val[j])->number);
j++;
v = oldv;
}
CKERR2(r, EINVAL);
assert(j == (int) len);
assert(oldv!=val[i]);
v = NULL;
r = toku_omt_fetch(omtree, i, &v, c);
CKERR(r);
assert(v != NULL);
assert(v != oldv);
assert(v == val[i]);
assert(V(v)->number == V(val[i])->number);
v = oldv;
j = i - 1;
while ((r = toku_omt_cursor_prev(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c));
assert(v != NULL);
assert(v != oldv);
assert(v == val[j]);
assert(V(v)->number == V(val[j])->number);
j--;
v = oldv;
}
CKERR2(r, EINVAL);
assert(j == -1);
} }
for (i = len; i < len*2; i++) { for (i = len; i < len*2; i++) {
v = oldv; v = oldv;
r = toku_omt_fetch(omtree, i, &v, NULL); r = toku_omt_fetch(omtree, i, &v);
CKERR2(r, EINVAL); CKERR2(r, EINVAL);
assert(v == oldv); assert(v == oldv);
v = NULL;
r = toku_omt_fetch(omtree, i, &v, c);
CKERR2(r, EINVAL);
assert(v == NULL);
} }
toku_omt_cursor_destroy(&c);
} }
static void static void
@ -700,69 +636,22 @@ heavy_extra (h_extra* extra, u_int32_t first_zero, u_int32_t first_pos) {
static void static void
test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*), test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
int r_expect, BOOL idx_will_change, u_int32_t idx_expect, int r_expect, BOOL idx_will_change, u_int32_t idx_expect,
u_int32_t number_expect, BOOL cursor_valid) { u_int32_t number_expect, BOOL UU(cursor_valid)) {
u_int32_t idx = UINT32_MAX; u_int32_t idx = UINT32_MAX;
u_int32_t old_idx = idx; u_int32_t old_idx = idx;
TESTVALUE omt_val, omt_val_curs; TESTVALUE omt_val;
OMTCURSOR c;
int r; int r;
BOOL found;
r = toku_omt_cursor_create(&c);
CKERR(r);
omt_val = NULL; omt_val = NULL;
if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, &omt_val, &idx, c);
}
else {
r = toku_omt_find( omt, h, extra, dir, &omt_val, &idx, c);
}
CKERR2(r, r_expect);
if (idx_will_change) {
assert(idx == idx_expect);
}
else {
assert(idx == old_idx);
}
if (r == DB_NOTFOUND) {
assert(omt_val == NULL);
found = FALSE;
}
else {
assert(V(omt_val)->number == number_expect);
found = TRUE;
}
assert(!cursor_valid == !toku_omt_cursor_is_valid(c));
if (cursor_valid) {
TESTVALUE tmp;
assert(idx_will_change);
omt_val_curs = NULL;
r = toku_omt_cursor_current(c, &omt_val_curs);
CKERR(r);
assert(toku_omt_cursor_is_valid(c));
r = toku_omt_fetch(omt, idx, &tmp, NULL);
CKERR(r);
if (found) assert(tmp==omt_val);
assert(omt_val_curs != NULL);
assert(omt_val_curs == tmp);
assert(V(omt_val_curs)->number == V(tmp)->number);
if (found) assert(V(omt_val_curs)->number==number_expect);
}
toku_omt_cursor_invalidate(c);
assert(!toku_omt_cursor_is_valid(c));
toku_omt_cursor_destroy(&c);
/* Verify we can pass NULL value. */ /* Verify we can pass NULL value. */
omt_val = NULL; omt_val = NULL;
idx = old_idx; idx = old_idx;
if (dir == 0) { if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, NULL, &idx, NULL); r = toku_omt_find_zero(omt, h, extra, NULL, &idx);
} }
else { else {
r = toku_omt_find( omt, h, extra, dir, NULL, &idx, NULL); r = toku_omt_find( omt, h, extra, dir, NULL, &idx);
} }
CKERR2(r, r_expect); CKERR2(r, r_expect);
if (idx_will_change) { if (idx_will_change) {
@ -777,10 +666,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL; omt_val = NULL;
idx = old_idx; idx = old_idx;
if (dir == 0) { if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, &omt_val, 0, NULL); r = toku_omt_find_zero(omt, h, extra, &omt_val, 0);
} }
else { else {
r = toku_omt_find( omt, h, extra, dir, &omt_val, 0, NULL); r = toku_omt_find( omt, h, extra, dir, &omt_val, 0);
} }
CKERR2(r, r_expect); CKERR2(r, r_expect);
assert(idx == old_idx); assert(idx == old_idx);
@ -795,10 +684,10 @@ test_find_dir (int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL; omt_val = NULL;
idx = old_idx; idx = old_idx;
if (dir == 0) { if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, NULL, 0, NULL); r = toku_omt_find_zero(omt, h, extra, NULL, 0);
} }
else { else {
r = toku_omt_find( omt, h, extra, dir, NULL, 0, NULL); r = toku_omt_find( omt, h, extra, dir, NULL, 0);
} }
CKERR2(r, r_expect); CKERR2(r, r_expect);
assert(idx == old_idx); assert(idx == old_idx);
@ -879,100 +768,12 @@ test_find (enum create_type create_choice, enum close_when_done do_close) {
test_close(do_close); test_close(do_close);
} }
static void
invalidate_callback_null (OMTCURSOR c, void *extra) {
assert(c && !extra);
}
static void
invalidate_callback_inc (OMTCURSOR c, void *extra) {
assert(c);
int *num = extra;
(*num)++;
}
static void
test_invalidate (enum create_type create_choice, BOOL set_callback, BOOL invalidate_callback) {
init_identity_values(random_seed, 100);
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
OMTCURSOR c;
int invalidate_count = 0;
int r = toku_omt_cursor_create(&c);
if (set_callback || invalidate_callback) {
toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_inc, &invalidate_count);
}
if (invalidate_callback) {
toku_omt_cursor_set_invalidate_callback(c, invalidate_callback_null, NULL);
}
OMTVALUE val;
r = toku_omt_fetch(omt, 0, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_prev(c, &val); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_next(c, &val); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==1);
else assert(invalidate_count==0);
r = toku_omt_cursor_next(c, &val); CKERR2(r, EINVAL);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==2);
else assert(invalidate_count==0);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==2);
else assert(invalidate_count==0);
test_close(CLOSE_WHEN_DONE);
assert(!toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==3);
else assert(invalidate_count==0);
init_identity_values(random_seed, 100);
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
r = toku_omt_fetch(omt, toku_omt_size(omt)-1, &val, c); CKERR(r);
assert(toku_omt_cursor_is_valid(c));
if (set_callback && !invalidate_callback) assert(invalidate_count==3);
else assert(invalidate_count==0);
toku_omt_cursor_destroy(&c);
if (set_callback && !invalidate_callback) assert(invalidate_count==4);
else assert(invalidate_count==0);
test_close(CLOSE_WHEN_DONE);
if (set_callback && !invalidate_callback) assert(invalidate_count==4);
else assert(invalidate_count==0);
}
static void static void
runtests_create_choice (enum create_type create_choice) { runtests_create_choice (enum create_type create_choice) {
test_create_array(create_choice, TEST_SORTED); test_create_array(create_choice, TEST_SORTED);
test_create_array(create_choice, TEST_RANDOM); test_create_array(create_choice, TEST_RANDOM);
test_create_array(create_choice, TEST_IDENTITY); test_create_array(create_choice, TEST_IDENTITY);
test_find( create_choice, CLOSE_WHEN_DONE); test_find( create_choice, CLOSE_WHEN_DONE);
test_invalidate( create_choice, FALSE, FALSE);
test_invalidate( create_choice, FALSE, TRUE);
test_invalidate( create_choice, TRUE, FALSE);
test_invalidate( create_choice, TRUE, TRUE);
} }
int int

View file

@ -114,7 +114,7 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v
int r; int r;
OMT reverse = txn->logger->live_list_reverse; OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx, NULL); r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
if (r==0) { if (r==0) {
pair = pairv; pair = pairv;
invariant(pair->xid1 == *live_xid); //sanity check invariant(pair->xid1 == *live_xid); //sanity check
@ -488,7 +488,7 @@ TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
invariant(toku_omt_size(omt)>0); invariant(toku_omt_size(omt)>0);
OMTVALUE v; OMTVALUE v;
int r; int r;
r = toku_omt_fetch(omt, 0, &v, NULL); r = toku_omt_fetch(omt, 0, &v);
assert_zero(r); assert_zero(r);
TXNID *xidp = v; TXNID *xidp = v;
return *xidp; return *xidp;
@ -509,7 +509,7 @@ BOOL toku_is_txn_in_live_root_txn_list(TOKUTXN txn, TXNID xid) {
OMTVALUE txnidpv; OMTVALUE txnidpv;
uint32_t index; uint32_t index;
BOOL retval = FALSE; BOOL retval = FALSE;
int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index, NULL); int r = toku_omt_find_zero(omt, find_xidp, &xid, &txnidpv, &index);
if (r==0) { if (r==0) {
TXNID *txnidp = txnidpv; TXNID *txnidp = txnidpv;
invariant(*txnidp == xid); invariant(*txnidp == xid);
@ -536,19 +536,19 @@ verify_snapshot_system(TOKULOGGER logger) {
//set up arrays for easier access //set up arrays for easier access
for (i = 0; i < num_snapshot_txnids; i++) { for (i = 0; i < num_snapshot_txnids; i++) {
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(logger->snapshot_txnids, i, &v, NULL); r = toku_omt_fetch(logger->snapshot_txnids, i, &v);
assert_zero(r); assert_zero(r);
snapshot_txnids[i] = *(TXNID*)v; snapshot_txnids[i] = *(TXNID*)v;
} }
for (i = 0; i < num_live_txns; i++) { for (i = 0; i < num_live_txns; i++) {
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(logger->live_txns, i, &v, NULL); r = toku_omt_fetch(logger->live_txns, i, &v);
assert_zero(r); assert_zero(r);
live_txns[i] = v; live_txns[i] = v;
} }
for (i = 0; i < num_live_list_reverse; i++) { for (i = 0; i < num_live_list_reverse; i++) {
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(logger->live_list_reverse, i, &v, NULL); r = toku_omt_fetch(logger->live_list_reverse, i, &v);
assert_zero(r); assert_zero(r);
live_list_reverse[i] = v; live_list_reverse[i] = v;
} }
@ -566,7 +566,7 @@ verify_snapshot_system(TOKULOGGER logger) {
{ {
for (j = 0; j < num_live_root_txn_list; j++) { for (j = 0; j < num_live_root_txn_list; j++) {
OMTVALUE v; OMTVALUE v;
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v, NULL); r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v);
assert_zero(r); assert_zero(r);
live_root_txn_list[j] = *(TXNID*)v; live_root_txn_list[j] = *(TXNID*)v;
} }
@ -595,7 +595,7 @@ verify_snapshot_system(TOKULOGGER logger) {
OMTVALUE v2; OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids, r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid, toku_find_xid_by_xid,
&pair->xid2, &v2, &index, NULL); &pair->xid2, &v2, &index);
assert_zero(r); assert_zero(r);
} }
for (j = 0; j < num_live_txns; j++) { for (j = 0; j < num_live_txns; j++) {
@ -621,7 +621,7 @@ verify_snapshot_system(TOKULOGGER logger) {
OMTVALUE v2; OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids, r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid, toku_find_xid_by_xid,
&txn->txnid64, &v2, &index, NULL); &txn->txnid64, &v2, &index);
invariant(r==0 || r==DB_NOTFOUND); invariant(r==0 || r==DB_NOTFOUND);
invariant((r==0) == (expect!=0)); invariant((r==0) == (expect!=0));
} }

View file

@ -137,7 +137,7 @@ get_next_older_txnid(TXNID xc, OMT omt) {
OMTVALUE v; OMTVALUE v;
uint32_t idx; uint32_t idx;
TXNID rval; TXNID rval;
r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx, NULL); r = toku_omt_find(omt, toku_find_pair_by_xid, &xc, -1, &v, &idx);
if (r==0) { if (r==0) {
xid = v; xid = v;
invariant(*xid < xc); //sanity check invariant(*xid < xc); //sanity check
@ -157,7 +157,7 @@ toku_get_youngest_live_list_txnid_for(TXNID xc, OMT live_list_reverse) {
uint32_t idx; uint32_t idx;
TXNID rval; TXNID rval;
int r; int r;
r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx, NULL); r = toku_omt_find_zero(live_list_reverse, toku_find_pair_by_xid, &xc, &pairv, &idx);
if (r==0) { if (r==0) {
pair = pairv; pair = pairv;
invariant(pair->xid1 == xc); //sanity check invariant(pair->xid1 == xc); //sanity check
@ -226,7 +226,7 @@ garbage_collection(ULE ule, OMT snapshot_xids, OMT live_list_reverse) {
{ {
u_int32_t idx; u_int32_t idx;
OMTVALUE txnagain; OMTVALUE txnagain;
int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx, NULL); int r = toku_omt_find_zero(snapshot_xids, toku_find_xid_by_xid, &tl1, &txnagain, &idx);
invariant(r==0); //make sure that the txn you are claiming is live is actually live invariant(r==0); //make sure that the txn you are claiming is live is actually live
} }
// //

View file

@ -1710,7 +1710,7 @@ static int lt_do_escalation(toku_lock_tree* lt) {
OMTVALUE dbv; OMTVALUE dbv;
assert(toku_omt_size(lt->dbs) > 0); // there is at least one db associated with this locktree assert(toku_omt_size(lt->dbs) > 0); // there is at least one db associated with this locktree
r = toku_omt_fetch(lt->dbs, 0, &dbv, NULL); r = toku_omt_fetch(lt->dbs, 0, &dbv);
assert(r == 0); assert(r == 0);
db = dbv; db = dbv;
lt_set_comparison_functions(lt, db); lt_set_comparison_functions(lt, db);
@ -2143,7 +2143,7 @@ static void lt_add_db(toku_lock_tree* tree, DB *db) {
int r; int r;
OMTVALUE get_dbv = NULL; OMTVALUE get_dbv = NULL;
uint32_t index; uint32_t index;
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL); r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index);
assert(r == DB_NOTFOUND); assert(r == DB_NOTFOUND);
r = toku_omt_insert_at(tree->dbs, db, index); r = toku_omt_insert_at(tree->dbs, db, index);
assert_zero(r); assert_zero(r);
@ -2155,7 +2155,7 @@ static void lt_remove_db(toku_lock_tree* tree, DB *db) {
int r; int r;
OMTVALUE get_dbv = NULL; OMTVALUE get_dbv = NULL;
uint32_t index; uint32_t index;
r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index, NULL); r = toku_omt_find_zero(tree->dbs, find_db, db, &get_dbv, &index);
assert_zero(r); assert_zero(r);
assert(db == get_dbv); assert(db == get_dbv);
r = toku_omt_delete_at(tree->dbs, index); r = toku_omt_delete_at(tree->dbs, index);

View file

@ -157,7 +157,7 @@ toku_rt_find(toku_range_tree* tree, toku_interval* query, u_int32_t k,
extra.end_cmp = tree->end_cmp; extra.end_cmp = tree->end_cmp;
extra.query = *query; extra.query = *query;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost, NULL); r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &leftmost);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
/* Nothing overlaps. */ /* Nothing overlaps. */
*numfound = 0; *numfound = 0;
@ -199,7 +199,7 @@ toku_rt_insert(toku_range_tree* tree, toku_range* range) {
extra.end_cmp = tree->end_cmp; extra.end_cmp = tree->end_cmp;
extra.query = range->ends; extra.query = range->ends;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index, NULL); r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, NULL, &index);
if (r == 0) { if (r == 0) {
r = EDOM; goto cleanup; r = EDOM; goto cleanup;
} }
@ -233,7 +233,7 @@ toku_rt_delete(toku_range_tree* tree, toku_range* range) {
extra.end_cmp = tree->end_cmp; extra.end_cmp = tree->end_cmp;
extra.query = range->ends; extra.query = range->ends;
r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index, NULL); r = toku_omt_find_zero(tree->i.omt, rt_heaviside, &extra, &value, &index);
if (r != 0) { if (r != 0) {
r = EDOM; goto cleanup; r = EDOM; goto cleanup;
} }
@ -270,7 +270,7 @@ rt_neightbor(toku_range_tree* tree, toku_point* point,
extra.query.right = point; extra.query.right = point;
assert(direction==1 || direction==-1); assert(direction==1 || direction==-1);
r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index, NULL); r = toku_omt_find(tree->i.omt, rt_heaviside, &extra, direction, &value, &index);
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
*wasfound = FALSE; *wasfound = FALSE;
r = 0; r = 0;
@ -349,14 +349,14 @@ toku_rt_verify(toku_range_tree *tree) {
for (u_int32_t i = 0; i < tree->numelements; i++) { for (u_int32_t i = 0; i < tree->numelements; i++) {
// assert left <= right // assert left <= right
OMTVALUE omtv; OMTVALUE omtv;
r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL); r = toku_omt_fetch(tree->i.omt, i, &omtv);
assert_zero(r); assert_zero(r);
toku_range *v = (toku_range *) omtv; toku_range *v = (toku_range *) omtv;
assert(tree->end_cmp(v->ends.left, v->ends.right) <= 0); assert(tree->end_cmp(v->ends.left, v->ends.right) <= 0);
// assert ranges are sorted // assert ranges are sorted
if (i < tree->numelements-1) { if (i < tree->numelements-1) {
OMTVALUE omtvnext; OMTVALUE omtvnext;
r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext, NULL); r = toku_omt_fetch(tree->i.omt, i+1, &omtvnext);
assert_zero(r); assert_zero(r);
toku_range *vnext = (toku_range *) omtvnext; toku_range *vnext = (toku_range *) omtvnext;
assert(tree->end_cmp(v->ends.right, vnext->ends.left) < 0); assert(tree->end_cmp(v->ends.right, vnext->ends.left) < 0);
@ -365,11 +365,11 @@ toku_rt_verify(toku_range_tree *tree) {
// verify no overlaps // verify no overlaps
for (u_int32_t i = 1; i < tree->numelements; i++) { for (u_int32_t i = 1; i < tree->numelements; i++) {
OMTVALUE omtvprev; OMTVALUE omtvprev;
r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev, NULL); r = toku_omt_fetch(tree->i.omt, i-1, &omtvprev);
assert_zero(r); assert_zero(r);
toku_range *vprev = (toku_range *) omtvprev; toku_range *vprev = (toku_range *) omtvprev;
OMTVALUE omtv; OMTVALUE omtv;
r = toku_omt_fetch(tree->i.omt, i, &omtv, NULL); r = toku_omt_fetch(tree->i.omt, i, &omtv);
assert_zero(r); assert_zero(r);
toku_range *v = (toku_range *) omtv; toku_range *v = (toku_range *) omtv;
assert(!toku__rt_overlap(tree, &vprev->ends, &v->ends)); assert(!toku__rt_overlap(tree, &vprev->ends, &v->ends));

153
src/tests/test_bulk_fetch.c Normal file
View file

@ -0,0 +1,153 @@
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
static void
verify_val(DBT const *a, DBT const *b, void *c) {
assert(a->size == sizeof(int));
assert(b->size == sizeof(int));
int* expected = (int *)c;
assert(*expected == *(int *)a->data);
assert(*expected == *(int *)b->data);
}
static int
verify_fwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected + 1;
return TOKUDB_CURSOR_CONTINUE;
}
static int
verify_fwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected + 1;
return 0;
}
static int
verify_bwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected - 1;
return TOKUDB_CURSOR_CONTINUE;
}
static int
verify_bwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
*expected = *expected - 1;
return 0;
}
static void
test_bulk_fetch (int n) {
if (verbose) printf("test_rand_insert:%d \n", n);
DB_TXN * const null_txn = 0;
const char * const fname = "test.bulk_fetch.brt";
int r;
r = system("rm -rf " ENVDIR);
CKERR(r);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
/* create the dup database file */
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
r = env->open(env, ENVDIR, DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL, 0); assert(r == 0);
DB *db;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->set_flags(db, 0);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
int keys[n];
int i;
for (i=0; i<n; i++) {
keys[i] = i;
}
for (i=0; i<n; i++) {
DBT key, val;
r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
assert(r == 0);
}
//
// data inserted, now verify that using TOKUDB_CURSOR_CONTINUE in the callback works
//
DBC* cursor;
// verify fast
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
int expected = 0;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_next(cursor, 0, verify_fwd_fast, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = 0;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_next(cursor, 0, verify_fwd_slow, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// now do backwards
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = n-1;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_prev(cursor, 0, verify_bwd_fast, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
CKERR(r);
expected = n-1;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_prev(cursor, 0, verify_bwd_slow, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
int
test_main(int argc, char *const argv[]) {
parse_args(argc, argv);
test_bulk_fetch(10000);
return 0;
}

View file

@ -2819,7 +2819,7 @@ env_note_db_opened(DB_ENV *env, DB *db) {
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
env->i->num_open_dbs++; env->i->num_open_dbs++;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==DB_NOTFOUND); //Must not already be there. assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx); r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert(r==0); assert(r==0);
@ -2834,7 +2834,7 @@ env_note_db_closed(DB_ENV *env, DB *db) {
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
env->i->num_open_dbs--; env->i->num_open_dbs--;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==0); //Must already be there. assert(r==0); //Must already be there.
assert((DB*)dbv == db); assert((DB*)dbv == db);
r = toku_omt_delete_at(env->i->open_dbs, idx); r = toku_omt_delete_at(env->i->open_dbs, idx);
@ -2850,7 +2850,7 @@ env_note_zombie_db(DB_ENV *env, DB *db) {
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
env->i->num_zombie_dbs++; env->i->num_zombie_dbs++;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==DB_NOTFOUND); //Must not already be there. assert(r==DB_NOTFOUND); //Must not already be there.
r = toku_omt_insert_at(env->i->open_dbs, db, idx); r = toku_omt_insert_at(env->i->open_dbs, db, idx);
assert(r==0); assert(r==0);
@ -2865,7 +2865,7 @@ env_note_zombie_db_closed(DB_ENV *env, DB *db) {
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
env->i->num_zombie_dbs--; env->i->num_zombie_dbs--;
r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx);
assert(r==0); //Must already be there. assert(r==0); //Must already be there.
assert((DB*)dbv == db); assert((DB*)dbv == db);
r = toku_omt_delete_at(env->i->open_dbs, idx); r = toku_omt_delete_at(env->i->open_dbs, idx);
@ -2907,7 +2907,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
BOOL rval; BOOL rval;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) { if (r==0) {
DB *db = dbv; DB *db = dbv;
assert(strcmp(dname, db->i->dname) == 0); assert(strcmp(dname, db->i->dname) == 0);
@ -2928,7 +2928,7 @@ env_get_zombie_db_with_dname(DB_ENV *env, const char *dname) {
DB* rval; DB* rval;
OMTVALUE dbv; OMTVALUE dbv;
uint32_t idx; uint32_t idx;
r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx, NULL); r = toku_omt_find_zero(env->i->open_dbs, find_zombie_db_by_dname, (void*)dname, &dbv, &idx);
if (r==0) { if (r==0) {
DB *db = dbv; DB *db = dbv;
assert(db); assert(db);
@ -3461,7 +3461,7 @@ c_getf_first_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val,
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_first //Give brt-layer an error (if any) to return from toku_brt_cursor_first
@ -3512,7 +3512,7 @@ c_getf_last_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_last //Give brt-layer an error (if any) to return from toku_brt_cursor_last
@ -3567,7 +3567,7 @@ c_getf_next_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_next //Give brt-layer an error (if any) to return from toku_brt_cursor_next
@ -3622,7 +3622,7 @@ c_getf_prev_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, v
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_prev //Give brt-layer an error (if any) to return from toku_brt_cursor_prev
@ -3660,7 +3660,7 @@ c_getf_current_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val
//Call application-layer callback if found. //Call application-layer callback if found.
if (key!=NULL) { if (key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_current //Give brt-layer an error (if any) to return from toku_brt_cursor_current
@ -3724,7 +3724,7 @@ c_getf_set_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, vo
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_set //Give brt-layer an error (if any) to return from toku_brt_cursor_set
@ -3779,7 +3779,7 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_set_range //Give brt-layer an error (if any) to return from toku_brt_cursor_set_range
@ -3835,7 +3835,7 @@ c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, b
//Call application-layer callback if found and locks were successfully obtained. //Call application-layer callback if found and locks were successfully obtained.
if (r==0 && key!=NULL) { if (r==0 && key!=NULL) {
context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra); context->r_user_callback = super_context->f(&found_key, &found_val, context->f_extra);
if (context->r_user_callback) r = TOKUDB_USER_CALLBACK_ERROR; r = context->r_user_callback;
} }
//Give brt-layer an error (if any) to return from toku_brt_cursor_set_range_reverse //Give brt-layer an error (if any) to return from toku_brt_cursor_set_range_reverse