fix bug 3: use the application's compare function in the pma rather than the default compare function

git-svn-id: file:///svn/tokudb@604 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Rich Prohaska 2007-11-16 22:06:27 +00:00
parent 96b7ff0908
commit 5b24c06e48
5 changed files with 165 additions and 20 deletions

View file

@ -95,7 +95,7 @@ struct brt {
/* serialization code */
void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node);
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize);
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize, int (*)(DB *, const DBT*, const DBT*));
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);

View file

@ -42,7 +42,7 @@ void test_serialize(void) {
serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize, 0);
assert(r==0);
assert(dn->thisnodename==nodesize*20);

View file

@ -178,7 +178,8 @@ void serialize_brtnode_to(int fd, DISKOFF off, DISKOFF size, BRTNODE node) {
toku_free(buf);
}
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize) {
int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesize,
int (*compare_fun)(DB *, const DBT *, const DBT *)) {
TAGMALLOC(BRTNODE, result);
struct rbuf rc;
int i;
@ -334,7 +335,7 @@ int deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int nodesiz
} else {
int n_in_buf = rbuf_int(&rc);
result->u.l.n_bytes_in_buffer = 0;
r=pma_create(&result->u.l.buffer, default_compare_fun, nodesize);
r=pma_create(&result->u.l.buffer, compare_fun, nodesize);
if (r!=0) {
if (0) { died_21: pma_free(&result->u.l.buffer); }
goto died1;

View file

@ -173,10 +173,10 @@ void brtnode_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void *brtnod
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
}
int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep __attribute__((unused)), void*extraargs, LSN *written_lsn) {
long nodesize=(long)extraargs;
int brtnode_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **brtnode_pv, long *sizep, void*extraargs, LSN *written_lsn) {
BRT t =(BRT)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, nodesize);
int r = deserialize_brtnode_from(cachefile_fd(cachefile), nodename, result, t->nodesize, t->compare_fun);
if (r == 0)
*sizep = brtnode_size(*result);
*written_lsn = (*result)->lsn;
@ -320,7 +320,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, BRTNODE pare
// n->brt = t;
//printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode);
r=cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, t);
assert(r==0);
}
@ -745,7 +745,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
DISKOFF targetchild = node->u.n.children[childnum];
assert(targetchild>=0 && targetchild<t->h->unused_memory); // This assertion could fail in a concurrent setting since another process might have bumped unused memory.
r = cachetable_get_and_pin(t->cf, targetchild, &childnode_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, t);
if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
child=childnode_v;
@ -960,7 +960,7 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
r = cachetable_maybe_get_and_pin(t->cf, node->u.n.children[childnum], &child_v);
else
r = cachetable_get_and_pin(t->cf, node->u.n.children[childnum], &child_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, t);
if (r != 0)
return r;
@ -1175,7 +1175,7 @@ static int setup_brt_root_node (BRT t, DISKOFF offset) {
}
//printf("%s:%d putting %p (%lld)\n", __FILE__, __LINE__, node, node->thisnodename);
r=cachetable_put(t->cf, offset, node, brtnode_size(node),
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)t->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, t);
if (r!=0) {
toku_free(node);
return r;
@ -1470,7 +1470,7 @@ int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKE
if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, brt);
brt_update_cursors_new_root(brt, newroot, nodea, nodeb);
return 0;
}
@ -1492,7 +1492,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
rootp = toku_calculate_root_offset_pointer(brt);
if (debug) printf("%s:%d Getting %lld\n", __FILE__, __LINE__, *rootp);
if ((r=cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) {
brtnode_flush_callback, brtnode_fetch_callback, brt))) {
goto died0;
}
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
@ -1547,7 +1547,7 @@ int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v, DB *db, BRTNODE paren
int result;
void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, brt);
if (r!=0)
return r;
@ -1644,7 +1644,7 @@ int dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN lole
BRTNODE node;
void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, brt);
assert(r==0);
printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v;
@ -1712,7 +1712,7 @@ int show_brtnode_blocknumbers (BRT brt, DISKOFF off, BRTNODE parent_brtnode) {
int i,r;
assert(off%brt->h->nodesize==0);
if ((r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize))) {
brtnode_flush_callback, brtnode_fetch_callback, brt))) {
if (0) { died0: cachetable_unpin(brt->cf, off, 0, 0); }
return r;
}
@ -2082,7 +2082,7 @@ int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db,
void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, brt);
if (r!=0) {
if (0) { died0: cachetable_unpin(brt->cf, off, 1, 0); }
return r;
@ -2144,7 +2144,7 @@ int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db
void *node_v;
int r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
brtnode_flush_callback, brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_flush_callback, brtnode_fetch_callback, brt);
if (r!=0) {
if (0) { died0: cachetable_unpin(brt->cf, off, 1, 0); }
return r;
@ -2327,7 +2327,7 @@ int brtcurs_set_key(BRT_CURSOR cursor, DISKOFF off, DBT *key, DBT *val, int flag
void *node_v;
int r;
r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, brtnode_flush_callback,
brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_fetch_callback, brt);
if (r != 0)
return r;
@ -2389,7 +2389,7 @@ int brtcurs_set_range(BRT_CURSOR cursor, DISKOFF off, DBT *key, DB *db, TOKUTXN
void *node_v;
int r;
r = cachetable_get_and_pin(brt->cf, off, &node_v, NULL, brtnode_flush_callback,
brtnode_fetch_callback, (void*)(long)brt->h->nodesize);
brtnode_fetch_callback, brt);
if (r != 0)
return r;

View file

@ -0,0 +1,144 @@
/* try a reverse compare function to verify that the database always uses the application's
compare function */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <db.h>
DBT *dbt_init(DBT *dbt, void *data, u_int32_t size) {
memset(dbt, 0, sizeof *dbt);
dbt->data = data;
dbt->size = size;
return dbt;
}
DBT *dbt_init_malloc(DBT *dbt) {
memset(dbt, 0, sizeof *dbt);
dbt->flags = DB_DBT_MALLOC;
return dbt;
}
int keycompare (const void *key1, unsigned int key1len, const void *key2, unsigned int key2len) {
if (key1len==key2len) {
return memcmp(key1,key2,key1len);
} else if (key1len<key2len) {
int r = memcmp(key1,key2,key1len);
if (r<=0) return -1; /* If the keys are the same up to 1's length, then return -1, since key1 is shorter than key2. */
else return 1;
} else {
return -keycompare(key2,key2len,key1,key1len);
}
}
int reverse_compare(DB *db __attribute__((__unused__)), const DBT *a, const DBT*b) {
return -keycompare(a->data, a->size, b->data, b->size);
}
void expect(DBC *cursor, int k, int v) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
assert(r == 0);
assert(key.size == sizeof k);
int kk;
memcpy(&kk, key.data, key.size);
assert(val.size == sizeof v);
int vv;
memcpy(&vv, val.data, val.size);
if (kk != k || vv != v) printf("expect key %d got %d - %d %d\n", htonl(k), htonl(kk), htonl(v), htonl(vv));
assert(kk == k);
assert(vv == v);
free(key.data);
free(val.data);
}
void test_reverse_compare(int n) {
printf("test_reverse_compare:%d\n", n);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = "test.reverse.compare.brt";
int r;
int i;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, 0);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->set_bt_compare(db, reverse_compare);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
/* insert n unique keys {0, 1, n-1} */
for (i=0; i<n; i++) {
DBT key, val;
int k, v;
k = htonl(i);
dbt_init(&key, &k, sizeof k);
v = i;
dbt_init(&val, &v, sizeof v);
r = db->put(db, null_txn, &key, &val, 0);
assert(r == 0);
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0);
assert(r == 0);
r = db_create(&db, null_env, 0);
assert(r == 0);
r = db->set_flags(db, 0);
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->set_bt_compare(db, reverse_compare);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666);
assert(r == 0);
/* insert n unique keys {n, n+1, 2*n-1} */
for (i=n; i<2*n; i++) {
DBT key, val;
int k, v;
k = htonl(i);
dbt_init(&key, &k, sizeof k);
v = i;
dbt_init(&val, &v, sizeof v);
r = db->put(db, null_txn, &key, &val, 0);
assert(r == 0);
}
/* verify the sort order with a cursor */
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0);
assert(r == 0);
//for (i=0; i<2*n; i++)
for (i=2*n-1; i>=0; i--)
expect(cursor, htonl(i), i);
r = cursor->c_close(cursor);
assert(r == 0);
r = db->close(db, 0);
assert(r == 0);
}
int main() {
int i;
for (i = 1; i <= (1<<16); i *= 2) {
test_reverse_compare(i);
}
return 0;
}