diff --git a/newbrt/brt-internal.h b/newbrt/brt-internal.h index 7665f869f83..d0d4b99a6b3 100644 --- a/newbrt/brt-internal.h +++ b/newbrt/brt-internal.h @@ -95,7 +95,7 @@ struct brt { /* serialization code */ void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node); -int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize); +int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize, int (*)(DB *, const DBT*, const DBT*)); unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); diff --git a/newbrt/brt-serialize-test.c b/newbrt/brt-serialize-test.c index 12e0580f7ca..d19bda514cc 100644 --- a/newbrt/brt-serialize-test.c +++ b/newbrt/brt-serialize-test.c @@ -42,7 +42,7 @@ void test_serialize(void) { serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0); - r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize); + r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize, 0); assert(r==0); assert(dn->thisnodename==nodesize*20); diff --git a/newbrt/brt-serialize.c b/newbrt/brt-serialize.c index 41d89ec0552..10b6f181167 100644 --- a/newbrt/brt-serialize.c +++ b/newbrt/brt-serialize.c @@ -178,7 +178,8 @@ void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) { toku_free(buf); } -int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize) { +int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize, + int (*compare_fun)(DB *, const DBT *, const DBT *)) { TAGMALLOC(BRTNODE, result); struct rbuf rc; int i; @@ -334,7 +335,7 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesiz } else { int n_in_buf = rbuf_int(&rc); result->u.l.n_bytes_in_buffer = 0; - r=pma_create(&result->u.l.buffer, default_compare_fun, nodesize); + r=pma_create(&result->u.l.buffer, compare_fun, nodesize); if (r!=0) { if (0) { died_21: pma_free(&result->u.l.buffer); } goto died1; diff --git a/newbrt/brt.c b/newbrt/brt.c index 3b3192f7e9d..549479126dd 100644 --- a/newbrt/brt.c +++ b/newbrt/brt.c @@ -173,10 +173,10 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); } -int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep __attribute__((unused)), void*extraargs, LSN *written_lsn) { - long nodesize=(long)extraargs; +int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) { + BRT t =(BRT)extraargs; BRTNODE *result=(BRTNODE*)brtnode_pv; - int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, nodesize); + int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, t->nodesize, t->compare_fun); if (r == 0) *sizep = brtnode_size(*result); *written_lsn = (*result)->lsn; @@ -320,7 +320,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, BRTNODE pare // n->brt = t; //printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode); r=cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n), - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, t); assert(r==0); } @@ -745,7 +745,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum, DISKOFF targetchild = node->u.n.children[childnum]; assert(targetchild>=0 && targetchildh->unused_memory); // This assertion could fail in a concurrent setting since another process might have bumped unused memory. r = cachetable_get_and_pin(t->cf, targetchild, &childnode_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, t); if (r!=0) return r; //printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v); child=childnode_v; @@ -960,7 +960,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd, r = cachetable_maybe_get_and_pin(t->cf, node->u.n.children[childnum], &child_v); else r = cachetable_get_and_pin(t->cf, node->u.n.children[childnum], &child_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, t); if (r != 0) return r; @@ -1175,7 +1175,7 @@ static int setup_brt_root_node (BRT t, DISKOFF offset) { } //printf("%s:%d putting %p (%lld)\n", __FILE__, __LINE__, node, node->thisnodename); r=cachetable_put(t->cf, offset, node, brtnode_size(node), - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, t); if (r!=0) { toku_free(node); return r; @@ -1470,7 +1470,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE if (r!=0) return r; //printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root); cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot), - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, brt); brt_update_cursors_new_root(brt, newroot, nodea, nodeb); return 0; } @@ -1492,7 +1492,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) { rootp = toku_calculate_root_offset_pointer(brt); if (debug) printf("%s:%d Getting %lld\n", __FILE__, __LINE__, *rootp); if ((r=cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) { + brtnode_flush_callback, brtnode_fetch_callback, brt))) { goto died0; } //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); @@ -1547,7 +1547,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren int result; void *node_v; int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, brt); if (r!=0) return r; @@ -1644,7 +1644,7 @@ int dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lole BRTNODE node; void *node_v; int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, brt); assert(r==0); printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); node=node_v; @@ -1712,7 +1712,7 @@ int show_brtnode_blocknumbers (BRT brt, DISKOFF off, BRTNODE parent_brtnode) { int i,r; assert(off%brt->h->nodesize==0); if ((r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) { + brtnode_flush_callback, brtnode_fetch_callback, brt))) { if (0) { died0: cachetable_unpin(brt->cf, off, 0, 0); } return r; } @@ -2082,7 +2082,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db, void *node_v; int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, brt); if (r!=0) { if (0) { died0: cachetable_unpin(brt->cf, off, 1, 0); } return r; @@ -2144,7 +2144,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db void *node_v; int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, - brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_flush_callback, brtnode_fetch_callback, brt); if (r!=0) { if (0) { died0: cachetable_unpin(brt->cf, off, 1, 0); } return r; @@ -2327,7 +2327,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, DISKOFF off, DBT *key, DBT *val, int flag void *node_v; int r; r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, brtnode_flush_callback, - brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_fetch_callback, brt); if (r != 0) return r; @@ -2389,7 +2389,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db, TOKUTXN void *node_v; int r; r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, brtnode_flush_callback, - brtnode_fetch_callback, (void*)(long)brt->h->nodesize); + brtnode_fetch_callback, brt); if (r != 0) return r; diff --git a/src/tests/test_reverse_compare_fun.c b/src/tests/test_reverse_compare_fun.c new file mode 100644 index 00000000000..5542a4da05b --- /dev/null +++ b/src/tests/test_reverse_compare_fun.c @@ -0,0 +1,144 @@ +/* try a reverse compare function to verify that the database always uses the application's + compare function */ + +#include +#include +#include +#include +#include +#include +#include + +DBT *dbt_init(DBT *dbt, void *data, u_int32_t size) { + memset(dbt, 0, sizeof *dbt); + dbt->data = data; + dbt->size = size; + return dbt; +} + +DBT *dbt_init_malloc(DBT *dbt) { + memset(dbt, 0, sizeof *dbt); + dbt->flags = DB_DBT_MALLOC; + return dbt; +} + +int keycompare (const void *key1, unsigned int key1len, const void *key2, unsigned int key2len) { + if (key1len==key2len) { + return memcmp(key1,key2,key1len); + } else if (key1lendata, a->size, b->data, b->size); +} + +void expect(DBC *cursor, int k, int v) { + DBT key, val; + int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT); + assert(r == 0); + assert(key.size == sizeof k); + int kk; + memcpy(&kk, key.data, key.size); + assert(val.size == sizeof v); + int vv; + memcpy(&vv, val.data, val.size); + if (kk != k || vv != v) printf("expect key %d got %d - %d %d\n", htonl(k), htonl(kk), htonl(v), htonl(vv)); + assert(kk == k); + assert(vv == v); + + free(key.data); + free(val.data); +} + +void test_reverse_compare(int n) { + printf("test_reverse_compare:%d\n", n); + + DB_ENV * const null_env = 0; + DB *db; + DB_TXN * const null_txn = 0; + const char * const fname = "test.reverse.compare.brt"; + int r; + int i; + + unlink(fname); + + /* create the dup database file */ + r = db_create(&db, null_env, 0); + assert(r == 0); + r = db->set_flags(db, 0); + assert(r == 0); + r = db->set_pagesize(db, 4096); + assert(r == 0); + r = db->set_bt_compare(db, reverse_compare); + assert(r == 0); + r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); + assert(r == 0); + + /* insert n unique keys {0, 1, n-1} */ + for (i=0; iput(db, null_txn, &key, &val, 0); + assert(r == 0); + } + + /* reopen the database to force nonleaf buffering */ + r = db->close(db, 0); + assert(r == 0); + r = db_create(&db, null_env, 0); + assert(r == 0); + r = db->set_flags(db, 0); + assert(r == 0); + r = db->set_pagesize(db, 4096); + assert(r == 0); + r = db->set_bt_compare(db, reverse_compare); + assert(r == 0); + r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666); + assert(r == 0); + + /* insert n unique keys {n, n+1, 2*n-1} */ + for (i=n; i<2*n; i++) { + DBT key, val; + int k, v; + k = htonl(i); + dbt_init(&key, &k, sizeof k); + v = i; + dbt_init(&val, &v, sizeof v); + r = db->put(db, null_txn, &key, &val, 0); + assert(r == 0); + } + + /* verify the sort order with a cursor */ + DBC *cursor; + r = db->cursor(db, null_txn, &cursor, 0); + assert(r == 0); + + //for (i=0; i<2*n; i++) + for (i=2*n-1; i>=0; i--) + expect(cursor, htonl(i), i); + + r = cursor->c_close(cursor); + assert(r == 0); + + r = db->close(db, 0); + assert(r == 0); +} + +int main() { + int i; + + for (i = 1; i <= (1<<16); i *= 2) { + test_reverse_compare(i); + } + return 0; +}