diff --git a/newbrt/Makefile b/newbrt/Makefile index 4fa6f22216a..0538ea5be1c 100644 --- a/newbrt/Makefile +++ b/newbrt/Makefile @@ -1,6 +1,6 @@ # GCOV_FLAGS = -fprofile-arcs -ftest-coverage #PROF_FLAGS = -pg -#OPTFLAGS = -O2 +OPTFLAGS = -O2 CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror -fPIC LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) @@ -24,7 +24,7 @@ pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h ../include ybt.o: ybt.h brttypes.h ybt-test: ybt-test.o ybt.o memory.o cachetable.o: cachetable.h -brt-test: brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o +brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o brt-test.o brt.o: brt.h cachetable.h brttypes.h brt-serialize-test.o: pma.h yerror.h brt.h memory.h hashtable.h brttypes.h brt-internal.h brt.o: brt.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h diff --git a/newbrt/brt-internal.h b/newbrt/brt-internal.h index 4bb2e80b902..e1d6d1e1c96 100644 --- a/newbrt/brt-internal.h +++ b/newbrt/brt-internal.h @@ -63,13 +63,16 @@ struct brt { struct brt_header *h; BRT_CURSOR cursors_head, cursors_tail; + + int (*compare_fun)(DB*,DBT*,DBT*); + + void *skey,*sval; /* Used for DBT return values. */ }; /* serialization code */ void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node); int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize); unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? 
*/ -unsigned int brtnode_which_child (BRTNODE node, bytevec key, ITEMLEN keylen); int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); void verify_counts(BRTNODE); @@ -77,7 +80,7 @@ void verify_counts(BRTNODE); int serialize_brt_header_to (int fd, struct brt_header *h); int deserialize_brtheader_from (int fd, diskoff off, struct brt_header **brth); -static inline int brtnode_n_hashtables(BRTNODE node) { if (node->height==0) return 1; else return node->u.n.n_children; } +//static inline int brtnode_n_hashtables(BRTNODE node) { if (node->height==0) return 1; else return node->u.n.n_children; } //int write_brt_header (int fd, struct brt_header *header); diff --git a/newbrt/brt-serialize.c b/newbrt/brt-serialize.c index 32ac48034bb..85e7501ecda 100644 --- a/newbrt/brt-serialize.c +++ b/newbrt/brt-serialize.c @@ -4,6 +4,7 @@ #include "memory.h" //#include "pma.h" #include "brt-internal.h" +#include "key.h" #include #include @@ -87,7 +88,7 @@ static unsigned int serialize_brtnode_size_slow(BRTNODE node) { for (i=0; i<node->u.n.n_children; i++) { size+=8; } - int n_hashtables = brtnode_n_hashtables(node); + int n_hashtables = node->u.n.n_children; /* one hashtable per child (a count, not the byte total), as in serialize_brtnode_to */ size+=4; /* n_entries */ for (i=0; i< n_hashtables; i++) { HASHTABLE_ITERATE(node->u.n.htables[i], @@ -118,8 +119,7 @@ unsigned int serialize_brtnode_size (BRTNODE node) { result+=4; /* n_children */ result+=4*(node->u.n.n_children-1); /* key lengths */ result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */ - result+=8*(node->u.n.n_children); /* child offsets. */ - result+=4; /* n_entries in hash table. */ + result+=(8+4)*(node->u.n.n_children); /* For each child, a child offset and a count for the number of hash table entries. */ result+=node->u.n.n_bytes_in_hashtables; } else { result+=4; /* n_entries in buffer table. 
*/ @@ -158,15 +158,10 @@ void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node) { } { - int n_entries=0; - int n_hash_tables = brtnode_n_hashtables(node); + int n_hash_tables = node->u.n.n_children; for (i=0; i< n_hash_tables; i++) { //printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i])); - n_entries += hashtable_n_entries(node->u.n.htables[i]); - } - //printf("%s:%d n_entries=%d\n", __FILE__, __LINE__, n_entries); - wbuf_int(&w, n_entries); - for (i=0; i< n_hash_tables; i++) { + wbuf_int(&w, hashtable_n_entries(node->u.n.htables[i])); HASHTABLE_ITERATE(node->u.n.htables[i], key, keylen, data, datalen, (wbuf_bytes(&w, key, keylen), wbuf_bytes(&w, data, datalen))); @@ -206,7 +201,7 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz } { uint32_t datasize_n; - int r = pread(fd, &datasize_n, sizeof(datasize_n), off); + r = pread(fd, &datasize_n, sizeof(datasize_n), off); //printf("%s:%d r=%d the datasize=%d\n", __FILE__, __LINE__, r, ntohl(datasize_n)); if (r!=sizeof(datasize_n)) { if (r==-1) r=errno; @@ -261,42 +256,43 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz result->u.n.n_bytes_in_hashtable[i] = 0; } result->u.n.n_bytes_in_hashtables = 0; - for (i=0; iu.n.n_children; i++) { int r=hashtable_create(&result->u.n.htables[i]); if (r!=0) { int j; - if (0) { died_12: j=brtnode_n_hashtables(result); } + if (0) { died_12: j=result->u.n.n_bytes_in_hashtables; } for (j=0; ju.n.htables[j]); goto died1; } } { - int n_in_hash = rbuf_int(&rc); - //printf("%d in hash\n", n_in_hash); - - for (i=0; iu.n.htables[childnum], key, keylen, val, vallen); /* Copies the data into the hash table. 
*/ - if (r!=0) { goto died_12; } + int cnum; + for (cnum=0; cnumu.n.n_children; cnum++) { + int n_in_this_hash = rbuf_int(&rc); + //printf("%d in hash\n", n_in_hash); + for (i=0; iu.n.htables[cnum], key, keylen, val, vallen); /* Copies the data into the hash table. */ + if (r!=0) { goto died_12; } + } + diff = keylen + vallen + KEY_VALUE_OVERHEAD; + result->u.n.n_bytes_in_hashtables += diff; + result->u.n.n_bytes_in_hashtable[cnum] += diff; + //printf("Inserted\n"); } - diff = keylen + vallen + KEY_VALUE_OVERHEAD; - result->u.n.n_bytes_in_hashtables += diff; - result->u.n.n_bytes_in_hashtable[childnum] += diff; - //printf("Inserted\n"); } } } else { int n_in_buf = rbuf_int(&rc); result->u.l.n_bytes_in_buffer = 0; - int r=pma_create(&result->u.l.buffer); + int r=pma_create(&result->u.l.buffer, default_compare_fun); if (r!=0) { if (0) { died_21: pma_free(&result->u.l.buffer); } goto died1; @@ -309,7 +305,8 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. 
*/ rbuf_bytes(&rc, &val, &vallen); { - int r = pma_insert(result->u.l.buffer, key, keylen, val, vallen); + DBT k,v; + int r = pma_insert(result->u.l.buffer, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), 0); if (r!=0) goto died_21; } result->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD; @@ -322,17 +319,6 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz return 0; } -unsigned int brtnode_which_child (BRTNODE node, bytevec key, ITEMLEN keylen) { - int i; - assert(node->height>0); - for (i=0; iu.n.n_children-1; i++) { - if (keycompare(key, keylen, node->u.n.childkeys[i], node->u.n.childkeylens[i])<=0) { - return i; - } - } - return node->u.n.n_children-1; -} - void verify_counts (BRTNODE node) { if (node->height==0) { assert(node->u.l.buffer); diff --git a/newbrt/brt-test.c b/newbrt/brt-test.c index 398fc03e963..e2d8c63001c 100644 --- a/newbrt/brt-test.c +++ b/newbrt/brt-test.c @@ -1,4 +1,5 @@ #include "brt.h" +#include "key.h" #include "memory.h" #include @@ -23,7 +24,7 @@ static void test0 (void) { assert(r==0); printf("%s:%d test0\n", __FILE__, __LINE__); unlink(fname); - r = open_brt(fname, 0, 1, &t, 1024, ct); + r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun); assert(r==0); printf("%s:%d test0\n", __FILE__, __LINE__); printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); @@ -39,20 +40,20 @@ static void test1 (void) { int r; CACHETABLE ct; char fname[]="testbrt.brt"; + DBT k,v; memory_check=1; memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); unlink(fname); - r = open_brt(fname, 0, 1, &t, 1024, ct); + r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun); assert(r==0); - brt_insert(t, "hello", 6, "there", 6); + brt_insert(t, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0); { - bytevec val; ITEMLEN vallen; - r = brt_lookup(t, "hello", 6, &val, &vallen); + r = brt_lookup(t, fill_dbt(&k, "hello", 6), init_dbt(&v), 0); assert(r==0); - 
assert(strcmp(val, "there")==0); - assert(vallen==6); + assert(strcmp(v.data, "there")==0); + assert(v.size==6); } r = close_brt(t); assert(r==0); r = cachetable_close(ct); assert(r==0); @@ -71,14 +72,15 @@ static void test2 (int memcheck) { memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); unlink(fname); - r = open_brt(fname, 0, 1, &t, 1024, ct); + r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun); printf("%s:%d did setup\n", __FILE__, __LINE__); assert(r==0); for (i=0; i<2048; i++) { + DBT k,v; char key[100],val[100]; snprintf(key,100,"hello%d",i); snprintf(val,100,"there%d",i); - brt_insert(t, key, 1+strlen(key), val, 1+strlen(val)); + brt_insert(t, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0); //printf("%s:%d did insert %d\n", __FILE__, __LINE__, i); if (0) { brt_flush(t); @@ -109,13 +111,14 @@ static void test3 (int nodesize, int count, int memcheck) { r = brt_create_cachetable(&ct, 0); assert(r==0); gettimeofday(&t0, 0); unlink(fname); - r = open_brt(fname, 0, 1, &t, nodesize, ct); + r = open_brt(fname, 0, 1, &t, nodesize, ct, default_compare_fun); assert(r==0); for (i=0; i=0) { char key[100], valexpected[100]; - bytevec val; - ITEMLEN vallen; + DBT k,v; if (i%1000==0) printf("r"); fflush(stdout); snprintf(key, 100, "key%d", rk); snprintf(valexpected, 100, "val%d", values[rk]); - r = brt_lookup(t, key, 1+strlen(key), &val, &vallen); + r = brt_lookup(t, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&v), 0); assert(r==0); - assert(vallen==(1+strlen(valexpected))); - assert(memcmp(val,valexpected,vallen)==0); + assert(v.size==(1+strlen(valexpected))); + assert(memcmp(v.data,valexpected,v.size)==0); } } printf("\n"); @@ -213,7 +217,7 @@ static void test_dump_empty_db (void) { r = brt_create_cachetable(&ct, 0); assert(r==0); unlink(fname); - r = open_brt(fname, 0, 1, &t, 1024, ct); + r = open_brt(fname, 0, 1, &t, 1024, ct, default_compare_fun); assert(r==0); dump_brt(t); r = close_brt(t); assert(r==0); @@ 
-233,15 +237,16 @@ static void test_multiple_files_of_size (int size) { unlink(n1); memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, 0, 1, &t0, size, ct); assert(r==0); - r = open_brt(n1, 0, 1, &t1, size, ct); assert(r==0); + r = open_brt(n0, 0, 1, &t0, size, ct, default_compare_fun); assert(r==0); + r = open_brt(n1, 0, 1, &t1, size, ct, default_compare_fun); assert(r==0); for (i=0; i<10000; i++) { char key[100],val[100]; + DBT k,v; snprintf(key, 100, "key%d", i); snprintf(val, 100, "val%d", i); - brt_insert(t0, key, 1+strlen(key), val, 1+strlen(val)); + brt_insert(t0, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0); snprintf(val, 100, "Val%d", i); - brt_insert(t1, key, 1+strlen(key), val, 1+strlen(val)); + brt_insert(t1, fill_dbt(&k, key, 1+strlen(key)), fill_dbt(&v, val, 1+strlen(val)), 0); } //verify_brt(t0); //dump_brt(t0); @@ -256,26 +261,25 @@ static void test_multiple_files_of_size (int size) { /* Now see if the data is all there. 
*/ r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, 0, 0, &t0, 1<<12, ct); + r = open_brt(n0, 0, 0, &t0, 1<<12, ct, default_compare_fun); printf("%s:%d r=%d\n", __FILE__, __LINE__,r); assert(r==0); - r = open_brt(n1, 0, 0, &t1, 1<<12, ct); assert(r==0); + r = open_brt(n1, 0, 0, &t1, 1<<12, ct, default_compare_fun); assert(r==0); for (i=0; i<10000; i++) { char key[100],val[100]; - bytevec actualval; - ITEMLEN actuallen; + DBT k,actual; snprintf(key, 100, "key%d", i); snprintf(val, 100, "val%d", i); - r=brt_lookup(t0, key, 1+strlen(key), &actualval, &actuallen); + r=brt_lookup(t0, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&actual), 0); assert(r==0); - assert(strcmp(val,actualval)==0); - assert(actuallen==1+strlen(val)); + assert(strcmp(val,actual.data)==0); + assert(actual.size==1+strlen(val)); snprintf(val, 100, "Val%d", i); - r=brt_lookup(t1, key, 1+strlen(key), &actualval, &actuallen); + r=brt_lookup(t1, fill_dbt(&k, key, 1+strlen(key)), init_dbt(&actual), 0); assert(r==0); - assert(strcmp(val,actualval)==0); - assert(actuallen==1+strlen(val)); + assert(strcmp(val,actual.data)==0); + assert(actual.size==1+strlen(val)); } r = close_brt(t0); assert(r==0); @@ -295,14 +299,17 @@ static void test_named_db (void) { CACHETABLE ct; BRT t0; int r; + DBT k,v; + printf("test_named_db\n"); unlink(n0); unlink(n1); memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, "db1", 1, &t0, 1<<12, ct); assert(r==0); + r = open_brt(n0, "db1", 1, &t0, 1<<12, ct, default_compare_fun); assert(r==0); - brt_insert(t0, "good", 5, "day", 4); assert(r==0); + + brt_insert(t0, fill_dbt(&k, "good", 5), fill_dbt(&v, "day", 4), 0); assert(r==0); r = close_brt(t0); assert(r==0); r = cachetable_close(ct); assert(r==0); @@ -310,15 +317,13 @@ static void test_named_db (void) { memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, "db1", 0, &t0, 1<<12, ct); assert(r==0); + r = open_brt(n0, "db1", 0, &t0, 
1<<12, ct, default_compare_fun); assert(r==0); { - bytevec val; - ITEMLEN vallen; - r = brt_lookup(t0, "good", 5, &val, &vallen); + r = brt_lookup(t0, fill_dbt(&k, "good", 5), init_dbt(&v), 0); assert(r==0); - assert(vallen==4); - assert(strcmp(val,"day")==0); + assert(v.size==4); + assert(strcmp(v.data,"day")==0); } r = close_brt(t0); assert(r==0); @@ -332,16 +337,17 @@ static void test_multiple_dbs (void) { CACHETABLE ct; BRT t0,t1; int r; + DBT k,v; printf("test_multiple_dbs: "); unlink(n0); unlink(n1); memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, "db1", 1, &t0, 1<<12, ct); assert(r==0); - r = open_brt(n1, "db2", 1, &t1, 1<<12, ct); assert(r==0); + r = open_brt(n0, "db1", 1, &t0, 1<<12, ct, default_compare_fun); assert(r==0); + r = open_brt(n1, "db2", 1, &t1, 1<<12, ct, default_compare_fun); assert(r==0); - brt_insert(t0, "good", 5, "grief", 6); assert(r==0); - brt_insert(t1, "bad", 4, "night", 6); assert(r==0); + brt_insert(t0, fill_dbt(&k, "good", 5), fill_dbt(&v, "grief", 6), 0); assert(r==0); + brt_insert(t1, fill_dbt(&k, "bad", 4), fill_dbt(&v, "night", 6), 0); assert(r==0); r = close_brt(t0); assert(r==0); r = close_brt(t1); assert(r==0); @@ -350,27 +356,25 @@ static void test_multiple_dbs (void) { memory_check_all_free(); r = brt_create_cachetable(&ct, 0); assert(r==0); - r = open_brt(n0, "db1", 0, &t0, 1<<12, ct); assert(r==0); - r = open_brt(n1, "db2", 0, &t1, 1<<12, ct); assert(r==0); + r = open_brt(n0, "db1", 0, &t0, 1<<12, ct, default_compare_fun); assert(r==0); + r = open_brt(n1, "db2", 0, &t1, 1<<12, ct, default_compare_fun); assert(r==0); { - bytevec val; - ITEMLEN vallen; - r = brt_lookup(t0, "good", 5, &val, &vallen); + r = brt_lookup(t0, fill_dbt(&k, "good", 5), init_dbt(&v), 0); assert(r==0); - assert(vallen==6); - assert(strcmp(val,"grief")==0); + assert(v.size==6); + assert(strcmp(v.data,"grief")==0); - r = brt_lookup(t1, "good", 5, &val, &vallen); + r = brt_lookup(t1, fill_dbt(&k, "good", 5), 
init_dbt(&v), 0); assert(r!=0); - r = brt_lookup(t0, "bad", 4, &val, &vallen); + r = brt_lookup(t0, fill_dbt(&k, "bad", 4), init_dbt(&v), 0); assert(r!=0); - r = brt_lookup(t1, "bad", 4, &val, &vallen); + r = brt_lookup(t1, fill_dbt(&k, "bad", 4), init_dbt(&v), 0); assert(r==0); - assert(vallen==6); - assert(strcmp(val,"night")==0); + assert(v.size==6); + assert(strcmp(v.data,"night")==0); } r = close_brt(t0); assert(r==0); @@ -395,14 +399,15 @@ static void test_multiple_dbs_many (void) { for (i=0; iu.n.n_bytes_in_hashtables = 0; } else { - int r = pma_create(&n->u.l.buffer); + int r = pma_create(&n->u.l.buffer, t->compare_fun); static int rcount=0; assert(r==0); //printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount); @@ -262,16 +262,16 @@ void delete_node (BRT t, BRTNODE node) { } -static void insert_to_buffer_in_leaf (BRTNODE node, bytevec key, unsigned int keylen, bytevec val, unsigned int vallen) { - unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + keylen + vallen; - int r = pma_insert(node->u.l.buffer, key, keylen, val, vallen); +static void insert_to_buffer_in_leaf (BRTNODE node, DBT *k, DBT *v, DB *db) { + unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size; + int r = pma_insert(node->u.l.buffer, k, v, db); assert(r==0); node->u.l.n_bytes_in_buffer += n_bytes_added; } -static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, bytevec key, unsigned int keylen, bytevec val, unsigned int vallen) { - unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + keylen + vallen; - int r = hash_insert(node->u.n.htables[childnum], key, keylen, val, vallen); +static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v) { + unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size; + int r = hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size); if (r!=0) return r; node->u.n.n_bytes_in_hashtable[childnum] += n_bytes_added; node->u.n.n_bytes_in_hashtables += n_bytes_added; 
@@ -279,7 +279,7 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, bytevec key, u } -int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) { +int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, DB *db) { int did_split=0; BRTNODE A,B; assert(node->height==0); @@ -296,15 +296,15 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec* assert(node->height>0 || node->u.l.buffer!=0); PMA_ITERATE(node->u.l.buffer, key, keylen, val, vallen, ({ + DBT k,v; if (!did_split) { - insert_to_buffer_in_leaf(A, key, keylen, val, vallen); + insert_to_buffer_in_leaf(A, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db); if (A->u.l.n_bytes_in_buffer *2 >= node->u.l.n_bytes_in_buffer) { - *splitkey = memdup(key, keylen); - *splitkeylen = keylen; + fill_dbt(splitk, memdup(key, keylen), keylen); did_split=1; } } else { - insert_to_buffer_in_leaf(B, key, keylen, val, vallen); + insert_to_buffer_in_leaf(B, fill_dbt(&k, key, keylen), fill_dbt(&v, val, vallen), db); } })); assert(node->height>0 || node->u.l.buffer!=0); @@ -320,7 +320,8 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec* return 0; } -void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) { +/* Side effect: sets splitk->data pointer to a malloc'd value */ +void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) { int n_children_in_a = node->u.n.n_children/2; BRTNODE A,B; assert(node->height>0); @@ -363,8 +364,8 @@ void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, byt node->u.n.childkeys[i] = 0; node->u.n.childkeylens[i] = 0; } - *splitkey = node->u.n.childkeys[n_children_in_a-1]; - *splitkeylen = node->u.n.childkeylens[n_children_in_a-1]; + splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]); + splitk->size = 
node->u.n.childkeylens[n_children_in_a-1]; node->u.n.totalchildkeylens -= node->u.n.childkeylens[n_children_in_a-1]; node->u.n.childkeys[n_children_in_a-1]=0; node->u.n.childkeylens[n_children_in_a-1]=0; @@ -470,60 +471,70 @@ void find_heaviest_data (BRTNODE node, int *childnum_ret, KVPAIR *pairs_ret, int } #endif -static int brtnode_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen, - int debug); +static int brtnode_insert (BRT t, BRTNODE node, DBT *k, DBT *v, + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, + DBT *split, + int debug, + DB *db); /* key is not in the hashtable in node. Either put the key-value pair in the child, or put it in the node. */ static int push_kvpair_down_only_if_it_wont_push_more_else_put_here (BRT t, BRTNODE node, BRTNODE child, - bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int childnum_of_node) { + DBT *k, DBT *v, + int childnum_of_node, + DB *db) { assert(node->height>0); /* Not a leaf. */ - int to_child=serialize_brtnode_size(child)+keylen+vallen+KEY_VALUE_OVERHEAD <= child->nodesize; + int to_child=serialize_brtnode_size(child)+k->size+v->size+KEY_VALUE_OVERHEAD <= child->nodesize; if (brt_debug_mode) { - printf("%s:%d pushing %s to %s %d", __FILE__, __LINE__, (char*)key, to_child? "child" : "hash", childnum_of_node); + printf("%s:%d pushing %s to %s %d", __FILE__, __LINE__, (char*)k->data, to_child? 
"child" : "hash", childnum_of_node); if (childnum_of_node+1u.n.n_children) { + DBT k2; printf(" nextsplitkey=%s\n", (char*)node->u.n.childkeys[childnum_of_node]); - assert(keycompare(key, keylen, node->u.n.childkeys[childnum_of_node], node->u.n.childkeylens[childnum_of_node])<=0); + assert(t->compare_fun(db, k, fill_dbt(&k2, node->u.n.childkeys[childnum_of_node], node->u.n.childkeylens[childnum_of_node]))<=0); } else { printf("\n"); } } if (to_child) { - int again_split=-1; BRTNODE againa,againb; bytevec againkey; ITEMLEN againlen; + int again_split=-1; BRTNODE againa,againb; + DBT againk; + init_dbt(&againk); //printf("%s:%d hello!\n", __FILE__, __LINE__); - int r = brtnode_insert(t, child, key, keylen, val, vallen, - &again_split, &againa, &againb, &againkey, &againlen, - 0); + int r = brtnode_insert(t, child, k, v, + &again_split, &againa, &againb, &againk, + 0, + db); if (r!=0) return r; assert(again_split==0); /* I only did the insert if I knew it wouldn't push down, and hence wouldn't split. 
*/ return r; } else { - int r=insert_to_hash_in_nonleaf(node, childnum_of_node, key, keylen, val, vallen); + int r=insert_to_hash_in_nonleaf(node, childnum_of_node, k, v); return r; } } static int push_a_kvpair_down (BRT t, BRTNODE node, BRTNODE child, int childnum, - bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int *child_did_split, BRTNODE *childa, BRTNODE *childb, bytevec*childsplitkey, ITEMLEN *childsplitkeylen) { + DBT *k, DBT *v, + int *child_did_split, BRTNODE *childa, BRTNODE *childb, + DBT *childsplitk, + DB *db) { //if (debug) printf("%s:%d %*sinserting down\n", __FILE__, __LINE__, debug, ""); //printf("%s:%d hello!\n", __FILE__, __LINE__); assert(node->height>0); { - int r = brtnode_insert(t, child, key, keylen, val, vallen, - child_did_split, childa, childb, childsplitkey, childsplitkeylen, - 0); + int r = brtnode_insert(t, child, k, v, + child_did_split, childa, childb, childsplitk, + 0, + db); if (r!=0) return r; } //if (debug) printf("%s:%d %*sinserted down child_did_split=%d\n", __FILE__, __LINE__, debug, "", child_did_split); { - int r = hash_delete(node->u.n.htables[childnum], key, keylen); // Must delete after doing the insert, to avoid operating on freed' key + int r = hash_delete(node->u.n.htables[childnum], k->data, k->size); // Must delete after doing the insert, to avoid operating on freed' key if (r!=0) return r; } { - int n_bytes_removed = (keylen + vallen + KEY_VALUE_OVERHEAD); + int n_bytes_removed = (k->size + v->size + KEY_VALUE_OVERHEAD); /* key bytes + value bytes + per-pair overhead: the same amount insert_to_hash_in_nonleaf added */ node->u.n.n_bytes_in_hashtables -= n_bytes_removed; node->u.n.n_bytes_in_hashtable[childnum] -= n_bytes_removed; } @@ -540,8 +551,11 @@ int split_count=0; * We also unpin the new children. 
*/ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, - BRTNODE childa, BRTNODE childb, bytevec childsplitkey, ITEMLEN childsplitkeylen, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen) { + BRTNODE childa, BRTNODE childb, + DBT *childsplitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */ + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, + DBT *splitk, + DB *db) { assert(node->height>0); HASHTABLE old_h = node->u.n.htables[childnum]; int old_count = node->u.n.n_bytes_in_hashtable[childnum]; @@ -551,7 +565,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, if (brt_debug_mode) { int i; - printf("%s:%d Child %d did split on %s\n", __FILE__, __LINE__, childnum, (char*)childsplitkey); + printf("%s:%d Child %d did split on %s\n", __FILE__, __LINE__, childnum, (char*)childsplitk->data); printf("%s:%d oldsplitkeys:", __FILE__, __LINE__); for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]); printf("\n"); @@ -574,9 +588,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1]; node->u.n.childkeylens[cnum] = node->u.n.childkeylens[cnum-1]; } - node->u.n.childkeys[childnum]=childsplitkey; - node->u.n.childkeylens[childnum]= childsplitkeylen; - node->u.n.totalchildkeylens += childsplitkeylen; + node->u.n.childkeys[childnum]= (char*)childsplitk->data; + node->u.n.childkeylens[childnum]= childsplitk->size; + node->u.n.totalchildkeylens += childsplitk->size; node->u.n.n_children++; if (brt_debug_mode) { @@ -589,10 +603,13 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, node->u.n.n_bytes_in_hashtables -= old_count; /* By default, they are all removed. We might add them back in. 
*/ /* Keep pushing to the children, but not if the children would require a pushdown */ HASHTABLE_ITERATE(old_h, skey, skeylen, sval, svallen, ({ - if (keycompare(skey, skeylen, childsplitkey, childsplitkeylen)<=0) { - r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, skey, skeylen, sval, svallen, childnum); + DBT skd, svd; + fill_dbt(&skd, skey, skeylen); skd.app_private=childsplitk->app_private; + fill_dbt(&svd, sval, svallen); + if (t->compare_fun(db, &skd, childsplitk)<=0) { + r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &skd, &svd, childnum, db); } else { - r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childb, skey, skeylen, sval, svallen, childnum+1); + r=push_kvpair_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &skd, &svd, childnum+1, db); } if (r!=0) return r; })); @@ -610,7 +627,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, if (node->u.n.n_children>TREE_FANOUT) { //printf("%s:%d about to split having pushed %d out of %d keys\n", __FILE__, __LINE__, i, n_pairs); - brt_nonleaf_split(t, node, nodea, nodeb, splitkey, splitkeylen); + brt_nonleaf_split(t, node, nodea, nodeb, splitk); //printf("%s:%d did split\n", __FILE__, __LINE__); split_count++; *did_split=1; @@ -630,8 +647,10 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, } static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec *splitkey, ITEMLEN *splitkeylen, - int debug) { + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, + DBT *splitk, + int debug, + DB *db) { void *childnode_v; BRTNODE child; int r; @@ -654,18 +673,24 @@ static int push_some_kvpairs_down (BRT t, BRTNODE node, int childnum, bytevec key,val; ITEMLEN keylen, vallen; while(0==hashtable_random_pick(node->u.n.htables[childnum], &key, &keylen, &val, &vallen)) { - int child_did_split=0; BRTNODE childa, childb; bytevec 
childsplitkey; ITEMLEN childsplitkeylen; + int child_did_split=0; BRTNODE childa, childb; + DBT hk,hv; + DBT childsplitk; + init_dbt(&childsplitk); + childsplitk.app_private = splitk->app_private; if (debug) printf("%s:%d %*spush down %s\n", __FILE__, __LINE__, debug, "", (char*)key); r = push_a_kvpair_down (t, node, child, childnum, - key, keylen, val, vallen, - &child_did_split, &childa, &childb, &childsplitkey, &childsplitkeylen); + fill_dbt(&hk, key, keylen), fill_dbt(&hv, val, vallen), + &child_did_split, &childa, &childb, + &childsplitk, + db); if (r!=0) return r; if (child_did_split) { // If the child splits, we don't push down any further. - if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitkey); + if (debug) printf("%s:%d %*shandle split splitkey=%s\n", __FILE__, __LINE__, debug, "", (char*)childsplitk.data); r=handle_split_of_child (t, node, childnum, - childa, childb, childsplitkey, childsplitkeylen, - did_split, nodea, nodeb, splitkey, splitkeylen); + childa, childb, &childsplitk, + did_split, nodea, nodeb, splitk, db); return r; /* Don't do any more pushing if the child splits. */ } } @@ -681,7 +706,7 @@ int debugp1 (int debug) { return debug ? debug+1 : 0; } -static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec *splitkey, ITEMLEN *splitkeylen, int debug) +static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, int debug, DB *db) /* If the buffer is too full, then push down. Possibly the child will split. That may make us split. 
*/ { assert(node->height>0); @@ -697,7 +722,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE find_heaviest_child(node, &childnum); if (debug) printf("%s:%d %*spush some down from %lld into %lld\n", __FILE__, __LINE__, debug, "", node->thisnodename, node->u.n.children[childnum]); assert(node->u.n.children[childnum]!=0); - int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitkey, splitkeylen, debugp1(debug)); + int r = push_some_kvpairs_down(t, node, childnum, did_split, nodea, nodeb, splitk, debugp1(debug), db); if (r!=0) return r; assert(*did_split==0 || *did_split==1); if (debug) printf("%s:%d %*sdid push_some_kvpairs_down did_split=%d\n", __FILE__, __LINE__, debug, "", *did_split); @@ -719,22 +744,22 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE return 0; } -static int brt_leaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen, - int debug) { - bytevec olddata; - ITEMLEN olddatalen; - enum pma_errors pma_status = pma_lookup(node->u.l.buffer, key, keylen, &olddata, &olddatalen); +static int brt_leaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, + int debug, + DB *db) { + DBT v2; + enum pma_errors pma_status = pma_lookup(node->u.l.buffer, k, init_dbt(&v2), db); if (pma_status==BRT_OK) { - pma_status = pma_delete(node->u.l.buffer, key, keylen); + pma_status = pma_delete(node->u.l.buffer, k, db); assert(pma_status==BRT_OK); - node->u.l.n_bytes_in_buffer -= keylen + olddatalen + KEY_VALUE_OVERHEAD; + node->u.l.n_bytes_in_buffer -= k->size + v2.size + KEY_VALUE_OVERHEAD; } - pma_status = pma_insert(node->u.l.buffer, key, keylen, val, vallen); - node->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD; + pma_status = pma_insert(node->u.l.buffer, k, v, db); + 
node->u.l.n_bytes_in_buffer += k->size + v->size + KEY_VALUE_OVERHEAD; // If it doesn't fit, then split the leaf. if (serialize_brtnode_size(node) > node->nodesize) { - int r = brtleaf_split (t, node, nodea, nodeb, splitkey, splitkeylen); + int r = brtleaf_split (t, node, nodea, nodeb, splitk, db); if (r!=0) return r; //printf("%s:%d splitkey=%s\n", __FILE__, __LINE__, (char*)*splitkey); split_count++; @@ -749,14 +774,28 @@ static int brt_leaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, by return 0; } -static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen, - int debug) { +static unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t, DB *db) { + int i; + assert(node->height>0); + for (i=0; i<node->u.n.n_children-1; i++) { + DBT k2; + if (t->compare_fun(db, k, fill_dbt(&k2, node->u.n.childkeys[i], node->u.n.childkeylens[i]))<=0) { + return i; + } + } + return node->u.n.n_children-1; +} + +static int brt_nonleaf_insert (BRT t, BRTNODE node, DBT *k, DBT *v, + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, + DBT *splitk, + int debug, + DB *db) { bytevec olddata; ITEMLEN olddatalen; - unsigned int childnum = brtnode_which_child(node, key, keylen); - int found = !hash_find(node->u.n.htables[childnum], key, keylen, &olddata, &olddatalen); + unsigned int childnum = brtnode_which_child(node, k, t, db); + int found = !hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen); if (0) { // It is faster to do this, except on yobiduck where things grind to a halt. void *child_v; @@ -765,8 +804,8 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, /* If the child is in memory, then go ahead and put it in the child. 
*/ BRTNODE child = child_v; if (found) { - int diff = keylen + olddatalen + KEY_VALUE_OVERHEAD; - int r = hash_delete(node->u.n.htables[childnum], key, keylen); + int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD; + int r = hash_delete(node->u.n.htables[childnum], k->data, k->size); assert(r==0); node->u.n.n_bytes_in_hashtables -= diff; node->u.n.n_bytes_in_hashtable[childnum] -= diff; @@ -774,15 +813,14 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, { int child_did_split; BRTNODE childa, childb; - bytevec childsplitkey; - ITEMLEN childsplitkeylen; - int r = brtnode_insert(t, child, key, keylen, val, vallen, - &child_did_split, &childa, &childb, &childsplitkey, &childsplitkeylen, 0); + DBT childsplitk; + int r = brtnode_insert(t, child, k, v, + &child_did_split, &childa, &childb, &childsplitk, 0, db); if (r!=0) return r; if (child_did_split) { r=handle_split_of_child(t, node, childnum, - childa, childb, childsplitkey, childsplitkeylen, - did_split, nodea, nodeb, splitkey, splitkeylen); + childa, childb, &childsplitk, + did_split, nodea, nodeb, splitk, db); if (r!=0) return r; } else { cachetable_unpin(t->cf, child->thisnodename, 1); @@ -796,23 +834,23 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, ""); verify_counts(node); if (found) { - int r = hash_delete(node->u.n.htables[childnum], key, keylen); - int diff = keylen + olddatalen + KEY_VALUE_OVERHEAD; + int r = hash_delete(node->u.n.htables[childnum], k->data, k->size); + int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD; assert(r==0); node->u.n.n_bytes_in_hashtables -= diff; node->u.n.n_bytes_in_hashtable[childnum] -= diff; //printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff); } { - int diff = keylen + vallen + KEY_VALUE_OVERHEAD; - int r=hash_insert(node->u.n.htables[childnum], key, keylen, val, vallen); + int diff = k->size + v->size + 
KEY_VALUE_OVERHEAD; + int r=hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size); assert(r==0); node->u.n.n_bytes_in_hashtables += diff; node->u.n.n_bytes_in_hashtable[childnum] += diff; } if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, ""); - int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitkey, splitkeylen, debugp1(debug)); + int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), db); if (r!=0) return r; if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, ""); if (*did_split) { @@ -832,17 +870,20 @@ static int brt_nonleaf_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, } -static int brtnode_insert (BRT t, BRTNODE node, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen, - int *did_split, BRTNODE *nodea, BRTNODE *nodeb, bytevec*splitkey, ITEMLEN *splitkeylen, - int debug) { +static int brtnode_insert (BRT t, BRTNODE node, DBT *k, DBT *v, + int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, + int debug, + DB *db) { if (node->height==0) { - return brt_leaf_insert(t, node, key, keylen, val, vallen, - did_split, nodea, nodeb, splitkey, splitkeylen, - debug); + return brt_leaf_insert(t, node, k, v, + did_split, nodea, nodeb, splitk, + debug, + db); } else { - return brt_nonleaf_insert(t, node, key, keylen, val, vallen, - did_split, nodea, nodeb, splitkey, splitkeylen, - debug); + return brt_nonleaf_insert(t, node, k, v, + did_split, nodea, nodeb, splitk, + debug, + db); } } @@ -889,7 +930,8 @@ static int setup_brt_root_node (BRT t, diskoff offset) { #define WHEN_BRTTRACE(x) ((void)0) #endif -int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable) { +int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, int nodesize, CACHETABLE cachetable, + int (*compare_fun)(DB*,DBT*,DBT*)) { /* If dbname is NULL then we setup to 
hold a single tree. Otherwise we setup an array. */ int r; BRT t; @@ -903,6 +945,8 @@ int open_brt (const char *fname, const char *dbname, int is_create, BRT *newbrt, if (0) { died0: toku_free(t); } return r; } + t->compare_fun = compare_fun; + t->skey = t->sval = 0; if (dbname) { malloced_name = mystrdup(dbname); if (malloced_name==0) { @@ -1003,6 +1047,8 @@ int close_brt (BRT brt) { //printf("%s:%d closing cachetable\n", __FILE__, __LINE__); if ((r = cachefile_close(brt->cf))!=0) return r; if (brt->database_name) toku_free(brt->database_name); + free(brt->skey); + free(brt->sval); toku_free(brt); return 0; } @@ -1023,12 +1069,13 @@ CACHEKEY* calculate_root_offset_pointer (BRT brt) { abort(); } -int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen) { +int brt_insert (BRT brt, DBT *k, DBT *v, DB* db) { void *node_v; BRTNODE node; CACHEKEY *rootp; int r; - int did_split; BRTNODE nodea=0, nodeb=0; bytevec splitkey; ITEMLEN splitkeylen; + int did_split; BRTNODE nodea=0, nodeb=0; + DBT splitk; int debug = brt_debug_mode;//strcmp(key,"hello387")==0; //assert(0==cachetable_assert_all_unpinned(brt->cachetable)); if ((r = read_and_pin_brt_header(brt->cf, &brt->h))) { @@ -1043,9 +1090,9 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle } node=node_v; if (debug) printf("%s:%d node inserting\n", __FILE__, __LINE__); - r = brtnode_insert(brt, node, key, keylen, val, vallen, - &did_split, &nodea, &nodeb, &splitkey, &splitkeylen, - debug); + r = brtnode_insert(brt, node, k, v, + &did_split, &nodea, &nodeb, &splitk, + debug, db); if (r!=0) return r; if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__); if (did_split) { @@ -1064,9 +1111,9 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle initialize_brtnode (brt, newroot, newroot_diskoff, nodea->height+1); newroot->u.n.n_children=2; //printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey); - 
newroot->u.n.childkeys[0] = splitkey; - newroot->u.n.childkeylens[0] = splitkeylen; - newroot->u.n.totalchildkeylens=splitkeylen; + newroot->u.n.childkeys[0] = splitk.data; + newroot->u.n.childkeylens[0] = splitk.size; + newroot->u.n.totalchildkeylens=splitk.size; newroot->u.n.children[0]=nodea->thisnodename; newroot->u.n.children[1]=nodeb->thisnodename; r=hashtable_create(&newroot->u.n.htables[0]); if (r!=0) return r; @@ -1090,12 +1137,11 @@ int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN valle // This is pretty ugly. static unsigned char lookup_result[1000000]; -int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec *val, ITEMLEN *vallen) { +int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db) { void *node_v; int r = cachetable_get_and_pin(brt->cf, off, &node_v, brtnode_flush_callback, brtnode_fetch_callback, (void*)brt->h->nodesize); - bytevec answer; - ITEMLEN answerlen; + DBT answer; BRTNODE node; int childnum; if (r!=0) { @@ -1107,28 +1153,30 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec } node=node_v; if (node->height==0) { - r = pma_lookup(node->u.l.buffer, key, keylen, &answer, &answerlen); + r = pma_lookup(node->u.l.buffer, k, &answer, db); //printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen); if (r!=0) goto died0; if (r==0) { - *val = answer; - *vallen = answerlen; + *v = answer; } r = cachetable_unpin(brt->cf, off, 0); return r; } - childnum = brtnode_which_child(node, key, keylen); + childnum = brtnode_which_child(node, k, brt, db); // Leaves have a single mdict, where the data is found. 
- if (hash_find (node->u.n.htables[childnum], key, keylen, &answer, vallen)==0) { - //printf("Found %d bytes\n", *vallen); - assert(*vallen<=(int)(sizeof(lookup_result))); - memcpy(lookup_result, answer, *vallen); - //printf("Returning %s\n", lookup_result); - *val = lookup_result; - r = cachetable_unpin(brt->cf, off, 0); - assert(r==0); - return 0; + { + bytevec hanswer; + ITEMLEN hanswerlen; + if (hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen)==0) { + //printf("Found %d bytes\n", *vallen); + assert(hanswerlen<=(int)(sizeof(lookup_result))); + ybt_set_value(v, hanswer, hanswerlen, &brt->sval); + //printf("Returning %s\n", lookup_result); + r = cachetable_unpin(brt->cf, off, 0); + assert(r==0); + return 0; + } } if (node->height==0) { r = cachetable_unpin(brt->cf, off, 0); @@ -1136,7 +1184,7 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec else return r; } { - int result = brt_lookup_node(brt, node->u.n.children[childnum], key, keylen, val, vallen); + int result = brt_lookup_node(brt, node->u.n.children[childnum], k, v, db); r = cachetable_unpin(brt->cf, off, 0); if (r!=0) return r; return result; @@ -1144,7 +1192,7 @@ int brt_lookup_node (BRT brt, diskoff off, bytevec key, ITEMLEN keylen, bytevec } -int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned int *vallen) { +int brt_lookup (BRT brt, DBT *k, DBT *v, DB *db) { int r; CACHEKEY *rootp; assert(0==cachefile_count_pinned(brt->cf, 1)); @@ -1156,7 +1204,7 @@ int brt_lookup (BRT brt, bytevec key, unsigned int keylen, bytevec*val, unsigned return r; } rootp = calculate_root_offset_pointer(brt); - if ((r = brt_lookup_node(brt, *rootp, key, keylen, val, vallen))) { + if ((r = brt_lookup_node(brt, *rootp, k, v, db))) { printf("%s:%d\n", __FILE__, __LINE__); goto died0; } diff --git a/newbrt/brt.h b/newbrt/brt.h index 78a88b71d9b..e33b667a1cf 100644 --- a/newbrt/brt.h +++ b/newbrt/brt.h @@ -9,11 +9,11 @@ #include 
"../include/ydb-constants.h" #include "cachetable.h" typedef struct brt *BRT; -int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE); +int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,DBT*,DBT*)); //int brt_create (BRT **, int nodesize, int n_nodes_in_cache); /* the nodesize and n_nodes in cache really should be separately configured. */ //int brt_open (BRT *, char *fname, char *dbname); -int brt_insert (BRT brt, bytevec key, ITEMLEN keylen, bytevec val, ITEMLEN vallen); -int brt_lookup (BRT brt, bytevec key, ITEMLEN keylen, bytevec*val, ITEMLEN *vallen); +int brt_insert (BRT brt, DBT *k, DBT *v, DB*db); +int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db); int close_brt (BRT); int dump_brt (BRT brt); void brt_fsync (BRT); /* fsync, but don't clear the caches. */ diff --git a/newbrt/key.h b/newbrt/key.h index 9624638614a..fa4186be8ff 100644 --- a/newbrt/key.h +++ b/newbrt/key.h @@ -1,3 +1,4 @@ +#include "ybt.h" #include "brttypes.h" int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); diff --git a/newbrt/pma-test.c b/newbrt/pma-test.c index 97b8aca09b8..e62f188eabe 100644 --- a/newbrt/pma-test.c +++ b/newbrt/pma-test.c @@ -268,7 +268,7 @@ static void test_find_insert (void) { r=pma_insert(pma, fill_dbt(&k, "aaa", 3), fill_dbt(&v, "aaadata", 7), 0); assert(r==BRT_OK); - ybt_init(&v); + init_dbt(&v); r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0); assert(r==BRT_OK); assert(v.size==7); @@ -278,12 +278,12 @@ static void test_find_insert (void) { r=pma_insert(pma, fill_dbt(&k, "bbb", 4), fill_dbt(&v, "bbbdata", 8), 0); assert(r==BRT_OK); - ybt_init(&v); + init_dbt(&v); r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0); assert(r==BRT_OK); assert(keycompare(v.data,v.size,"aaadata", 7)==0); - ybt_init(&v); + init_dbt(&v); r=pma_lookup(pma, fill_dbt(&k, "bbb", 4), &v, 0); assert(r==BRT_OK); assert(keycompare(v.data,v.size,"bbbdata", 8)==0); @@ 
-415,8 +415,8 @@ void test_pma_cursor_2 (void) { PMA_CURSOR c=0; int r; DBT key,val; - ybt_init(&key); key.flags=DB_DBT_REALLOC; - ybt_init(&val); val.flags=DB_DBT_REALLOC; + init_dbt(&key); key.flags=DB_DBT_REALLOC; + init_dbt(&val); val.flags=DB_DBT_REALLOC; r=pma_create(&pma, default_compare_fun); assert(r==0); r=pma_cursor(pma, &c); assert(r==0); assert(c!=0); r=pma_cursor_set_position_last(c); assert(r==DB_NOTFOUND); @@ -434,8 +434,8 @@ void test_pma_cursor_3 (void) { r=pma_insert(pma, fill_dbt(&k, "x", 2), fill_dbt(&v, "xx", 3), 0); assert(r==BRT_OK); r=pma_insert(pma, fill_dbt(&k, "m", 2), fill_dbt(&v, "mm", 3), 0); assert(r==BRT_OK); r=pma_insert(pma, fill_dbt(&k, "aa", 3), fill_dbt(&v,"a", 2), 0); assert(r==BRT_OK); - ybt_init(&key); key.flags=DB_DBT_REALLOC; - ybt_init(&val); val.flags=DB_DBT_REALLOC; + init_dbt(&key); key.flags=DB_DBT_REALLOC; + init_dbt(&val); val.flags=DB_DBT_REALLOC; r=pma_cursor(pma, &c); assert(r==0); assert(c!=0); r=pma_cursor_set_position_first(c); assert(r==0); @@ -508,8 +508,8 @@ void test_pma_compare_fun (int wrong_endian_p) { r = pma_insert(pma, fill_dbt(&k, "00", 3), fill_dbt(&v, "00v", 4), 0); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "01", 3), fill_dbt(&v, "01v", 4), 0); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "11", 3), fill_dbt(&v, "11v", 4), 0); assert(r==BRT_OK); - ybt_init(&key); key.flags=DB_DBT_REALLOC; - ybt_init(&val); val.flags=DB_DBT_REALLOC; + init_dbt(&key); key.flags=DB_DBT_REALLOC; + init_dbt(&val); val.flags=DB_DBT_REALLOC; r=pma_cursor(pma, &c); assert(r==0); assert(c!=0); for (i=0; i<4; i++) { diff --git a/newbrt/ybt-test.c b/newbrt/ybt-test.c index edb9313f026..8fa39c5a247 100644 --- a/newbrt/ybt-test.c +++ b/newbrt/ybt-test.c @@ -8,8 +8,8 @@ void ybt_test0 (void) { void *v0=0,*v1=0; DBT t0,t1; - ybt_init(&t0); - ybt_init(&t1); + init_dbt(&t0); + init_dbt(&t1); ybt_set_value(&t0, "hello", 6, &v0); ybt_set_value(&t1, "foo", 4, &v1); assert(t0.size==6); @@ -25,7 +25,7 @@ void ybt_test0 
(void) { memory_check_all_free(); /* See if we can probe to find out how big something is by setting ulen=0 with YBT_USERMEM */ - ybt_init(&t0); + init_dbt(&t0); t0.flags = DB_DBT_USERMEM; t0.ulen = 0; ybt_set_value(&t0, "hello", 6, 0); @@ -33,7 +33,7 @@ void ybt_test0 (void) { assert(t0.size==6); /* Check realloc. */ - ybt_init(&t0); + init_dbt(&t0); t0.flags = DB_DBT_REALLOC; v0 = 0; ybt_set_value(&t0, "internationalization", 21, &v0); diff --git a/newbrt/ybt.c b/newbrt/ybt.c index 8f446c2dd68..e4d94fb473c 100644 --- a/newbrt/ybt.c +++ b/newbrt/ybt.c @@ -4,13 +4,13 @@ #include #include -int ybt_init (DBT *ybt) { +DBT *init_dbt (DBT *ybt) { memset(ybt, 0, sizeof(*ybt)); - return 0; + return ybt; } DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) { - ybt_init(dbt); + init_dbt(dbt); dbt->size=len; dbt->data=(char*)k; return dbt; diff --git a/newbrt/ybt.h b/newbrt/ybt.h index 13a69c5a35c..b30d50b9512 100644 --- a/newbrt/ybt.h +++ b/newbrt/ybt.h @@ -6,7 +6,7 @@ #include "../include/db.h" -int ybt_init (DBT *); +DBT* init_dbt (DBT *); DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len); int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);