diff --git a/buildheader/db.h_4_1 b/buildheader/db.h_4_1 index e83617161fc..19de1cc9b6c 100644 --- a/buildheader/db.h_4_1 +++ b/buildheader/db.h_4_1 @@ -251,7 +251,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[10]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[9]; char __toku_dummy1[104]; int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */ diff --git a/buildheader/db.h_4_3 b/buildheader/db.h_4_3 index 661384971b8..5fb3822264a 100644 --- a/buildheader/db.h_4_3 +++ b/buildheader/db.h_4_3 @@ -267,7 +267,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[8]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[7]; char __toku_dummy1[112]; int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=264 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=272 size=8 */ diff --git a/buildheader/db.h_4_4 b/buildheader/db.h_4_4 index b3d9475337a..42798ed7402 100644 --- a/buildheader/db.h_4_4 +++ b/buildheader/db.h_4_4 @@ -272,7 
+272,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[10]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[9]; char __toku_dummy1[104]; int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */ diff --git a/buildheader/db.h_4_5 b/buildheader/db.h_4_5 index 9efad2a477b..ae3ebd05fc1 100644 --- a/buildheader/db.h_4_5 +++ b/buildheader/db.h_4_5 @@ -271,7 +271,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[14]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[13]; char __toku_dummy1[104]; int (*c_close) (DBC *); /* 32-bit offset=204 size=4, 64=bit offset=304 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=208 size=4, 64=bit offset=312 size=8 */ diff --git a/buildheader/db.h_4_6 b/buildheader/db.h_4_6 index c656e08ee66..407625c5454 100644 --- a/buildheader/db.h_4_6 +++ b/buildheader/db.h_4_6 @@ -11,9 +11,9 @@ extern "C" { #define TOKUDB 1 #define DB_VERSION_MAJOR 4 #define DB_VERSION_MINOR 6 -#define DB_VERSION_PATCH 21 +#define DB_VERSION_PATCH 19 #ifndef 
_TOKUDB_WRAP_H -#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.21" +#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.19" #else #define DB_VERSION_STRING_ydb "Tokutek: TokuDB (wrapped bdb)" #endif @@ -276,7 +276,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[24]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[23]; char __toku_dummy1[104]; int (*c_close) (DBC *); /* 32-bit offset=244 size=4, 64=bit offset=384 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=248 size=4, 64=bit offset=392 size=8 */ diff --git a/buildheader/make_db_h.c b/buildheader/make_db_h.c index 1e0750c07de..aebd4c633ea 100644 --- a/buildheader/make_db_h.c +++ b/buildheader/make_db_h.c @@ -330,6 +330,9 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un "int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)", "int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)", "int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)", + "int (*c_getf_heavi)(DBC *, u_int32_t, " + "void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, " + "int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction)", NULL}; assert(sizeof(dbc_fields32)==sizeof(dbc_fields64)); print_struct("dbc", 1, dbc_fields32, dbc_fields64, sizeof(dbc_fields32)/sizeof(dbc_fields32[0]), extra); diff --git a/include/db.h b/include/db.h index b3d9475337a..42798ed7402 100644 --- a/include/db.h +++ b/include/db.h @@ 
-272,7 +272,8 @@ struct __toku_dbc { int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *); - void* __toku_dummy0[10]; + int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction); + void* __toku_dummy0[9]; char __toku_dummy1[104]; int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */ int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */ diff --git a/newbrt/brt.c b/newbrt/brt.c index b4c45358150..0a9592ef6bf 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -3322,6 +3322,64 @@ get_next:; return -1; } +int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) { + if (toku_omt_cursor_is_valid(cursor->omtcursor)) { + { + int rr = toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h); + if (rr!=0) return rr; + uint64_t h_counter = cursor->brt->h->root_put_counter; + rr = toku_unpin_brt_header(cursor->brt); + assert(rr==0); + if (h_counter != cursor->root_put_counter) return -1; + } + OMTVALUE le; + u_int32_t index = 0; + int r = toku_omt_cursor_current_index(cursor->omtcursor, &index); + assert(r==0); + OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor); +get_prev:; + if (index>0) { + r = toku_omt_fetch(omt, --index, &le, NULL); + if (r==0) { + if (le_is_provdel(le)) goto get_prev; + toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le)); + toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le)); + return 0; + } + } + } + return -1; +} + +int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) { + if (toku_omt_cursor_is_valid(cursor->omtcursor)) { + { + int rr = 
toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h); + if (rr!=0) return rr; + uint64_t h_counter = cursor->brt->h->root_put_counter; + rr = toku_unpin_brt_header(cursor->brt); + assert(rr==0); + if (h_counter != cursor->root_put_counter) return -1; + } + OMTVALUE le; + u_int32_t index = UINT32_MAX; + int r = toku_omt_cursor_current_index(cursor->omtcursor, &index); + assert(r==0); + OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor); +get_next:; + if (++indexbrt->flags & TOKU_DB_DUP) && brt_cursor_next_shortcut(cursor, outkey, outval)==0) @@ -3330,6 +3388,12 @@ static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGG return brt_cursor_search(cursor, &search, outkey, outval, logger); } +int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) { + TOKULOGGER logger = toku_txn_logger(txn); + brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, key, val, cursor->brt); + return brt_cursor_search(cursor, &search, outkey, outval, logger); +} + static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) { BRT brt = search->context; y = y; return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */ @@ -3413,6 +3477,12 @@ get_prev:; return -1; } +int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) { + TOKULOGGER logger = toku_txn_logger(txn); + brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, key, val, cursor->brt); + return brt_cursor_search(cursor, &search, outkey, outval, logger); +} + static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { if (0!=(cursor->brt->flags & TOKU_DB_DUP) && brt_cursor_prev_shortcut(cursor, outkey, outval)==0) @@ -3543,6 +3613,34 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T return r; } +static int brt_cursor_compare_heavi(brt_search_t 
*search, DBT *x, DBT *y) { + HEAVI_WRAPPER wrapper = search->context; + int r = wrapper->h(x, y, wrapper->extra_h); + // wrapper->r_h must have the same signus as the final chosen element. + // it is initialized to -1 or 1. 0's are closer to the min (max) that we + // want so once we hit 0 we keep it. + if (r==0) wrapper->r_h = 0; + return (search->direction&BRT_SEARCH_LEFT) ? r>=0 : r<=0; +} + +//We pass in toku_dbt_fake to the search functions, since it will not pass the +//key(or val) to the heaviside function if key(or val) is NULL. +//It is not used for anything else, +//the actual 'extra' information for the heaviside function is inside the +//wrapper. +static const DBT __toku_dbt_fake; +static const DBT* const toku_dbt_fake = &__toku_dbt_fake; + +int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper) { + TOKULOGGER logger = toku_txn_logger(txn); + brt_search_t search; brt_search_init(&search, brt_cursor_compare_heavi, + direction < 0 ? BRT_SEARCH_RIGHT : BRT_SEARCH_LEFT, + (DBT*)toku_dbt_fake, + cursor->brt->flags & TOKU_DB_DUPSORT ? 
(DBT*)toku_dbt_fake : NULL, + wrapper); + return brt_cursor_search(cursor, &search, outkey, outval, logger); +} + static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) { BRTNODE node; { diff --git a/newbrt/brt.h b/newbrt/brt.h index 70147b07895..795421694d9 100644 --- a/newbrt/brt.h +++ b/newbrt/brt.h @@ -49,6 +49,17 @@ int toku_verify_brt (BRT brt); typedef struct brt_cursor *BRT_CURSOR; int toku_brt_cursor (BRT, BRT_CURSOR*, int is_temporary_cursor); int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, TOKUTXN); +struct heavi_wrapper { + int (*h)(const DBT *key, const DBT *value, void *extra_h); + void *extra_h; + int r_h; +}; +typedef struct heavi_wrapper *HEAVI_WRAPPER; +int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper); +int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval); +int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval); +int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn); +int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn); int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN); int toku_brt_cursor_close (BRT_CURSOR curs); BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c); diff --git a/newbrt/omt.c b/newbrt/omt.c index a1f74548a18..464c5e2e1ef 100644 --- a/newbrt/omt.c +++ b/newbrt/omt.c @@ -80,6 +80,10 @@ int toku_omt_cursor_create (OMTCURSOR *omtcp) { return 0; } +OMT toku_omt_cursor_get_omt(OMTCURSOR c) { + return c->omt; +} + void toku_omt_cursor_invalidate (OMTCURSOR c) { if (c==NULL || c->omt==NULL) return; if (c->next == c) { @@ -625,6 +629,12 @@ int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v) { return r; } +int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index) { + if 
(c->omt == NULL) return EINVAL; + *index = c->index; + return 0; +} + //TODO: Put all omt API functions here. int toku_omt_create (OMT *omtp) { return omt_create_internal(omtp, 2); diff --git a/newbrt/omt.h b/newbrt/omt.h index 007d70462e1..053d8a469d9 100644 --- a/newbrt/omt.h +++ b/newbrt/omt.h @@ -417,6 +417,19 @@ int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v); // Performance: time=O(log N) worst case, expected time=O(1) for a randomly // chosen initial position. +int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index); +// Effect: Stores c's offset in *index. +// Requires: index != NULL +// Returns +// 0 success +// EINVAL c is invalid +// On nonzero return, *index is unchanged and c is unchanged. +// Performance: time=O(1) + +OMT toku_omt_cursor_get_omt(OMTCURSOR c); +// Effect: returns the associated omt or NULL if not associated. +// Performance: time=O(1) + int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v); // Effect: Store in v the value pointed by c's abstract offset // Requires: v != NULL diff --git a/src/tests/test_db_txn_locks.c b/src/tests/test_db_txn_locks.c index 7613cb5e7c9..47b34c582d9 100644 --- a/src/tests/test_db_txn_locks.c +++ b/src/tests/test_db_txn_locks.c @@ -9,6 +9,37 @@ #include "test.h" +struct heavi_extra { + DBT key; + DBT val; + DB* db; +}; + +int heavi_after(const DBT *key, const DBT *val, void *extra) { + //Assumes cmp is int_dbt_cmp + struct heavi_extra *info = extra; + int cmp = int_dbt_cmp(info->db, key, &info->key); + if (cmp!=0) return cmp; + if (!val) return -1; + cmp = int_dbt_cmp(info->db, val, &info->val); + return cmp<=0 ? 
-1 : 0; + //Returns <0 for too small/equal + //Returns 0 for greater, but with the same key + //Returns >0 for greater with different key +} + +int heavi_before(const DBT *key, const DBT *val, void *extra) { + struct heavi_extra *info = extra; + int cmp = int_dbt_cmp(info->db, key, &info->key); + if (cmp!=0) return cmp; + if (!val) return +1; + cmp = int_dbt_cmp(info->db, val, &info->val); + return cmp>=0 ? 1 : 0; + //Returns >0 for too large/equal + //Returns 0 for smaller with same key + //returns -1 for smaller with different key +} + // ENVDIR is defined in the Makefile int dbtcmp(DBT *dbt1, DBT *dbt2) { @@ -593,6 +624,162 @@ void test_current(u_int32_t dup_flags) { close_dbs(); } +struct dbt_pair { + DBT key; + DBT val; +}; + +struct int_pair { + int key; + int val; +}; + +int got_r_h; + +void f_heavi(DBT const *key, DBT const *val, void *extra_f, int r_h) { + struct int_pair *info = extra_f; + + if (r_h==0) got_r_h = 0; + assert(key->size == 4); + assert(val->size == 4); + + info->key = *(int*)key->data; + info->val = *(int*)val->data; +} + +void cget_heavi(BOOL success, BOOL find, char txn, int _key, int _val, + int _key_expect, int _val_expect, int direction, + int r_h_expect, + int (*h)(const DBT*,const DBT*,void*)) { +#if defined(USE_BDB) + return; +#else + assert(txns[(int)txn] && cursors[(int)txn]); + + int r; + struct heavi_extra input; + struct int_pair output; + dbt_init(&input.key, &_key, sizeof(int)); + dbt_init(&input.val, &_val, sizeof(int)); + input.db = db; + output.key = 0; + output.val = 0; + + got_r_h = direction; + + r = cursors[(int)txn]->c_getf_heavi(cursors[(int)txn], 0, //No prelocking + f_heavi, &output, + h, &input, direction); + if (!success) { + CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED); + return; + } + if (!find) { + CKERR2s(r, DB_NOTFOUND, DB_KEYEMPTY); + return; + } + CKERR(r); + assert(got_r_h == r_h_expect); + assert(output.key == _key_expect); + assert(output.val == _val_expect); +#endif +} + + +void 
test_heavi(u_int32_t dup_flags) { + /* ********************************************************************** */ + setup_dbs(dup_flags); + cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, 1, 0, heavi_after); + cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, -1, 0, heavi_before); + close_dbs(); + /* ********************************************************************** */ + //Not found locks left to right (with empty db == entire db) + setup_dbs(dup_flags); + cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, 1, 0, heavi_after); + put(FALSE, 'b', 7, 6); + put(FALSE, 'b', -1, -1); + put(TRUE, 'a', 4, 4); + early_commit('a'); + put(TRUE, 'b', 7, 6); + put(TRUE, 'b', -1, -1); + close_dbs(); + /* ********************************************************************** */ + //Not found locks left to right (with empty db == entire db) + setup_dbs(dup_flags); + cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, -1, 0, heavi_before); + put(FALSE, 'b', 7, 6); + put(FALSE, 'b', -1, -1); + put(TRUE, 'a', 4, 4); + early_commit('a'); + put(TRUE, 'b', 7, 6); + put(TRUE, 'b', -1, -1); + close_dbs(); + /* ********************************************************************** */ + //Duplicate mode behaves differently. + setup_dbs(dup_flags); + int k,v; + for (k = 10; k <= 100; k+= 10) { + v = k+5; + put(TRUE, 'a', k, v); + } + if (dup_flags) { + cget_heavi(TRUE, TRUE, 'a', 100, 0, 100, 105, 1, 0, heavi_after); + } + else { + cget_heavi(TRUE, FALSE, 'a', 100, 0, 0, 0, 1, 0, heavi_after); + } + close_dbs(); + /* ********************************************************************** */ + //Locks stop at actual elements in the DB. 
+ setup_dbs(dup_flags); + //int k,v; + for (k = 10; k <= 100; k+= 10) { + v = k+5; + put(TRUE, 'a', k, v); + } + cget_heavi(TRUE, FALSE, 'a', 105, 1, 0, 0, 1, 0, heavi_after); + put(FALSE, 'b', 104, 1); + put(FALSE, 'b', 105, 0); + put(FALSE, 'b', 105, 1); + put(FALSE, 'b', 105, 2); + put(FALSE, 'b', 106, 0); + put(TRUE, 'b', 99, 0); + put(dup_flags!=0, 'b', 100, 104); + close_dbs(); + /* ********************************************************************** */ + // Test behavior of heavi_after + setup_dbs(dup_flags); + //int k,v; + for (k = 10; k <= 100; k+= 10) { + v = k+5; + put(TRUE, 'a', k, v); + } + for (k = 5; k <= 95; k+= 10) { + v = k+5; + cget_heavi(TRUE, TRUE, 'a', k, v, k+5, v+5, 1, 1, heavi_after); + } + put(FALSE, 'b', -1, -2); + put(TRUE, 'b', 200, 201); + cget_heavi(FALSE, FALSE, 'a', 105, 105, 0, 0, 1, 0, heavi_after); + close_dbs(); + /* ********************************************************************** */ + // Test behavior of heavi_before + setup_dbs(dup_flags); + //int k,v; + for (k = 10; k <= 100; k+= 10) { + v = k+5; + put(TRUE, 'a', k, v); + } + for (k = 105; k >= 15; k-= 10) { + v = k+5; + cget_heavi(TRUE, TRUE, 'a', k, v, k-5, v-5, -1, -1, heavi_before); + } + put(FALSE, 'b', 200, 201); + put(TRUE, 'b', -1, -2); + cget_heavi(FALSE, FALSE, 'a', -5, -5, 0, 0, -1, 0, heavi_after); + close_dbs(); +} + void test(u_int32_t dup_flags) { /* ********************************************************************** */ setup_dbs(dup_flags); @@ -637,6 +824,9 @@ void test(u_int32_t dup_flags) { test_dbdel(dup_flags); /* ********************************************************************** */ test_current(dup_flags); + /* ********************************************************************** */ + test_heavi(dup_flags); + /* ********************************************************************** */ } diff --git a/src/ydb.c b/src/ydb.c index f986a95da62..2d96a5471a0 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -92,6 +92,146 @@ static int toku_db_cursor(DB 
*db, DB_TXN * txn, DBC **c, u_int32_t flags, int is /* lightweight cursor methods. */ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); +static int toku_c_getf_heavi(DBC *c, u_int32_t flags, + void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h), + void *extra_f, + int (*h)(const DBT *key, const DBT *value, void *extra_h), + void *extra_h, int direction); +// There is a total order on all key/value pairs in the database. +// In a DB_DUPSORT db, let V_i = (Key,Value) refer to the ith element (0 based indexing). +// In a NODUP db, let V_i = (Key) refer to the ith element (0 based indexing). +// We define V_{-1} = -\infty and +// V_{|V|} = \infty and +// h(-\infty,extra_h) = -1 by definition and +// h( \infty,extra_h) = 1 by definition +// Requires: Direction != 0 +// Effect: +// if direction >0 then find the smallest i such that h(V_i,extra_h)>=0. +// if direction <0 then find the largest i such that h(V_i,extra_h)<=0. +// Let signus(r_h) = signus(h(V_i, extra_h)) +// If flags&(DB_PRELOCKED|DB_PRELOCKED_WRITE) then skip locking +// That is, we already own the locks +// else +// if direction >0 then readlock [V_{i-1}, V_i] +// if direction <0 then readlock [V_i, V_{i+1}] +// That is, If we search from the right, lock the element we found, up to the +// next element to the right. +// If locking fails, return the locking error code +// +// If (0<=i<|V|) then +// call f(V_i.Key, V_i.Value, extra_f, r_h) +// Note: The lifetime of V_i.Key and V_i.Value is limited: they may only +// be referenced until f returns +// and return 0 +// else +// return DB_NOTFOUND +// Rationale: Locking +// If we approach from the left (direction<0) we need to prevent anyone +// from inserting anything to our right that could change our answer, +// so we lock the range from the element found, to the next element to the right. +// The inverse argument applies for approaching from the right. 
+// Rationale: passing r_h to f +// We want to save the performance hit of requiring f to call h again to +// find out what h's return value was. +// Rationale: separate extra_f, extra_h parameters +// If the same extra parameter is sent to both f and h, then you need a +// special struct for each tuple (f_i, h_i) you use instead of a struct for each +// f_i and each h_i. +// Requires: The signum of h is monotonically increasing. +// Requires: f does not retain references to key, value, or data within them once f +// exits +// Returns +// 0 success +// DB_NOTFOUND i is not in [0,|V|) +// DB_LOCK_NOTGRANTED Failed to obtain a lock. +// On nonzero return, what c points to becomes undefined. That is, c becomes uninitialized +// Performance: ... TODO +// Implementation Notes: +// How do we get the extra locking information efficiently? +// After finding the target, we can copy the cursor, do a DB_NEXT, +// or do a DB_NEXT+DB_PREV (vice versa for direction<0). +// Can we have the BRT provide two key/value pairs instead of one? +// That is, brt_cursor_c_getf_heavi_and_next for direction >0 +// and brt_cursor_c_getf_heavi_and_prev for direction <0 +// Current suggestion is to make a copy of the cursor, and use the +// copy to find the next(prev) element by using DB_NEXT(DB_PREV). +// This has the overhead of needing to make a copy of the cursor, +// which probably has a memcpy involved. +// The argument against returning two key/value pairs is that +// we should not have to pay to retrieve both when we're doing something +// simple like DB_NEXT. +// This could be mitigated by having two BRT functions (or one with a +// BOOL parameter) such that it only returns two values when necessary. +// Parameters +// c The cursor +// flags Additional bool parameters. The current allowed flags are +// DB_PRELOCKED and DB_PRELOCKED_WRITE (bitwise or'd to use both) +// h A heaviside function that, along with direction, defines the query. 
+// extra_h is passed to h +// For additional information on heaviside functions, see omt.h +// NOTE: In a DB_DUPSORT database, both key and value will be +// passed to h. In a NODUP database, only key will be passed to h. +// f A callback function (i.e. smart dbts) to provide the result of the +// query. key and value are the key/value pair found, extra_f is +// passed to f, r_h is the return value for h for the key and value returned. +// This is used as output. That is, we call f with the outputs of the +// function. +// direction Which direction to search in on the heaviside function. >0 +// means from the right, <0 means from the left. +// extra_f Any extra information required for f +// extra_h Any extra information required for h +// +// Example: +// Find the smallest V_i = (key_i,val_i) such that key_i > key_x, assume +// key.data and val.data are c strings, and print them out. +// Create a struct to hold key_x, that is extra_h +// Direction = 1 (We approach from the right, and want the smallest such +// element). +// Construct a heaviside function that returns >=0 if the +// given key > key_x, and -1 otherwise +// That is, call the comparison function on (key, key_x) +// Create a struct to hold key_x, that is extra_f +// construct f to call printf on key_x.data, key_i.data, val_i.data. +// Find the least upper bound (greatest lower bound) +// In this case, h can just return the comparison function's answer. +// direction >0 means upper bound, direction <0 means lower bound. +// (If you want upper/lower bound of the keyvalue pair, you need +// to call the comparison function on the values if the key comparison +// returns 0). +// Handlerton implications: +// The handlerton needs at most one heaviside function per special query type (where a +// special query is one that is not directly supported by the bdb api excluding +// this function). 
+// It is possible that more than one query type can use the same heaviside function +// if the extra_h parameter can be used to change its behavior sufficiently. +// +// That is, part of extra_h can be a boolean strictly_greater +// You can construct a single heaviside function that converts 0 to -1 +// (strictly greater) from the comparison function, or one that just returns +// the results of the comparison function (greater or equal). +// +// Implementation Notes: +// The BRT search function supports the following searches: +// SEARCH_LEFT(h(V_i)) +// Given a step function b, that goes from 0 to 1 +// find the smallest i such that h_b(V_i) == 1 +// If it does not exist, return not found +// SEARCH_RIGHT(h(V_i)) +// Given a step function b, that goes from 1 to 0 +// find the greatest i such that h_b(V_i) == 1 +// If it does not exist, return not found +// We can implement c_getf_heavi using these BRT search functions. +// A query of direction<0: +// Create wrapper function B +// return h(V_i) <=0 ? 1 : 0; +// SEARCH_RIGHT(B) +// A query of direction>0: +// Create wrapper function B +// return h(V_i) >=0 ? 
1 : 0; +// SEARCH_LEFT(B) + +// Effect: Lightweight cursor get + /* cursor methods */ static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag); static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag); @@ -1639,7 +1779,6 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); - //int prelocked = flag & DB_PRELOCKED; if (toku_c_uninitialized(c)) return toku_c_getf_next_old(c, flag, f, extra); //return toku_c_getf_first(c, flag, f, extra); u_int32_t lock_flags = get_prelocked_flags(flag); flag &= ~lock_flags; @@ -1684,6 +1823,130 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT return r; } +static int locked_c_getf_heavi(DBC *c, u_int32_t flags, + void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h), + void *extra_f, + int (*h)(const DBT *key, const DBT *value, void *extra_h), + void *extra_h, int direction) { + toku_ydb_lock(); int r = toku_c_getf_heavi(c, flags, f, extra_f, h, extra_h, direction); toku_ydb_unlock(); return r; +} + +static int toku_c_getf_heavi(DBC *c, u_int32_t flags, + void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h), + void *extra_f, + int (*h)(const DBT *key, const DBT *value, void *extra_h), + void *extra_h, int direction) { + if (direction==0) return EINVAL; + DBC *tmp_c = NULL; + int r; + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; + assert(flags==0); + struct heavi_wrapper wrapper; + wrapper.h = h; + wrapper.extra_h = extra_h; + wrapper.r_h = direction; + + TOKUTXN txn = c->i->txn ? 
c->i->txn->i->tokutxn : NULL; + int c_get_result = toku_brt_cursor_get_heavi(c->i->c, NULL, NULL, txn, direction, &wrapper); + if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } + BOOL found = c_get_result==0; + DB *db=c->dbp; + toku_lock_tree* lt = db->i->lt; + if (lt!=NULL && !lock_flags) { + DBT tmp_key; + DBT tmp_val; + DBT *left_key, *left_val, *right_key, *right_val; + if (direction<0) { + if (!found) { + r = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_FIRST, txn); + if (r!=0 && r!=DB_NOTFOUND) goto cleanup; + if (r==DB_NOTFOUND) right_key = right_val = (DBT*)toku_lt_infinity; + else { + //Restore cursor to the 'uninitialized' state it was just in. + brt_cursor_restore_state_from_prev(c->i->c); + right_key = brt_cursor_peek_prev_key(c->i->c); + right_val = brt_cursor_peek_prev_val(c->i->c); + } + left_key = left_val = (DBT*)toku_lt_neg_infinity; + } + else { + left_key = brt_cursor_peek_current_key(c->i->c); + left_val = brt_cursor_peek_current_val(c->i->c); + //Try to find right end the fast way. + r = toku_brt_cursor_peek_next(c->i->c, &tmp_key, &tmp_val); + if (r==0) { + right_key = &tmp_key; + right_val = &tmp_val; + } + else { + //Find the right end the slow way. + if ((r = toku_db_cursor(c->dbp, c->i->txn, &tmp_c, 0, 0))) goto cleanup; + r=toku_brt_cursor_after(tmp_c->i->c, left_key, left_val, + NULL, NULL, txn); + if (r!=0 && r!=DB_NOTFOUND) goto cleanup; + if (r==DB_NOTFOUND) right_key = right_val = (DBT*)toku_lt_infinity; + else { + right_key = brt_cursor_peek_current_key(tmp_c->i->c); + right_val = brt_cursor_peek_current_val(tmp_c->i->c); + } + } + } + } + else { + //direction>0 + if (!found) { + r = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_LAST, txn); + if (r!=0 && r!=DB_NOTFOUND) goto cleanup; + if (r==DB_NOTFOUND) left_key = left_val = (DBT*)toku_lt_neg_infinity; + else { + //Restore cursor to the 'uninitialized' state it was just in. 
+ brt_cursor_restore_state_from_prev(c->i->c); + left_key = brt_cursor_peek_prev_key(c->i->c); + left_val = brt_cursor_peek_prev_val(c->i->c); + } + right_key = right_val = (DBT*)toku_lt_infinity; + } + else { + right_key = brt_cursor_peek_current_key(c->i->c); + right_val = brt_cursor_peek_current_val(c->i->c); + //Try to find left end the fast way. + r=toku_brt_cursor_peek_prev(c->i->c, &tmp_key, &tmp_val); + if (r==0) { + left_key = &tmp_key; + left_val = &tmp_val; + } + else { + //Find the left end the slow way. + if ((r = toku_db_cursor(c->dbp, c->i->txn, &tmp_c, 0, 0))) goto cleanup; + r=toku_brt_cursor_before(tmp_c->i->c, right_key, right_val, + NULL, NULL, txn); + if (r==DB_NOTFOUND) left_key = left_val = (DBT*)toku_lt_neg_infinity; + else { + left_key = brt_cursor_peek_current_key(tmp_c->i->c); + left_val = brt_cursor_peek_current_val(tmp_c->i->c); + } + } + } + } + DB_TXN* txn_anc = toku_txn_ancestor(c->i->txn); + TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn); + if ((r = toku_txn_add_lt(txn_anc, lt))) goto cleanup; + r = toku_lt_acquire_range_read_lock(lt, db, id_anc, + left_key, left_val, + right_key, right_val); + if (r!=0) goto cleanup; + } + if (found) { + f(brt_cursor_peek_current_key(c->i->c), brt_cursor_peek_current_val(c->i->c), extra_f, wrapper.r_h); + } + r = c_get_result; +cleanup:; + int r2 = 0; + if (tmp_c) r2 = toku_c_close(tmp_c); + return r ? 
r : r2; +} + static int toku_c_close(DBC * c) { int r = toku_brt_cursor_close(c->i->c); toku_free(c->i); @@ -1987,13 +2250,16 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int if (result == 0) return ENOMEM; memset(result, 0, sizeof *result); - result->c_get = locked_c_get; - result->c_pget = locked_c_pget; - result->c_put = locked_c_put; - result->c_close = locked_c_close; - result->c_del = locked_c_del; - result->c_count = locked_c_count; - result->c_getf_next = locked_c_getf_next; +#define SCRS(name) result->name = locked_ ## name + SCRS(c_get); + SCRS(c_pget); + SCRS(c_put); + SCRS(c_close); + SCRS(c_del); + SCRS(c_count); + SCRS(c_getf_next); + SCRS(c_getf_heavi); +#undef SCRS MALLOC(result->i); assert(result->i); result->dbp = db;