diff --git a/newbrt/brt-internal.h b/newbrt/brt-internal.h
index 59426991c5a..de7e06abb2b 100644
--- a/newbrt/brt-internal.h
+++ b/newbrt/brt-internal.h
@@ -507,6 +507,12 @@ struct brt {
 };
 
 /* serialization code */
+void
+toku_create_compressed_partition_from_available(
+    BRTNODE node,
+    int childnum,
+    SUB_BLOCK sb
+    );
 int toku_serialize_brtnode_to_memory (BRTNODE node,
                                       unsigned int basementnodesize,
                              /*out*/ size_t *n_bytes_to_write,
@@ -559,6 +565,7 @@ struct brtenv {
 
 extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
 extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs);
+extern void toku_brtnode_pe_est_callback(void* brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
extern int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void *extraargs);
 extern BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs);
 int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, long* sizep);
diff --git a/newbrt/brt-serialize.c b/newbrt/brt-serialize.c
index 5341df5d860..16eb9209d09 100644
--- a/newbrt/brt-serialize.c
+++ b/newbrt/brt-serialize.c
@@ -615,6 +615,44 @@ rebalance_brtnode_leaf(BRTNODE node, unsigned int basementnodesize)
 static void
 serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]);
 
+static void
+serialize_and_compress_partition(BRTNODE node, int childnum, SUB_BLOCK sb)
+{
+    serialize_brtnode_partition(node, childnum, sb);
+    compress_brtnode_sub_block(sb);
+}
+
+void
+toku_create_compressed_partition_from_available(
+    BRTNODE node,
+    int childnum,
+    SUB_BLOCK sb
+    )
+{
+    serialize_and_compress_partition(node, childnum, sb);
+    //
+    // now we have an sb that would be ready to be written out,
+    // but we are not writing it out; we are storing it in cache for a
+    // potentially long time, so we need to do some cleanup.
+    //
+    // The buffer created above contains metadata in the first 8 bytes and is
+    // overallocated: it was sized for a bound on the compressed length
+    // (evaluated before compression) rather than for the amount of actual
+    // compressed data.  So, we create a new buffer and copy just the
+    // compressed data.
+    //
+    u_int32_t compressed_size = toku_dtoh32(*(u_int32_t *)sb->compressed_ptr);
+    void* compressed_data = toku_xmalloc(compressed_size);
+    memcpy(compressed_data, (char *)sb->compressed_ptr + 8, compressed_size);
+    toku_free(sb->compressed_ptr);
+    sb->compressed_ptr = compressed_data;
+    sb->compressed_size = compressed_size;
+    if (sb->uncompressed_ptr) {
+        toku_free(sb->uncompressed_ptr);
+        sb->uncompressed_ptr = NULL;
+    }
+
+}
+
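A note on the layout the function above relies on: the serialized sub-block carries an 8-byte header in front of the payload, and the first 4 bytes hold the compressed length (read through a disk-to-host byte-order conversion, toku_dtoh32). The trimming step turns a worst-case-sized buffer into an exact-sized one. A minimal standalone sketch of the same pattern, assuming that header layout and substituting plain malloc/free for the toku_* allocators:

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    // Trim an overallocated compressed buffer down to its exact size.
    // Assumes bytes 0..3 hold the compressed length (already in host order)
    // and the compressed payload starts at byte 8.
    static void *trim_compressed_buffer(void *overallocated, uint32_t *size_out) {
        uint32_t compressed_size;
        memcpy(&compressed_size, overallocated, sizeof compressed_size);
        void *trimmed = malloc(compressed_size);
        memcpy(trimmed, (char *)overallocated + 8, compressed_size);
        free(overallocated);              // drop the worst-case-sized buffer
        *size_out = compressed_size;
        return trimmed;
    }
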
 // tests are showing that serial insertions are slightly faster
 // using the pthreads than using CILK. Disabling CILK until we have
@@ -626,8 +664,7 @@ static void
 serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
 #pragma cilk grainsize = 1
     cilk_for (int i = 0; i < npartitions; i++) {
-        serialize_brtnode_partition(node, i, &sb[i]);
-        compress_brtnode_sub_block(&sb[i]);
+        serialize_and_compress_partition(node, i, &sb[i]);
     }
 }
 
@@ -648,8 +685,7 @@ serialize_and_compress_worker(void *arg) {
         if (w == NULL)
             break;
         int i = w->i;
-        serialize_brtnode_partition(w->node, i, &w->sb[i]);
-        compress_brtnode_sub_block(&w->sb[i]);
+        serialize_and_compress_partition(w->node, i, &w->sb[i]);
     }
     workset_release_ref(ws);
     return arg;
@@ -658,8 +694,7 @@ serialize_and_compress_worker(void *arg) {
 static void
 serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
     if (npartitions == 1) {
-        serialize_brtnode_partition(node, 0, &sb[0]);
-        compress_brtnode_sub_block(&sb[0]);
+        serialize_and_compress_partition(node, 0, &sb[0]);
     } else {
         int T = num_cores;
         if (T > npartitions)
diff --git a/newbrt/brt-test-helpers.c b/newbrt/brt-test-helpers.c
index 17587a31854..37e1f5e6b71 100644
--- a/newbrt/brt-test-helpers.c
+++ b/newbrt/brt-test-helpers.c
@@ -86,6 +86,7 @@ int toku_testsetup_get_sersize(BRT brt, BLOCKNUM diskoff) // Return the size on
                                   NULL,
                                   toku_brtnode_flush_callback,
                                   toku_brtnode_fetch_callback,
+                                  toku_brtnode_pe_est_callback,
                                   toku_brtnode_pe_callback,
                                   toku_brtnode_pf_req_callback,
                                   toku_brtnode_pf_callback,
@@ -114,6 +115,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
                                   NULL,
                                   toku_brtnode_flush_callback,
                                   toku_brtnode_fetch_callback,
+                                  toku_brtnode_pe_est_callback,
                                   toku_brtnode_pe_callback,
                                   toku_brtnode_pf_req_callback,
                                   toku_brtnode_pf_callback,
@@ -185,6 +187,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
                                   NULL,
                                   toku_brtnode_flush_callback,
                                   toku_brtnode_fetch_callback,
+                                  toku_brtnode_pe_est_callback,
                                   toku_brtnode_pe_callback,
                                   toku_brtnode_pf_req_callback,
                                   toku_brtnode_pf_callback,
diff --git a/newbrt/brt-verify.c b/newbrt/brt-verify.c
index 4d35aab1e37..a7e5c6c22f9 100644
--- a/newbrt/brt-verify.c
+++ b/newbrt/brt-verify.c
@@ -123,6 +123,7 @@ toku_verify_brtnode (BRT brt,
                                 NULL,
                                 toku_brtnode_flush_callback,
                                 toku_brtnode_fetch_callback,
+                                toku_brtnode_pe_est_callback,
                                 toku_brtnode_pe_callback,
                                 toku_brtnode_pf_req_callback,
                                 toku_brtnode_pf_callback,
diff --git a/newbrt/brt.c b/newbrt/brt.c
index 7ac6bfe4818..125df0f1ec0 100644
--- a/newbrt/brt.c
+++ b/newbrt/brt.c
@@ -284,6 +284,7 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
                                        NULL,
                                        toku_brtnode_flush_callback,
                                        toku_brtnode_fetch_callback,
+                                       toku_brtnode_pe_est_callback,
                                        toku_brtnode_pe_callback,
                                        toku_brtnode_pf_req_callback,
                                        toku_brtnode_pf_callback,
@@ -316,6 +317,7 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
                              NULL,
                              toku_brtnode_flush_callback,
                              toku_brtnode_fetch_callback,
+                             toku_brtnode_pe_est_callback,
                              toku_brtnode_pe_callback,
                              toku_brtnode_pf_req_callback,
                              toku_brtnode_pf_callback,
@@ -491,6 +493,19 @@ fetch_from_buf (OMT omt, u_int32_t idx) {
     return (LEAFENTRY)v;
 }
 
+static long
+get_avail_internal_node_partition_size(BRTNODE node, int i)
+{
+    long retval = 0;
+    assert(node->height > 0);
+    NONLEAF_CHILDINFO childinfo = BNC(node, i);
+    retval += sizeof(*childinfo);
+    retval += toku_fifo_memory_size(BNC_BUFFER(node, i));
+    retval += toku_omt_memory_size(BNC_BROADCAST_BUFFER(node, i));
+    retval += toku_omt_memory_size(BNC_MESSAGE_TREE(node, i));
+    return retval;
+}
+
 static long
 brtnode_memory_size (BRTNODE node)
 // Effect: Estimate how much main memory a node requires.
@@ -513,11 +528,7 @@ brtnode_memory_size (BRTNODE node)
         }
         else if (BP_STATE(node,i) == PT_AVAIL) {
             if (node->height > 0) {
-                NONLEAF_CHILDINFO childinfo = BNC(node, i);
-                retval += sizeof(*childinfo);
-                retval += toku_fifo_memory_size(BNC_BUFFER(node, i));
-                retval += toku_omt_memory_size(BNC_BROADCAST_BUFFER(node, i));
-                retval += toku_omt_memory_size(BNC_MESSAGE_TREE(node, i));
+                retval += get_avail_internal_node_partition_size(node, i);
             }
             else {
                 BASEMENTNODE bn = BLB(node, i);
@@ -659,17 +670,96 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
     return r;
 }
 
+
+void toku_brtnode_pe_est_callback(
+    void* brtnode_pv,
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    assert(brtnode_pv != NULL);
+    long bytes_to_free = 0;
+    BRTNODE node = (BRTNODE)brtnode_pv;
+    if (node->dirty || node->height == 0) {
+        *bytes_freed_estimate = 0;
+        *cost = PE_CHEAP;
+        goto exit;
+    }
+
+    //
+    // we are dealing with a clean internal node
+    //
+    *cost = PE_EXPENSIVE;
+    // now let's get an estimate for how much data we can free up.
+    // we estimate the compressed size of the data to be how large
+    // the compressed data is on disk
+    for (int i = 0; i < node->n_children; i++) {
+        if (BP_SHOULD_EVICT(node,i)) {
+            // calculate how much data would be freed if
+            // we compress this partition, and add it to
+            // bytes_to_free
+
+            // first get an estimate for how much space will be taken
+            // after compression: it is simply the size of the compressed
+            // data on disk plus the size of the struct that holds it
+            u_int32_t compressed_data_size =
+                ((i==0) ?
+                    BP_OFFSET(node,i) :
+                    (BP_OFFSET(node,i) - BP_OFFSET(node,i-1)));
+            compressed_data_size += sizeof(struct sub_block);
+
+            // now get the space taken now
+            u_int32_t decompressed_data_size = get_avail_internal_node_partition_size(node,i);
+            bytes_to_free += (decompressed_data_size - compressed_data_size);
+        }
+    }
+
+    *bytes_freed_estimate = bytes_to_free;
+exit:
+    return;
+}
+
+
 // callback for partially evicting a node
-int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_freed, void* UU(extraargs)) {
+int toku_brtnode_pe_callback (void *brtnode_pv, long UU(bytes_to_free), long* bytes_freed, void* UU(extraargs)) {
     BRTNODE node = (BRTNODE)brtnode_pv;
     long orig_size = brtnode_memory_size(node);
-    assert(bytes_to_free > 0);
 
     //
     // nothing on internal nodes for now
     //
-    if (node->dirty || node->height > 0) {
+    if (node->dirty) {
         *bytes_freed = 0;
+        goto exit;
+    }
+    //
+    // partial eviction for internal nodes
+    //
+    if (node->height > 0) {
+        for (int i = 0; i < node->n_children; i++) {
+            if (BP_STATE(node,i) == PT_AVAIL) {
+                if (BP_SHOULD_EVICT(node,i)) {
+                    // if we should evict, compress the
+                    // message buffer into a sub_block
+                    SUB_BLOCK sb = NULL;
+                    sb = toku_xmalloc(sizeof(struct sub_block));
+                    sub_block_init(sb);
+                    toku_create_compressed_partition_from_available(node, i, sb);
+
+                    // now free the old partition and replace it with this
+                    destroy_nonleaf_childinfo(BNC(node,i));
+                    set_BSB(node, i, sb);
+                    BP_STATE(node,i) = PT_COMPRESSED;
+                }
+                else {
+                    BP_SWEEP_CLOCK(node,i);
+                }
+            }
+            else {
+                continue;
+            }
+        }
     }
     //
     // partial eviction strategy for basement nodes:
@@ -683,7 +773,7 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
                 SUB_BLOCK sb = BSB(node, i);
                 toku_free(sb->compressed_ptr);
                 toku_free(sb);
-                set_BNULL(node, i);
+                set_BNULL(node, i);
                 BP_STATE(node,i) = PT_ON_DISK;
             }
             else if (BP_STATE(node,i) == PT_AVAIL) {
@@ -693,7 +783,7 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
                 OMT curr_omt = BLB_BUFFER(node, i);
                 toku_omt_free_items(curr_omt);
                 destroy_basement_node(bn);
-                set_BNULL(node,i);
+                set_BNULL(node,i);
                 BP_STATE(node,i) = PT_ON_DISK;
             }
             else {
@@ -709,11 +799,14 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
         }
     }
     *bytes_freed = orig_size - brtnode_memory_size(node);
-    invariant(*bytes_freed >= 0);
-    if (node->height == 0)
-        brt_status.bytes_leaf -= *bytes_freed;
-    else
-        brt_status.bytes_nonleaf -= *bytes_freed;
+
+exit:
+    if (node->height == 0) {
+        brt_status.bytes_leaf -= *bytes_freed;
+    }
+    else {
+        brt_status.bytes_nonleaf -= *bytes_freed;
+    }
     return 0;
 }
 
@@ -1085,7 +1178,7 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
     u_int32_t fullhash = toku_cachetable_hash(brt->cf, newroot_diskoff);
     newroot->fullhash = fullhash;
     toku_cachetable_put(brt->cf, newroot_diskoff, fullhash, newroot, brtnode_memory_size(newroot),
-                        toku_brtnode_flush_callback, toku_brtnode_pe_callback, brt->h);
+                        toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, brt->h);
 
     *newrootp = newroot;
 }
@@ -1107,7 +1200,7 @@ toku_create_new_brtnode (BRT t, BRTNODE *result, int height, int n_children) {
     n->fullhash = fullhash;
     int r = toku_cachetable_put(t->cf, n->thisnodename, fullhash,
                                 n, brtnode_memory_size(n),
-                                toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
+                                toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, t->h);
     assert_zero(r);
 
     *result = n;
@@ -3514,7 +3607,7 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum) {
     node->fullhash = fullhash;
     int r = toku_cachetable_put(t->cf, blocknum, fullhash,
                                 node, brtnode_memory_size(node),
-                                toku_brtnode_flush_callback, toku_brtnode_pe_callback, t->h);
+                                toku_brtnode_flush_callback, toku_brtnode_pe_est_callback, toku_brtnode_pe_callback, t->h);
     if (r != 0)
         toku_free(node);
     else
@@ -5493,6 +5586,7 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
                                        nextfullhash,
                                        toku_brtnode_flush_callback,
                                        brtnode_fetch_callback_and_free_bfe,
+                                       toku_brtnode_pe_est_callback,
                                        toku_brtnode_pe_callback,
                                        toku_brtnode_pf_req_callback,
                                        brtnode_pf_callback_and_free_bfe,
@@ -6358,6 +6452,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
                                 NULL,
                                 toku_brtnode_flush_callback,
                                 toku_brtnode_fetch_callback,
+                                toku_brtnode_pe_est_callback,
                                 toku_brtnode_pe_callback,
                                 toku_brtnode_pf_req_callback,
                                 toku_brtnode_pf_callback,
@@ -6675,6 +6770,7 @@ static BOOL is_empty_fast_iter (BRT brt, BRTNODE node) {
                                 NULL,
                                 toku_brtnode_flush_callback,
                                 toku_brtnode_fetch_callback,
+                                toku_brtnode_pe_est_callback,
                                 toku_brtnode_pe_callback,
                                 toku_brtnode_pf_req_callback,
                                 toku_brtnode_pf_callback,
@@ -6720,6 +6816,7 @@ BOOL toku_brt_is_empty_fast (BRT brt)
                             NULL,
                             toku_brtnode_flush_callback,
                             toku_brtnode_fetch_callback,
+                            toku_brtnode_pe_est_callback,
                            toku_brtnode_pe_callback,
                             toku_brtnode_pf_req_callback,
                             toku_brtnode_pf_callback,
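To make the estimate in toku_brtnode_pe_est_callback concrete, here is a standalone sketch of the same arithmetic with made-up numbers (the offsets, in-memory sizes, and the 64-byte stand-in for sizeof(struct sub_block) are all hypothetical): the compressed size of partition i is the delta between consecutive on-disk offsets plus the size of the struct that will hold the compressed block, and the expected saving is the in-memory size minus that.

    #include <stdio.h>
    #include <stdint.h>

    int main(void) {
        // cumulative compressed offsets on disk, one per partition
        uint32_t bp_offset[3] = {4096, 9216, 12288};
        // current decompressed in-memory sizes of the same partitions
        long in_memory[3] = {65536, 131072, 98304};
        long bytes_to_free = 0;
        for (int i = 0; i < 3; i++) {
            uint32_t compressed = (i == 0) ? bp_offset[0]
                                           : bp_offset[i] - bp_offset[i-1];
            compressed += 64;  // stand-in for sizeof(struct sub_block)
            bytes_to_free += in_memory[i] - compressed;
        }
        printf("estimated bytes freed: %ld\n", bytes_to_free); // prints 282432
        return 0;
    }
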
diff --git a/newbrt/cachetable.c b/newbrt/cachetable.c
index 904408ae668..6b3b6e9c924 100644
--- a/newbrt/cachetable.c
+++ b/newbrt/cachetable.c
@@ -77,12 +77,14 @@ struct ctpair {
     enum cachetable_dirty dirty;
 
     char     verify_flag;        // Used in verify_cachetable()
+    BOOL     remove_me;          // write_pair
 
     u_int32_t fullhash;
 
     CACHETABLE_FLUSH_CALLBACK flush_callback;
-    CACHETABLE_FETCH_CALLBACK fetch_callback;
-    CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback;
+    CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
+    long size_evicting_estimate;
     void    *write_extraargs;
 
     PAIR     next,prev;          // In clock.
@@ -137,6 +139,7 @@ struct cachetable {
     int64_t size_reserved;           // How much memory is reserved (e.g., by the loader)
     int64_t size_current;            // the sum of the sizes of the pairs in the cachetable
     int64_t size_limit;              // the limit to the sum of the pair sizes
+    int64_t size_evicting;           // the sum of the sizes of the pairs being written
     int64_t size_max;                // high water mark of size_current (max value size_current ever had)
     TOKULOGGER logger;
     toku_pthread_mutex_t *mutex;     // coarse lock that protects the cachetable, the cachefiles, and the pairs
@@ -181,6 +184,16 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
     int r = toku_pthread_mutex_unlock(ct->mutex); resource_assert_zero(r);
 }
 
+// Wait for cachetable space to become available
+// size_current is the number of bytes currently occupied by data (referred to by pairs)
+// size_evicting is the number of bytes queued up to be written out (sum of sizes of pairs in CTPAIR_WRITING state)
+static inline void cachetable_wait_write(CACHETABLE ct) {
+    // if we're writing more than half the data in the cachetable
+    while (2*ct->size_evicting > ct->size_current) {
+        workqueue_wait_write(&ct->wq, 0);
+    }
+}
+
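The 2x stall threshold here pairs with the 1/8 wakeup threshold in cachetable_write_pair below to form a hysteresis band: client threads stall once the bytes queued for eviction exceed half of size_current, and writers only wake them once the backlog drains below one eighth of it, so a woken client is unlikely to stall again immediately. A condition-variable sketch of the same shape (an illustration only, not the workqueue code the cachetable actually uses):

    #include <pthread.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
    static long size_current, size_evicting;

    // client thread: stall while more than half the cached bytes are queued for eviction
    static void wait_write(void) {
        pthread_mutex_lock(&m);
        while (2 * size_evicting > size_current)
            pthread_cond_wait(&c, &m);
        pthread_mutex_unlock(&m);
    }

    // writer thread: after finishing one eviction, wake stalled clients
    // only once the backlog has drained below one eighth of the total
    static void eviction_done(long written) {
        pthread_mutex_lock(&m);
        size_evicting -= written;
        if (8 * size_evicting <= size_current)
            pthread_cond_broadcast(&c);
        pthread_mutex_unlock(&m);
    }
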
 enum cachefile_checkpoint_state {
     CS_INVALID = 0,
     CS_NOT_IN_PROGRESS,
@@ -282,6 +295,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
 
 u_int64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction) {
     cachetable_lock(ct);
+    cachetable_wait_write(ct);
     uint64_t reserved_memory = fraction*(ct->size_limit-ct->size_reserved);
     ct->size_reserved += reserved_memory;
     {
@@ -1188,6 +1202,16 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
     assert(!p->checkpoint_pending);
     rwlock_read_unlock(&ct->pending_lock);
 
+    // maybe wakeup any stalled writers when the pending writes fall below
+    // 1/8 of the size of the cachetable
+    if (remove_me) {
+        ct->size_evicting -= p->size;
+        assert(ct->size_evicting >= 0);
+        if (8*ct->size_evicting <= ct->size_current) {
+            workqueue_wakeup_write(&ct->wq, 0);
+        }
+    }
+
     // stuff it into a completion queue for delayed completion if a completion queue exists
     // otherwise complete the write now
     if (p->cq)
@@ -1203,7 +1227,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p, BOOL remove_me) {
 static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove) {
     p->cq = 0;
     p->state = CTPAIR_IDLE;
-    
+
     rwlock_write_unlock(&p->rwlock);
     if (do_remove)
         cachetable_maybe_remove_and_free_pair(ct, p);
@@ -1219,13 +1243,14 @@ static void flush_dirty_pair(CACHETABLE ct, PAIR p) {
     if (!rwlock_users(&p->rwlock)) {
         rwlock_write_lock(&p->rwlock, ct->mutex);
         p->state = CTPAIR_WRITING;
+        p->remove_me = FALSE;
         WORKITEM wi = &p->asyncwork;
         workitem_init(wi, cachetable_writer, p);
         workqueue_enq(&ct->wq, wi, 0);
     }
 }
 
-static void try_evict_pair(CACHETABLE ct, PAIR p, BOOL* is_attainable) {
+static void try_evict_pair(CACHETABLE ct, PAIR p) {
     // evictions without a write or unpinned pair's that are clean
     // can be run in the current thread
 
@@ -1234,11 +1259,20 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
     if (!rwlock_users(&p->rwlock)) {
         rwlock_write_lock(&p->rwlock, ct->mutex);
         p->state = CTPAIR_WRITING;
-        cachetable_write_pair(ct, p, TRUE);
-        *is_attainable = TRUE;
-    }
-    else {
-        *is_attainable = FALSE;
+
+        assert(ct->size_evicting >= 0);
+        ct->size_evicting += p->size;
+        assert(ct->size_evicting >= 0);
+
+        if (!p->dirty) {
+            cachetable_write_pair(ct, p, TRUE);
+        }
+        else {
+            p->remove_me = TRUE;
+            WORKITEM wi = &p->asyncwork;
+            workitem_init(wi, cachetable_writer, p);
+            workqueue_enq(&ct->wq, wi, 0);
+        }
     }
 }
 
@@ -1252,10 +1286,20 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p) {
     // this needs to be done here regardless of whether the eviction occurs on the main thread or on
     // a writer thread, because there may be a completion queue that needs access to this information
     WORKITEM wi = &p->asyncwork;
+    // we are not going to remove if we are posting a dirty node
+    // to the writer thread.
+    // In that case, we will let the caller decide if they want to remove it.
+    // We may be able to just let the writer thread evict the node,
+    // but I (Zardosht) do not understand the caller well enough
+    // so I am hesitant to change it.
+    p->remove_me = FALSE;
     workitem_init(wi, cachetable_writer, p);
     // evictions without a write or unpinned pair's that are clean
     // can be run in the current thread
     if (!rwlock_readers(&p->rwlock) && !p->dirty) {
+        assert(ct->size_evicting >= 0);
+        ct->size_evicting += p->size;
+        assert(ct->size_evicting >= 0);
         cachetable_write_pair(ct, p, TRUE);
     }
     else {
@@ -1263,47 +1307,119 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p) {
     }
 }
 
+
+static void do_partial_eviction(CACHETABLE ct, PAIR p) {
+    // This state really should be something else, but we need to set it to
+    // something other than CTPAIR_IDLE so that other threads know not to
+    // hold the ydb lock while waiting on this node
+    p->state = CTPAIR_WRITING;
+    long bytes_freed;
+
+    cachetable_unlock(ct);
+    p->pe_callback(p->value, 0, &bytes_freed, p->write_extraargs);
+    cachetable_lock(ct);
+
+    assert(bytes_freed <= ct->size_current);
+    assert(bytes_freed <= p->size);
+    ct->size_current -= bytes_freed;
+    p->size -= bytes_freed;
+
+    assert(ct->size_evicting >= p->size_evicting_estimate);
+    ct->size_evicting -= p->size_evicting_estimate;
+
+    p->state = CTPAIR_IDLE;
+    if (p->cq) {
+        workitem_init(&p->asyncwork, NULL, p);
+        workqueue_enq(p->cq, &p->asyncwork, 1);
+    }
+    else {
+        rwlock_write_unlock(&p->rwlock);
+    }
+}
+
+static void cachetable_partial_eviction(WORKITEM wi) {
+    PAIR p = workitem_arg(wi);
+    CACHETABLE ct = p->cachefile->cachetable;
+    cachetable_lock(ct);
+    do_partial_eviction(ct,p);
+    cachetable_unlock(ct);
+}
+
 static int maybe_flush_some (CACHETABLE ct, long size) {
     int r = 0;
+
     //
-    // this is the data that cannot be accessed right now, because it has a user
-    // If all the data is unattainable, we get into an infinite loop, so we count it with
-    // the limit
+    // These variables will help us detect if everything in the clock is currently being accessed.
+    // We must detect this case, otherwise we will end up in an infinite loop below.
     //
-    u_int32_t unattainable_data = 0;
+    CACHEKEY curr_cachekey;
+    FILENUM curr_filenum;
+    BOOL set_val = FALSE;
 
-    while ((ct->head) && (size + ct->size_current > ct->size_limit + unattainable_data)) {
+    while ((ct->head) && (size + ct->size_current > ct->size_limit + ct->size_evicting)) {
         PAIR curr_in_clock = ct->head;
-        // if we come across a dirty pair, and its count is low, write it out with hope that next
-        // time clock comes around, we can just evict it
-        if ((curr_in_clock->count <= CLOCK_WRITE_OUT) && curr_in_clock->dirty) {
-            flush_dirty_pair(ct, curr_in_clock);
-        }
-        if (curr_in_clock->count > 0) {
-            if (curr_in_clock->state == CTPAIR_IDLE && !rwlock_users(&curr_in_clock->rwlock)) {
-                curr_in_clock->count--;
-                // call the partial eviction callback
-                rwlock_write_lock(&curr_in_clock->rwlock, ct->mutex);
-                long size_remaining = (size + ct->size_current) - (ct->size_limit + unattainable_data);
-                void *value = curr_in_clock->value;
-                void *write_extraargs = curr_in_clock->write_extraargs;
-                long bytes_freed;
-                curr_in_clock->pe_callback(value, size_remaining, &bytes_freed, write_extraargs);
-                assert(bytes_freed <= ct->size_current);
-                assert(bytes_freed <= curr_in_clock->size);
-                ct->size_current -= bytes_freed;
-                curr_in_clock->size -= bytes_freed;
-
-                rwlock_write_unlock(&curr_in_clock->rwlock);
+        if (rwlock_users(&curr_in_clock->rwlock)) {
+            if (set_val &&
+                curr_in_clock->key.b == curr_cachekey.b &&
+                curr_in_clock->cachefile->filenum.fileid == curr_filenum.fileid)
+            {
+                // we have identified a cycle where everything in the clock is in use
+                // do not return an error
+                // just let memory be overfull
+                r = 0;
+                goto exit;
             }
             else {
-                unattainable_data += curr_in_clock->size;
+                if (!set_val) {
+                    set_val = TRUE;
+                    curr_cachekey = ct->head->key;
+                    curr_filenum = ct->head->cachefile->filenum;
+                }
             }
         }
         else {
-            BOOL is_attainable;
-            try_evict_pair(ct, curr_in_clock, &is_attainable); // as of now, this does an eviction on the main thread
-            if (!is_attainable) unattainable_data += curr_in_clock->size;
+            set_val = FALSE;
+            if (curr_in_clock->count > 0) {
+                curr_in_clock->count--;
+                // call the partial eviction callback
+                rwlock_write_lock(&curr_in_clock->rwlock, ct->mutex);
+
+                void *value = curr_in_clock->value;
+                void *write_extraargs = curr_in_clock->write_extraargs;
+                enum partial_eviction_cost cost;
+                long bytes_freed_estimate = 0;
+                curr_in_clock->pe_est_callback(
+                    value,
+                    &bytes_freed_estimate,
+                    &cost,
+                    write_extraargs
+                    );
+                if (cost == PE_CHEAP) {
+                    curr_in_clock->size_evicting_estimate = 0;
+                    do_partial_eviction(ct, curr_in_clock);
+                }
+                else if (cost == PE_EXPENSIVE) {
+                    // only bother running an expensive partial eviction
+                    // if it is expected to free space
+                    if (bytes_freed_estimate > 0) {
+                        curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
+                        ct->size_evicting += bytes_freed_estimate;
+                        WORKITEM wi = &curr_in_clock->asyncwork;
+                        workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
+                        workqueue_enq(&ct->wq, wi, 0);
+                    }
+                    else {
+                        rwlock_write_unlock(&curr_in_clock->rwlock);
+                    }
+                }
+                else {
+                    assert(FALSE);
+                }
+
+            }
+            else {
+                try_evict_pair(ct, curr_in_clock);
+            }
         }
         // at this point, either curr_in_clock is still in the list because it has not been fully evicted,
         // and we need to move ct->head over.  Otherwise, curr_in_clock has been fully evicted
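The cycle check above only needs to remember a single (cachekey, filenum): the sweep starts a "busy run" at the first pinned pair it meets, and if it comes back around to that same pair without having made progress, everything in the clock is in use and the loop gives up rather than spin. The same idea on a plain circular list (a simplified sketch; the real code compares key and filenum instead of pointers because a fully evicted pair is freed and its memory may be reused):

    #include <stdbool.h>
    #include <stddef.h>

    struct pair { int key; bool busy; struct pair *next; };  // circular list

    // Return the first non-busy pair, or NULL once one full revolution
    // has found nothing but busy pairs.
    static struct pair *find_evictable(struct pair *head) {
        struct pair *first_busy = NULL;
        for (struct pair *p = head; ; p = p->next) {
            if (!p->busy)
                return p;
            if (first_busy == NULL)
                first_busy = p;          // remember where the busy run started
            else if (p == first_busy)
                return NULL;             // swept a full cycle of busy pairs
        }
    }
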
@@ -1317,6 +1433,7 @@ static int maybe_flush_some (CACHETABLE ct, long size) {
     if ((4 * ct->n_in_table < ct->table_size) && ct->table_size > 4) {
         cachetable_rehash(ct, ct->table_size/2);
     }
+exit:
     return r;
 }
 
@@ -1332,6 +1449,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
                                  u_int32_t fullhash,
                                  long size,
                                  CACHETABLE_FLUSH_CALLBACK flush_callback,
+                                 CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
                                  CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
                                  void *write_extraargs,
                                  enum cachetable_dirty dirty) {
@@ -1348,9 +1466,11 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
     p->state = state;
     p->flush_callback = flush_callback;
     p->pe_callback = pe_callback;
+    p->pe_est_callback = pe_est_callback;
     p->write_extraargs = write_extraargs;
     p->fullhash = fullhash;
     p->next = p->prev = 0;
+    p->remove_me = FALSE;
     rwlock_init(&p->rwlock);
     p->cq = 0;
     pair_add_to_clock(ct, p);
@@ -1386,13 +1506,15 @@ note_hash_count (int count) {
 }
 
 int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size,
-                        CACHETABLE_FLUSH_CALLBACK flush_callback,
+                        CACHETABLE_FLUSH_CALLBACK flush_callback,
+                        CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
                         CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
                         void *write_extraargs) {
     WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
     CACHETABLE ct = cachefile->cachetable;
     int count=0;
     cachetable_lock(ct);
+    cachetable_wait_write(ct);
     {
         PAIR p;
         for (p=ct->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
@@ -1424,7 +1546,8 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
                                   CTPAIR_IDLE,
                                   fullhash,
                                   size,
-                                  flush_callback,
+                                  flush_callback,
+                                  pe_est_callback,
                                   pe_callback,
                                   write_extraargs,
                                   CACHETABLE_DIRTY
@@ -1511,7 +1634,9 @@ do_partial_fetch(CACHETABLE ct, CACHEFILE cachefile, PAIR p, CACHETABLE_PARTIAL_
         workitem_init(&p->asyncwork, NULL, p);
         workqueue_enq(p->cq, &p->asyncwork, 1);
     }
-    rwlock_write_unlock(&p->rwlock);
+    else {
+        rwlock_write_unlock(&p->rwlock);
+    }
 }
 
 // for debugging
@@ -1531,6 +1656,7 @@ int toku_cachetable_get_and_pin (
     long *sizep,
     CACHETABLE_FLUSH_CALLBACK flush_callback,
     CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
     CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
     CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
     CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
@@ -1551,7 +1677,7 @@ int toku_cachetable_get_and_pin (
     get_and_pin_fullhash = fullhash;
     get_and_pin_footprint = 2;
-    // a function used to live here, not changing all the counts of get_and_pin_footprint
+    cachetable_wait_write(ct);
     get_and_pin_footprint = 3;
     for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         count++;
@@ -1661,6 +1787,7 @@ int toku_cachetable_get_and_pin (
                                   fullhash,
                                   zero_size,
                                   flush_callback,
+                                  pe_est_callback,
                                   pe_callback,
                                   write_extraargs,
                                   CACHETABLE_CLEAN
@@ -1829,6 +1956,7 @@ int toku_cachetable_get_and_pin_nonblocking (
     long *sizep,
     CACHETABLE_FLUSH_CALLBACK flush_callback,
     CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
     CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
     CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
     CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
@@ -1954,7 +2082,7 @@ int toku_cachetable_get_and_pin_nonblocking (
     assert(p==0);
 
     // Not found
-    p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
+    p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_est_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
     assert(p);
     rwlock_write_lock(&p->rwlock, ct->mutex);
     run_unlockers(unlockers); // we hold the ct mutex.
@@ -1984,6 +2112,7 @@ struct cachefile_partial_prefetch_args {
 int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
                             CACHETABLE_FLUSH_CALLBACK flush_callback,
                             CACHETABLE_FETCH_CALLBACK fetch_callback,
+                            CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
                             CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
                             CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
                             CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
@@ -2020,7 +2149,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
     // if not found then create a pair in the READING state and fetch it
     if (p == 0) {
         cachetable_prefetches++;
-        p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
+        p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_est_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
         assert(p);
         rwlock_write_lock(&p->rwlock, ct->mutex);
         struct cachefile_prefetch_args *MALLOC(cpargs);
@@ -2232,13 +2361,9 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
         cachetable_lock(ct);
         PAIR p = workitem_arg(wi);
         p->cq = 0;
-        if (p->state == CTPAIR_READING) { //Some other thread owned the lock, but transferred ownership to the thread executing this function
-            rwlock_write_unlock(&p->rwlock);  //Release the lock
-            //No one has a pin.  This was being read because of a prefetch.
+        if (p->state == CTPAIR_READING || p->state == CTPAIR_WRITING) { //Some other thread owned the lock, but transferred ownership to the thread executing this function
+            rwlock_write_unlock(&p->rwlock);  //Release the lock, no one has a pin.
             cachetable_maybe_remove_and_free_pair(ct, p);
-        } else if (p->state == CTPAIR_WRITING) { //Some other thread (or this thread) owned the lock and transferred ownership to the thread executing this function
-            //No one has a pin.  This was written because of an eviction.
-            cachetable_complete_write_pair(ct, p, TRUE);
         } else if (p->state == CTPAIR_INVALID) {
             abort_fetch_pair(p);
         } else
@@ -2282,6 +2407,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
     for (i=0; i<ct->table_size; i++) {
         if (ct->table[i]) return -1;
     }
+    assert(ct->size_evicting == 0);
     rwlock_destroy(&ct->pending_lock);
     r = toku_pthread_mutex_destroy(&ct->openfd_mutex); resource_assert_zero(r);
     cachetable_unlock(ct);
@@ -2692,7 +2818,7 @@ static void cachetable_writer(WORKITEM wi) {
     PAIR p = workitem_arg(wi);
     CACHETABLE ct = p->cachefile->cachetable;
     cachetable_lock(ct);
-    cachetable_write_pair(ct, p, FALSE);
+    cachetable_write_pair(ct, p, p->remove_me);
     cachetable_unlock(ct);
 }
 
@@ -2921,7 +3047,7 @@ void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS s) {
     s->size_current = ct->size_current;
     s->size_limit = ct->size_limit;
     s->size_max = ct->size_max;
-    s->size_writing = 0;
+    s->size_writing = ct->size_evicting;
     s->get_and_pin_footprint = get_and_pin_footprint;
     s->local_checkpoint = local_checkpoint;
     s->local_checkpoint_files = local_checkpoint_files;
diff --git a/newbrt/cachetable.h b/newbrt/cachetable.h
index 17e05d35759..1c5361f8cf1 100644
--- a/newbrt/cachetable.h
+++ b/newbrt/cachetable.h
@@ -105,6 +105,11 @@ void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threa
 // Handles the case where cf points to /dev/null
 int toku_cachefile_fsync(CACHEFILE cf);
 
+enum partial_eviction_cost {
+    PE_CHEAP=0,     // running partial eviction is cheap, and can be done on the client thread
+    PE_EXPENSIVE=1, // running partial eviction is expensive, and should not be done on the client thread
+};
+
 // The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
 // When write_me is true, the value should be written to storage.
 // When keep_me is false, the value should be freed.
@@ -120,6 +125,13 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, void
 // Can access fd (fd is protected by a readlock during call)
 typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, int fd, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, int *dirtyp, void *read_extraargs);
 
+// The partial eviction estimate callback is a cheap operation called by the cachetable on the client thread
+// to determine whether partial eviction is cheap and can be run on the client thread, or partial eviction
+// is expensive and should be done on a background (writer) thread.  If the callback says that
+// partial eviction is expensive, it also returns an estimate of the number of bytes it will free,
+// so that the cachetable can estimate how much data is being evicted on background threads.
+typedef void (*CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK)(void *brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void *write_extraargs);
+
 // The partial eviction callback is called by the cachetable to possibly try and partially evict pieces
 // of the PAIR. The strategy for what to evict is left to the callback. The callback may choose to free
 // nothing, may choose to free as much as possible.
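Taken together, the two callbacks define a two-phase protocol: the estimate callback runs on the client thread with the cachetable lock held, so it must only inspect the value, and only when it reports PE_EXPENSIVE with a nonzero estimate does the cachetable hand the actual pe_callback to a background writer. A minimal client obeying that contract might look like the following (the blob type is hypothetical, not something in this patch):

    // Hypothetical client value: keeps a compressed copy plus, sometimes,
    // a decompressed copy that partial eviction can drop.
    struct blob { long full_size; long compressed_size; int has_decompressed; };

    static void blob_pe_est_callback(void *blob_v, long *bytes_freed_estimate,
                                     enum partial_eviction_cost *cost,
                                     void *UU(write_extraargs)) {
        struct blob *b = blob_v;
        if (b->has_decompressed) {
            *cost = PE_EXPENSIVE;   // dropping the copy involves real work
            *bytes_freed_estimate = b->full_size - b->compressed_size;
        }
        else {
            *cost = PE_CHEAP;       // nothing worth doing
            *bytes_freed_estimate = 0;
        }
    }

    static int blob_pe_callback(void *blob_v, long UU(bytes_to_free),
                                long *bytes_freed, void *UU(extraargs)) {
        struct blob *b = blob_v;
        if (b->has_decompressed) {
            b->has_decompressed = 0;                       // drop the expanded copy
            *bytes_freed = b->full_size - b->compressed_size;
        }
        else {
            *bytes_freed = 0;
        }
        return 0;
    }
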
@@ -172,6 +184,7 @@ CACHETABLE toku_cachefile_get_cachetable(CACHEFILE cf);
 int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
                         void *value, long size,
                         CACHETABLE_FLUSH_CALLBACK flush_callback,
+                        CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
                         CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
                         void *write_extraargs
                         );
@@ -189,6 +202,7 @@ int toku_cachetable_get_and_pin (
     long *sizep,
     CACHETABLE_FLUSH_CALLBACK flush_callback,
     CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
     CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
     CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback  __attribute__((unused)),
     CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback  __attribute__((unused)),
@@ -215,6 +229,7 @@ int toku_cachetable_get_and_pin_nonblocking (
     long *sizep,
     CACHETABLE_FLUSH_CALLBACK flush_callback,
     CACHETABLE_FETCH_CALLBACK fetch_callback,
+    CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
     CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
     CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback  __attribute__((unused)),
     CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback  __attribute__((unused)),
@@ -258,7 +273,8 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin
 int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
                             CACHETABLE_FLUSH_CALLBACK flush_callback,
-                            CACHETABLE_FETCH_CALLBACK fetch_callback,
+                            CACHETABLE_FETCH_CALLBACK fetch_callback,
+                            CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback,
                             CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
                             CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
                             CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
diff --git a/newbrt/rollback.c b/newbrt/rollback.c
index 02a0644bb60..fe5802d9862 100644
--- a/newbrt/rollback.c
+++ b/newbrt/rollback.c
@@ -510,15 +510,26 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l
     return r;
 }
 
+static void toku_rollback_pe_est_callback(
+    void* rollback_v,
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    assert(rollback_v != NULL);
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 // callback for partially evicting a cachetable entry
 static int toku_rollback_pe_callback (
     void *rollback_v,
-    long bytes_to_free,
+    long UU(bytes_to_free),
     long* bytes_freed,
     void* UU(extraargs)
     )
 {
-    assert(bytes_to_free > 0);
     assert(rollback_v != NULL);
     *bytes_freed = 0;
     return 0;
@@ -562,6 +573,7 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
     r=toku_cachetable_put(cf, log->thislogname, log->thishash,
                           log, rollback_memory_size(log),
                           toku_rollback_flush_callback,
+                          toku_rollback_pe_est_callback,
                           toku_rollback_pe_callback,
                           h);
     assert(r==0);
@@ -779,6 +791,7 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
         r = toku_cachefile_prefetch(cf, name, hash,
                                     toku_rollback_flush_callback,
                                     toku_rollback_fetch_callback,
+                                    toku_rollback_pe_est_callback,
                                     toku_rollback_pe_callback,
                                     toku_brtnode_pf_req_callback,
                                     toku_brtnode_pf_callback,
@@ -809,6 +822,7 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
                                       &log_v, NULL,
                                       toku_rollback_flush_callback,
                                       toku_rollback_fetch_callback,
+                                      toku_rollback_pe_est_callback,
                                       toku_rollback_pe_callback,
                                       toku_rollback_pf_req_callback,
                                       toku_rollback_pf_callback,
diff --git a/newbrt/tests/brt-serialize-test.c b/newbrt/tests/brt-serialize-test.c
index 9f85356a162..bd381b14bcc 100644
--- a/newbrt/tests/brt-serialize-test.c
+++ b/newbrt/tests/brt-serialize-test.c
@@ -111,15 +111,43 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
     }
     // if read_none, get rid of the compressed bp's
     if (bft == read_none) {
-        long bytes_freed = 0;
-        toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
-        // assert all bp's are on disk
-        for (int i = 0; i < (*dn)->n_children; i++) {
-            if ((*dn)->height == 0) {
-                assert(BP_STATE(*dn,i) == PT_ON_DISK);
-                assert(is_BNULL(*dn, i));
+        if ((*dn)->height == 0) {
+            long bytes_freed = 0;
+            toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
+            // assert all bp's are on disk
+            for (int i = 0; i < (*dn)->n_children; i++) {
+                if ((*dn)->height == 0) {
+                    assert(BP_STATE(*dn,i) == PT_ON_DISK);
+                    assert(is_BNULL(*dn, i));
+                }
+                else {
+                    assert(BP_STATE(*dn,i) == PT_COMPRESSED);
+                }
             }
-            else {
+        }
+        else {
+            // first decompress everything, and make sure
+            // that it is available
+            // then run partial eviction to get it compressed
+            fill_bfe_for_full_read(&bfe, brt_h, NULL, string_key_cmp);
+            assert(toku_brtnode_pf_req_callback(*dn, &bfe));
+            long size;
+            r = toku_brtnode_pf_callback(*dn, &bfe, fd, &size);
+            assert(r==0);
+            // assert all bp's are available
+            for (int i = 0; i < (*dn)->n_children; i++) {
+                assert(BP_STATE(*dn,i) == PT_AVAIL);
+            }
+            long bytes_freed = 0;
+            toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
+            for (int i = 0; i < (*dn)->n_children; i++) {
+                // assert all bp's are still available, because we touched the clock
+                assert(BP_STATE(*dn,i) == PT_AVAIL);
+                // now assert all should be evicted
+                assert(BP_SHOULD_EVICT(*dn, i));
+            }
+            toku_brtnode_pe_callback(*dn, 0xffffffff, &bytes_freed, NULL);
+            for (int i = 0; i < (*dn)->n_children; i++) {
                 assert(BP_STATE(*dn,i) == PT_COMPRESSED);
             }
         }
@@ -1126,6 +1154,85 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
     assert(dn->totalchildkeylens==6);
     assert(BP_BLOCKNUM(dn,0).b==30);
     assert(BP_BLOCKNUM(dn,1).b==35);
+
+    FIFO src_fifo_1 = BNC_BUFFER(&sn, 0);
+    FIFO src_fifo_2 = BNC_BUFFER(&sn, 1);
+    FIFO dest_fifo_1 = BNC_BUFFER(dn, 0);
+    FIFO dest_fifo_2 = BNC_BUFFER(dn, 1);
+    bytevec src_key,src_val, dest_key, dest_val;
+    ITEMLEN src_keylen, src_vallen;
+    u_int32_t src_type;
+    MSN src_msn;
+    XIDS src_xids;
+    ITEMLEN dest_keylen, dest_vallen;
+    u_int32_t dest_type;
+    MSN dest_msn;
+    XIDS dest_xids;
+    r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
+    assert(r==0);
+    r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
+    assert(r==0);
+    assert(src_keylen == dest_keylen);
+    assert(src_keylen == 2);
+    assert(src_vallen == dest_vallen);
+    assert(src_vallen == 5);
+    assert(src_type == dest_type);
+    assert(src_msn.msn == dest_msn.msn);
+    assert(strcmp(src_key, "a") == 0);
+    assert(strcmp(dest_key, "a") == 0);
+    assert(strcmp(src_val, "aval") == 0);
+    assert(strcmp(dest_val, "aval") == 0);
+    r = toku_fifo_deq(src_fifo_1);
+    assert(r==0);
+    r = toku_fifo_deq(dest_fifo_1);
+    assert(r==0);
+    r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
+    assert(r==0);
+    r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
+    assert(r==0);
+    assert(src_keylen == dest_keylen);
+    assert(src_keylen == 2);
+    assert(src_vallen == dest_vallen);
+    assert(src_vallen == 5);
+    assert(src_type == dest_type);
+    assert(src_msn.msn == dest_msn.msn);
+    assert(strcmp(src_key, "b") == 0);
+    assert(strcmp(dest_key, "b") == 0);
+    assert(strcmp(src_val, "bval") == 0);
+    assert(strcmp(dest_val, "bval") == 0);
+    r = toku_fifo_deq(src_fifo_1);
+    assert(r==0);
+    r = toku_fifo_deq(dest_fifo_1);
+    assert(r==0);
+    r = toku_fifo_peek(src_fifo_1, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
+    assert(r!=0);
+    r = toku_fifo_peek(dest_fifo_1, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
+    assert(r!=0);
+
+    r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
+    assert(r==0);
+    r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
+    assert(r==0);
+    assert(src_keylen == dest_keylen);
+    assert(src_keylen == 2);
+    assert(src_vallen == dest_vallen);
+    assert(src_vallen == 5);
+    assert(src_type == dest_type);
+    assert(src_msn.msn == dest_msn.msn);
+    assert(strcmp(src_key, "x") == 0);
+    assert(strcmp(dest_key, "x") == 0);
+    assert(strcmp(src_val, "xval") == 0);
+    assert(strcmp(dest_val, "xval") == 0);
+    r = toku_fifo_deq(src_fifo_2);
+    assert(r==0);
+    r = toku_fifo_deq(dest_fifo_2);
+    assert(r==0);
+    r = toku_fifo_peek(src_fifo_2, &src_key, &src_keylen, &src_val, &src_vallen, &src_type, &src_msn, &src_xids);
+    assert(r!=0);
+    r = toku_fifo_peek(dest_fifo_2, &dest_key, &dest_keylen, &dest_val, &dest_vallen, &dest_type, &dest_msn, &dest_xids);
+    assert(r!=0);
+
+
     toku_brtnode_free(&dn);
 
     kv_pair_free(sn.childkeys[0]);
diff --git a/newbrt/tests/cachetable-all-write.c b/newbrt/tests/cachetable-all-write.c
index cd1826c95b8..e9fe12656a8 100644
--- a/newbrt/tests/cachetable-all-write.c
+++ b/newbrt/tests/cachetable-all-write.c
@@ -37,6 +37,19 @@ fetch (CACHEFILE f __attribute__((__unused__)),
     return 0;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -72,11 +85,11 @@ cachetable_test (void) {
     void* v1;
     void* v2;
     long s1, s2;
-    r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
+    r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
     r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, 8);
-    r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
+    r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL, NULL);
     //    usleep (2*1024*1024);
-    //r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_callback, pf_req_callback, pf_callback, NULL);
+    //r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, NULL);
     r = toku_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_CLEAN, 8);
diff --git a/newbrt/tests/cachetable-checkpoint-pending.c b/newbrt/tests/cachetable-checkpoint-pending.c
index 96e1d5b0528..14d286c5818 100644
--- a/newbrt/tests/cachetable-checkpoint-pending.c
+++ b/newbrt/tests/cachetable-checkpoint-pending.c
@@ -52,6 +52,18 @@ fetch (CACHEFILE UU(thiscf), int UU(fd), CACHEKEY UU(key), u_int32_t UU(fullhash
     return 0;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -83,7 +95,7 @@ do_update (void *UU(ignore))
         u_int32_t hi = toku_cachetable_hash(cf, key);
         void *vv;
         long size;
-        int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
+        int r = toku_cachetable_get_and_pin(cf, key, hi, &vv, &size, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
         //printf("g");
         assert(r==0);
         assert(size==sizeof(int));
@@ -132,7 +144,7 @@ static void checkpoint_pending(void) {
         CACHEKEY key = make_blocknum(i);
         u_int32_t hi = toku_cachetable_hash(cf, key);
         values[i] = 42;
-        r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, pe_callback, 0);
+        r = toku_cachetable_put(cf, key, hi, &values[i], sizeof(int), flush, pe_est_callback, pe_callback, 0);
         assert(r == 0);
 
         r = toku_cachetable_unpin(cf, key, hi, CACHETABLE_DIRTY, item_size);
diff --git a/newbrt/tests/cachetable-checkpoint-test.c b/newbrt/tests/cachetable-checkpoint-test.c
index 4a26500ed2e..f6338d585ff 100644
--- a/newbrt/tests/cachetable-checkpoint-test.c
+++ b/newbrt/tests/cachetable-checkpoint-test.c
@@ -21,6 +21,18 @@ static void flush(CACHEFILE cf, int UU(fd), CACHEKEY key, void *value, void *ext
     if (keep_me) n_keep_me++;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -74,7 +86,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
     for (i=0; i<n; i++) {
 >= KEYLIMIT) {
@@ -147,7 +159,7 @@ static void test_rename (void) {
             void *current_value;
             long current_size;
             if (verbose) printf("Rename %" PRIx64 " to %" PRIx64 "\n", okey.b, nkey.b);
-            r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
+            r = toku_cachetable_get_and_pin(f, okey, toku_cachetable_hash(f, okey), &current_value, &current_size, r_flush, r_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
             if (r == -42) continue;
             assert(r==0);
             r = toku_cachetable_rename(f, okey, nkey);
diff --git a/newbrt/tests/cachetable-scan.c b/newbrt/tests/cachetable-scan.c
index f5ac925e225..7b2c39ac548 100644
--- a/newbrt/tests/cachetable-scan.c
+++ b/newbrt/tests/cachetable-scan.c
@@ -46,6 +46,18 @@ static int f_fetch (CACHEFILE f,
     return 0;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -88,7 +100,7 @@ static void writeit (void) {
         u_int32_t fullhash = toku_cachetable_hash(f, key);
         int j;
         for (j=0; j<
     assert(((struct item *)item_v)->key.b==5);
     assert(strcmp(((struct item *)item_v)->something,"something")==0);
@@ -279,7 +291,7 @@ static void test0 (void) {
     assert(r==0);
     expect1(4);
     did_fetch=make_blocknum(-1);
-    r=toku_cachetable_get_and_pin(f, make_blocknum(2), toku_cachetable_hash(f, make_blocknum(2)), &item_v, NULL, flush, fetch, pe_callback, pf_req_callback, pf_callback, t3, t3); /* 2p 5P 7U 6P 1P */
+    r=toku_cachetable_get_and_pin(f, make_blocknum(2), toku_cachetable_hash(f, make_blocknum(2)), &item_v, NULL, flush, fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, t3, t3); /* 2p 5P 7U 6P 1P */
     assert(r==0);
     assert(did_fetch.b==2); /* Expect that 2 is fetched in. */
     assert(((struct item *)item_v)->key.b==2);
@@ -355,9 +367,9 @@ static void test_nested_pin (void) {
     i0=0; i1=0;
     u_int32_t f1hash = toku_cachetable_hash(f, make_blocknum(1));
-    r = toku_cachetable_put(f, make_blocknum(1), f1hash, &i0, 1, flush_n, pe_callback, f2);
+    r = toku_cachetable_put(f, make_blocknum(1), f1hash, &i0, 1, flush_n, pe_est_callback, pe_callback, f2);
     assert(r==0);
-    r = toku_cachetable_get_and_pin(f, make_blocknum(1), f1hash, &vv, NULL, flush_n, fetch_n, pe_callback, pf_req_callback, pf_callback, f2, f2);
+    r = toku_cachetable_get_and_pin(f, make_blocknum(1), f1hash, &vv, NULL, flush_n, fetch_n, pe_est_callback, pe_callback, pf_req_callback, pf_callback, f2, f2);
     assert(r==0);
     assert(vv==&i0);
     assert(i0==0);
@@ -369,7 +381,7 @@ static void test_nested_pin (void) {
     r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
     assert(r==0);
     u_int32_t f2hash = toku_cachetable_hash(f, make_blocknum(2));
-    r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, pe_callback, f2);
+    r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, pe_est_callback, pe_callback, f2);
     assert(r==0); // The other one is pinned, but now the cachetable fails gracefully:  It allows the pin to happen
     r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
     assert(r==0);
@@ -430,12 +442,12 @@ static void test_multi_filehandles (void) {
     assert(f1==f2);
     assert(f1!=f3);
 
-    r = toku_cachetable_put(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), (void*)124, test_object_size, null_flush, pe_callback, (void*)123); assert(r==0);
-    r = toku_cachetable_get_and_pin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), &v, NULL, null_flush, add123_fetch, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
+    r = toku_cachetable_put(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), (void*)124, test_object_size, null_flush, pe_est_callback, pe_callback, (void*)123); assert(r==0);
+    r = toku_cachetable_get_and_pin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), &v, NULL, null_flush, add123_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
     assert((unsigned long)v==124);
-    r = toku_cachetable_get_and_pin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), &v, NULL, null_flush, add123_fetch, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
+    r = toku_cachetable_get_and_pin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), &v, NULL, null_flush, add123_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, (void*)123, (void*)123); assert(r==0);
     assert((unsigned long)v==125);
-    r = toku_cachetable_get_and_pin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), &v, NULL, null_flush, add222_fetch, pe_callback, pf_req_callback, pf_callback, (void*)222, (void*)222); assert(r==0);
+    r = toku_cachetable_get_and_pin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), &v, NULL, null_flush, add222_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, (void*)222, (void*)222); assert(r==0);
     assert((unsigned long)v==224);
 
     r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
@@ -493,7 +505,7 @@ static void test_dirty(void) {
     key = make_blocknum(1);
     value = (void*)1;
     u_int32_t hkey = toku_cachetable_hash(f, key);
-    r = toku_cachetable_put(f, key, hkey, value, test_object_size, test_dirty_flush, pe_callback, 0);
+    r = toku_cachetable_put(f, key, hkey, value, test_object_size, test_dirty_flush, pe_est_callback, pe_callback, 0);
     assert(r == 0);
 
     // cachetable_print_state(t);
@@ -510,7 +522,7 @@ static void test_dirty(void) {
     assert(pinned == 0);
 
     r = toku_cachetable_get_and_pin(f, key, hkey, &value, NULL, test_dirty_flush,
-                                    test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
+                                    test_dirty_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
     assert(r == 0);
 
     // cachetable_print_state(t);
@@ -532,7 +544,7 @@ static void test_dirty(void) {
     hkey = toku_cachetable_hash(f, key);
     r = toku_cachetable_get_and_pin(f, key, hkey, &value, NULL,
                                     test_dirty_flush,
-                                    test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
+                                    test_dirty_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
     assert(r == 0);
 
     // cachetable_print_state(t);
@@ -552,7 +564,7 @@ static void test_dirty(void) {
 
     r = toku_cachetable_get_and_pin(f, key, hkey, &value, NULL,
                                     test_dirty_flush,
-                                    test_dirty_fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
+                                    test_dirty_fetch, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
     assert(r == 0);
 
     // cachetable_print_state(t);
@@ -623,7 +635,7 @@ static void test_size_resize(void) {
 
     u_int32_t hkey = toku_cachetable_hash(f, key);
 
-    r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_callback, 0);
+    r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_est_callback, pe_callback, 0);
     assert(r == 0);
 
     void *entry_value; int dirty; long long pinned; long entry_size;
@@ -640,7 +652,7 @@ static void test_size_resize(void) {
 
     void *current_value;
     long current_size;
-    r = toku_cachetable_get_and_pin(f, key, hkey, &current_value, &current_size, test_size_flush_callback, 0, pe_callback, pf_req_callback, pf_callback, 0, 0);
+    r = toku_cachetable_get_and_pin(f, key, hkey, &current_value, &current_size, test_size_flush_callback, 0, pe_est_callback, pe_callback, pf_req_callback, pf_callback, 0, 0);
     assert(r == 0);
     assert(current_value == value);
     assert(current_size == new_size);
@@ -686,7 +698,7 @@ static void test_size_flush(void) {
         void *value = (void *)(long)-i;
         //        printf("test_size put %lld %p %lld\n", key, value, size);
         u_int32_t hkey = toku_cachetable_hash(f, key);
-        r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_callback, 0);
+        r = toku_cachetable_put(f, key, hkey, value, size, test_size_flush_callback, pe_est_callback, pe_callback, 0);
         assert(r == 0);
 
         int n_entries, hash_size; long size_current, size_limit, size_max;
diff --git a/newbrt/tests/cachetable-test2.c b/newbrt/tests/cachetable-test2.c
index 30e8712a47e..876c8c5977a 100644
--- a/newbrt/tests/cachetable-test2.c
+++ b/newbrt/tests/cachetable-test2.c
@@ -119,6 +119,18 @@ static int fetch_forchain (CACHEFILE f, int UU(fd), CACHEKEY key, u_int32_t full
     return 0;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -183,7 +195,7 @@ static void test_chaining (void) {
         int fnum = i%N_FILES;
         //printf("%s:%d Add %d\n", __FILE__, __LINE__, i);
         u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
-        r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
+        r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_est_callback, pe_callback, (void*)i);
         assert(r==0);
         item_becomes_present(ct, f[fnum], make_blocknum(i));
         r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size);
@@ -211,6 +223,7 @@ static void test_chaining (void) {
                                             NULL,
                                             flush_forchain,
                                             fetch_forchain,
+                                            pe_est_callback,
                                             pe_callback,
                                             pf_req_callback,
                                             pf_callback,
@@ -231,7 +244,7 @@ static void test_chaining (void) {
             // if i is a duplicate, cachetable_put will return -1
             // printf("%s:%d Add {%ld,%p}\n", __FILE__, __LINE__, i, f[fnum]);
             u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
-            r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_callback, (void*)i);
+            r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, pe_est_callback, pe_callback, (void*)i);
             assert(r==0 || r==-1);
             if (r==0) {
                 item_becomes_present(ct, f[fnum], make_blocknum(i));
diff --git a/newbrt/tests/cachetable-unpin-and-remove-test.c b/newbrt/tests/cachetable-unpin-and-remove-test.c
index a9fd1fbadc7..b4329cba637 100644
--- a/newbrt/tests/cachetable-unpin-and-remove-test.c
+++ b/newbrt/tests/cachetable-unpin-and-remove-test.c
@@ -31,6 +31,18 @@ fetch (CACHEFILE f __attribute__((__unused__)),
     return 0;
 }
 
+static void
+pe_est_callback(
+    void* UU(brtnode_pv),
+    long* bytes_freed_estimate,
+    enum partial_eviction_cost *cost,
+    void* UU(write_extraargs)
+    )
+{
+    *bytes_freed_estimate = 0;
+    *cost = PE_CHEAP;
+}
+
 static int
 pe_callback (
     void *brtnode_pv __attribute__((__unused__)),
@@ -76,7 +88,7 @@ cachetable_unpin_and_remove_test (int n) {
 
     // put the keys into the cachetable
     for (i=0; i<n; i++) {
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+
+//
+// This test verifies that running evictions on a writer thread
+// works correctly.  We create a dictionary bigger than the cachetable
+// (about 4x its size).  Then, we spawn a bunch of pthreads that do the following:
+//  - scan the dictionary forward with bulk fetch
+//  - scan the dictionary forward slowly
+//  - scan the dictionary backward with bulk fetch
+//  - scan the dictionary backward slowly
+//  - read random keys with point queries
+//  - update existing values in the dictionary with db->put
+// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
+// If the test runs to completion without crashing, we consider it a success.
+//
+
+BOOL run_test;
+int time_of_test;
+
+struct arg {
+    int n;
+    DB *db;
+    DB_ENV* env;
+    BOOL fast;
+    BOOL fwd;
+};
+
+static int
+go_fast(DBT const *a, DBT const *b, void *c) {
+    assert(a);
+    assert(b);
+    assert(c==NULL);
+    return TOKUDB_CURSOR_CONTINUE;
+}
+static int
+go_slow(DBT const *a, DBT const *b, void *c) {
+    assert(a);
+    assert(b);
+    assert(c==NULL);
+    return 0;
+}
+
+static void *scan_db(void *arg) {
+    struct arg *myarg = (struct arg *) arg;
+    DB_ENV* env = myarg->env;
+    DB* db = myarg->db;
+    DB_TXN* txn = NULL;
+    int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
+    while(run_test) {
+        r = 0; // reset, otherwise the DB_NOTFOUND from the previous pass would skip the scan
+        DBC* cursor = NULL;
+        CHK(db->cursor(db, txn, &cursor, 0));
+        while (r != DB_NOTFOUND) {
+            if (myarg->fwd) {
+                r = cursor->c_getf_next(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
+            }
+            else {
+                r = cursor->c_getf_prev(cursor, 0, myarg->fast ? go_fast : go_slow, NULL);
+            }
+            assert(r==0 || r==DB_NOTFOUND);
+        }
+
+        CHK(cursor->c_close(cursor));
+    }
+    CHK(txn->commit(txn,0));
+    return arg;
+}
+
+static void *ptquery_db(void *arg) {
+    struct arg *myarg = (struct arg *) arg;
+    DB_ENV* env = myarg->env;
+    DB* db = myarg->db;
+    DB_TXN* txn = NULL;
+    int n = myarg->n;
+    int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
+    while(run_test) {
+        int rand_key = random() % n;
+        DBT key;
+        DBT val;
+        memset(&val, 0, sizeof(val));
+        dbt_init(&key, &rand_key, sizeof(rand_key));
+        r = db->get(db, txn, &key, &val, 0);
+        assert(r != DB_NOTFOUND);
+    }
+    CHK(txn->commit(txn,0));
+    return arg;
+}
+
+static void *update_db(void *arg) {
+    struct arg *myarg = (struct arg *) arg;
+    DB_ENV* env = myarg->env;
+    DB* db = myarg->db;
+    int n = myarg->n;
+
+    DB_TXN* txn = NULL;
+    while (run_test) {
+        int r = env->txn_begin(env, 0, &txn, DB_READ_UNCOMMITTED); CKERR(r);
+        for (u_int32_t i = 0; i < 1000; i++) {
+            int rand_key = random() % n;
+            int rand_val = random();
+            DBT key, val;
+            r = db->put(
+                db,
+                txn,
+                dbt_init(&key, &rand_key, sizeof(rand_key)),
+                dbt_init(&val, &rand_val, sizeof(rand_val)),
+                0
+                );
+            CKERR(r);
+        }
+        CHK(txn->commit(txn,0));
+    }
+    return arg;
+}
+
+static void *test_time(void *arg) {
+    assert(arg == NULL);
+    usleep(time_of_test*1000*1000);
+    printf("should now end test\n");
+    run_test = FALSE;
+    return arg;
+}
+
+
+static void
+test_evictions (int nseconds) {
+    int n = 100000;
+    if (verbose) printf("test_rand_insert:%d \n", n);
+
+    DB_TXN * const null_txn = 0;
+    const char * const fname = "test.bulk_fetch.brt";
+    int r;
+
+    r = system("rm -rf " ENVDIR);
+    CKERR(r);
+    r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
+
+    /* create the dup database file */
+    DB_ENV *env;
+    r = db_env_create(&env, 0); assert(r == 0);
+    r = env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
+    // set the cachetable size to 100K (small relative to the dictionary)
+    r = env->set_cachesize(env, 0, 100000, 1); CKERR(r);
+    r = env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
+
+    DB *db;
+    r = db_create(&db, env, 0);
+    assert(r == 0);
+    r = db->set_flags(db, 0);
+    assert(r == 0);
+    r = db->set_pagesize(db, 4096);
+    assert(r == 0);
+    r = db->set_readpagesize(db, 1024);
+    assert(r == 0);
+    r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
+    assert(r == 0);
+
+    int keys[n];
+    for (int i=0; i<n; i++) {
+        keys[i] = i;
+    }
+    for (int i=0; i<n; i++) {
+        DBT key, val;
+        r = db->put(db, null_txn, dbt_init(&key, &keys[i], sizeof keys[i]), dbt_init(&val, &i, sizeof i), 0);
+        assert(r == 0);
+    }
+
+    //
+    // the threads that we want:
+    //   - one thread constantly updating random values
+    //   - two threads doing table scans with bulk fetch (forward and backward)
+    //   - two threads doing table scans without bulk fetch (forward and backward)
+    //   - one thread doing random point queries
+    //   - one thread that ends the test after nseconds
+    //
+    toku_pthread_t mytids[7];
+    struct arg myargs[7];
+    for (u_int32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
+        myargs[i].n = n;
+        myargs[i].db = db;
+        myargs[i].env = env;
+        myargs[i].fast = TRUE;
+        myargs[i].fwd = TRUE;
+    }
+
+    // make the forward fast scanner
+    myargs[0].fast = TRUE;
+    myargs[0].fwd = TRUE;
+    CHK(toku_pthread_create(&mytids[0], NULL, scan_db, &myargs[0]));
+
+    // make the forward slow scanner
+    myargs[1].fast = FALSE;
+    myargs[1].fwd = TRUE;
+    CHK(toku_pthread_create(&mytids[1], NULL, scan_db, &myargs[1]));
+
+    // make the backward fast scanner
+    myargs[2].fast = TRUE;
+    myargs[2].fwd = FALSE;
+    CHK(toku_pthread_create(&mytids[2], NULL, scan_db, &myargs[2]));
+
+    // make the backward slow scanner
+    myargs[3].fast = FALSE;
+    myargs[3].fwd = FALSE;
+    CHK(toku_pthread_create(&mytids[3], NULL, scan_db, &myargs[3]));
+
+    // make the guy that updates the db
+    CHK(toku_pthread_create(&mytids[4], NULL, update_db, &myargs[4]));
+
+    // make the guy that does point queries
+    CHK(toku_pthread_create(&mytids[5], NULL, ptquery_db, &myargs[5]));
+
+    run_test = TRUE;
+    time_of_test = nseconds;
+    // make the guy that sleeps
+    CHK(toku_pthread_create(&mytids[6], NULL, test_time, NULL));
+
+    for (u_int32_t i = 0; i < sizeof(myargs)/sizeof(myargs[0]); i++) {
+        void *ret;
+        r = toku_pthread_join(mytids[i], &ret); assert_zero(r);
+    }
+
+
+    r = db->close(db, 0); CKERR(r);
+    r = env->close(env, 0); CKERR(r);
+}
+
+int
+test_main(int argc, char *const argv[]) {
+    parse_args(argc, argv);
+    test_evictions(60);
+    return 0;
+}