mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 06:44:16 +01:00
refs #3634, separate ct lock from workqueue lock, remove usage of workqueues
git-svn-id: file:///svn/toku/tokudb@45658 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
9fd9a26e62
commit
c5d5cf7646
1 changed files with 68 additions and 82 deletions
150
ft/cachetable.c
150
ft/cachetable.c
|
@ -115,8 +115,6 @@ struct ctpair {
|
|||
|
||||
struct nb_mutex value_nb_mutex; // single writer, protects value_data
|
||||
struct nb_mutex disk_nb_mutex; // single writer, protects disk_data, is used for writing cloned nodes for checkpoint
|
||||
struct workitem asyncwork; // work item for the worker threads
|
||||
struct workitem checkpoint_asyncwork; // work item for the worker threads
|
||||
struct toku_list next_for_cachefile; // link in the cachefile list
|
||||
};
|
||||
|
||||
|
@ -154,13 +152,11 @@ struct cachetable {
|
|||
int64_t size_evicting; // the sum of the sizes of the pairs being written
|
||||
int64_t size_max; // high water mark of size_current (max value size_current ever had)
|
||||
TOKULOGGER logger;
|
||||
toku_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
|
||||
struct workqueue wq; // async work queue
|
||||
THREADPOOL threadpool; // pool of worker threads
|
||||
struct workqueue checkpoint_wq;
|
||||
THREADPOOL checkpoint_threadpool;
|
||||
toku_mutex_t mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
|
||||
|
||||
KIBBUTZ kibbutz; // another pool of worker threads and jobs to do asynchronously.
|
||||
KIBBUTZ client_kibbutz; // pool of worker threads and jobs to do asynchronously for the client.
|
||||
KIBBUTZ ct_kibbutz; // pool of worker threads and jobs to do asynchronously for the cachetable
|
||||
KIBBUTZ checkpointing_kibbutz; // small pool for checkpointing cloned pairs
|
||||
|
||||
LSN lsn_of_checkpoint_in_progress;
|
||||
// Variables used to detect threadsafety bugs are declared volatile to prevent compiler from using thread-local cache.
|
||||
|
@ -231,14 +227,14 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
|
|||
// verifying that we just incremented it with the matching BEGIN macro.
|
||||
#define END_CRITICAL_REGION {invariant(ct->checkpoint_prohibited > 0); __sync_fetch_and_sub(&ct->checkpoint_prohibited, 1);}
|
||||
|
||||
// Lock the cachetable. Used for a variety of purposes. TODO: like what?
|
||||
// Lock the cachetable. Used for a variety of purposes.
|
||||
static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
|
||||
toku_mutex_lock(ct->mutex);
|
||||
toku_mutex_lock(&ct->mutex);
|
||||
}
|
||||
|
||||
// Unlock the cachetable
|
||||
static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
|
||||
toku_mutex_unlock(ct->mutex);
|
||||
toku_mutex_unlock(&ct->mutex);
|
||||
}
|
||||
|
||||
// Wait for cache table space to become available
|
||||
|
@ -247,7 +243,7 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
|
|||
static inline void cachetable_wait_write(CACHETABLE ct) {
|
||||
// if we're writing more than half the data in the cachetable
|
||||
while (2*ct->size_evicting > ct->size_current) {
|
||||
toku_cond_wait(&ct->flow_control_cond, ct->mutex);
|
||||
toku_cond_wait(&ct->flow_control_cond, &ct->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -311,7 +307,7 @@ void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
|
|||
// at a time when the manager is accepting background jobs, otherwise
|
||||
// the client is screwing up
|
||||
assert_zero(r);
|
||||
toku_kibbutz_enq(cf->cachetable->kibbutz, f, extra);
|
||||
toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra);
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -394,11 +390,13 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
|
|||
ct->size_limit = size_limit;
|
||||
ct->size_reserved = unreservable_memory(size_limit);
|
||||
ct->logger = logger;
|
||||
toku_init_workers(&ct->wq, &ct->threadpool, 1);
|
||||
toku_init_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool, 8);
|
||||
ct->mutex = workqueue_lock_ref(&ct->wq);
|
||||
toku_mutex_init(&ct->mutex, NULL);
|
||||
|
||||
ct->kibbutz = toku_kibbutz_create(toku_os_get_number_active_processors());
|
||||
int num_processors = toku_os_get_number_active_processors();
|
||||
ct->client_kibbutz = toku_kibbutz_create(num_processors);
|
||||
ct->ct_kibbutz = toku_kibbutz_create(2*num_processors);
|
||||
int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
|
||||
ct->checkpointing_kibbutz = toku_kibbutz_create(checkpointing_nworkers);
|
||||
|
||||
toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
|
||||
toku_minicron_setup(&ct->cleaner, 0, toku_cleaner_thread, ct); // default is no cleaner, for now
|
||||
|
@ -585,10 +583,9 @@ int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in
|
|||
return r;
|
||||
}
|
||||
|
||||
void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
*n_in_queue = workqueue_n_in_queue(&ct->wq, 1);
|
||||
*n_threads = toku_thread_pool_get_current_threads(ct->threadpool);
|
||||
void toku_cachefile_get_workqueue_load (CACHEFILE UU(cf), int *n_in_queue, int *n_threads) {
|
||||
*n_in_queue = 0;
|
||||
*n_threads = 0;
|
||||
}
|
||||
|
||||
//Test-only function
|
||||
|
@ -999,7 +996,7 @@ static void cachetable_only_write_locked_data(
|
|||
static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
|
||||
PAIR_ATTR old_attr = p->attr;
|
||||
PAIR_ATTR new_attr = p->attr;
|
||||
rwlock_read_lock(&ct->pending_lock, ct->mutex);
|
||||
rwlock_read_lock(&ct->pending_lock, &ct->mutex);
|
||||
BOOL for_checkpoint = p->checkpoint_pending;
|
||||
p->checkpoint_pending = FALSE;
|
||||
// grabbing the disk_nb_mutex here ensures that
|
||||
|
@ -1007,7 +1004,7 @@ static void cachetable_write_locked_pair(CACHETABLE ct, PAIR p) {
|
|||
// if we grab the disk_nb_mutex inside the if clause,
|
||||
// then we may try to evict a PAIR that is in the process
|
||||
// of having its clone be written out
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
// make sure that assumption about cloned_value_data is true
|
||||
// if we have grabbed the disk_nb_mutex, then that means that
|
||||
// there should be no cloned value data
|
||||
|
@ -1059,8 +1056,8 @@ static void cachetable_evict_pair(CACHETABLE ct, PAIR p) {
|
|||
}
|
||||
|
||||
// Worker thread function to writes and evicts a pair from memory to its cachefile
|
||||
static void cachetable_evicter(WORKITEM wi) {
|
||||
PAIR p = (PAIR) workitem_arg(wi);
|
||||
static void cachetable_evicter(void* extra) {
|
||||
PAIR p = (PAIR)extra;
|
||||
CACHEFILE cf = p->cachefile;
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
cachetable_lock(ct);
|
||||
|
@ -1080,7 +1077,7 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
|
|||
// must check for before we grab the write lock because we may
|
||||
// be trying to evict something this thread is trying to read
|
||||
if (!nb_mutex_users(&p->value_nb_mutex)) {
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
|
||||
assert(ct->size_evicting >= 0);
|
||||
ct->size_evicting += p->attr.size;
|
||||
|
@ -1096,10 +1093,7 @@ static void try_evict_pair(CACHETABLE ct, PAIR p) {
|
|||
bjm_remove_background_job(cf->bjm);
|
||||
}
|
||||
else {
|
||||
WORKITEM wi = &p->asyncwork;
|
||||
//responsibility of cachetable_evicter to remove background job
|
||||
workitem_init(wi, cachetable_evicter, p);
|
||||
workqueue_enq(&ct->wq, wi, 0);
|
||||
toku_kibbutz_enq(ct->ct_kibbutz, cachetable_evicter, p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1121,8 +1115,8 @@ static void do_partial_eviction(CACHETABLE ct, PAIR p) {
|
|||
nb_mutex_unlock(&p->value_nb_mutex);
|
||||
}
|
||||
|
||||
static void cachetable_partial_eviction(WORKITEM wi) {
|
||||
PAIR p = (PAIR) workitem_arg(wi);
|
||||
static void cachetable_partial_eviction(void* extra) {
|
||||
PAIR p = (PAIR)extra;
|
||||
CACHEFILE cf = p->cachefile;
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
cachetable_lock(ct);
|
||||
|
@ -1147,7 +1141,7 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
|
|||
if (curr_in_clock->count > 0) {
|
||||
curr_in_clock->count--;
|
||||
// call the partial eviction callback
|
||||
nb_mutex_lock(&curr_in_clock->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&curr_in_clock->value_nb_mutex, &ct->mutex);
|
||||
|
||||
void *value = curr_in_clock->value_data;
|
||||
void* disk_data = curr_in_clock->disk_data;
|
||||
|
@ -1172,10 +1166,7 @@ static bool run_eviction_on_pair(PAIR curr_in_clock, CACHETABLE ct) {
|
|||
if (bytes_freed_estimate > 0) {
|
||||
curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
|
||||
ct->size_evicting += bytes_freed_estimate;
|
||||
WORKITEM wi = &curr_in_clock->asyncwork;
|
||||
// responsibility of cachetable_partial_eviction to remove background job
|
||||
workitem_init(wi, cachetable_partial_eviction, curr_in_clock);
|
||||
workqueue_enq(&ct->wq, wi, 0);
|
||||
toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_eviction, curr_in_clock);
|
||||
}
|
||||
else {
|
||||
nb_mutex_unlock(&curr_in_clock->value_nb_mutex);
|
||||
|
@ -1347,7 +1338,7 @@ static int cachetable_put_internal(
|
|||
CACHETABLE_DIRTY
|
||||
);
|
||||
assert(p);
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
//note_hash_count(count);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1405,8 +1396,8 @@ clone_pair(CACHETABLE ct, PAIR p) {
|
|||
ct->size_current += p->cloned_value_size;
|
||||
}
|
||||
|
||||
static void checkpoint_cloned_pair(WORKITEM wi) {
|
||||
PAIR p = (PAIR) workitem_arg(wi);
|
||||
static void checkpoint_cloned_pair(void* extra) {
|
||||
PAIR p = (PAIR)extra;
|
||||
CACHETABLE ct = p->cachefile->cachetable;
|
||||
cachetable_lock(ct);
|
||||
PAIR_ATTR new_attr;
|
||||
|
@ -1427,9 +1418,7 @@ static void checkpoint_cloned_pair(WORKITEM wi) {
|
|||
|
||||
static void
|
||||
checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
|
||||
WORKITEM wi = &p->checkpoint_asyncwork;
|
||||
workitem_init(wi, checkpoint_cloned_pair, p);
|
||||
workqueue_enq(&ct->checkpoint_wq, wi, 1);
|
||||
toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1446,7 +1435,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
|
|||
{
|
||||
if (p->dirty && p->checkpoint_pending) {
|
||||
if (p->clone_callback) {
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
assert(!p->cloned_value_data);
|
||||
clone_pair(ct, p);
|
||||
assert(p->cloned_value_data);
|
||||
|
@ -1489,10 +1478,10 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p)
|
|||
static void
|
||||
write_pair_for_checkpoint_thread (CACHETABLE ct, PAIR p)
|
||||
{
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex); // grab an exclusive lock on the pair
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex); // grab an exclusive lock on the pair
|
||||
if (p->dirty && p->checkpoint_pending) {
|
||||
if (p->clone_callback) {
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
assert(!p->cloned_value_data);
|
||||
clone_pair(ct, p);
|
||||
assert(p->cloned_value_data);
|
||||
|
@ -1702,7 +1691,7 @@ do_partial_fetch(
|
|||
// so we do a sanity check here.
|
||||
assert(!p->dirty);
|
||||
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
cachetable_unlock(ct);
|
||||
int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
|
||||
lazy_assert_zero(r);
|
||||
|
@ -1726,18 +1715,19 @@ void toku_cachetable_pf_pinned_pair(
|
|||
{
|
||||
PAIR_ATTR attr;
|
||||
PAIR p = NULL;
|
||||
cachetable_lock(cf->cachetable);
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
cachetable_lock(ct);
|
||||
int r = cachetable_get_pair(cf, key, fullhash, &p);
|
||||
assert_zero(r);
|
||||
assert(p->value_data == value);
|
||||
assert(nb_mutex_writers(&p->value_nb_mutex));
|
||||
nb_mutex_lock(&p->disk_nb_mutex, cf->cachetable->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
int fd = cf->fd;
|
||||
cachetable_unlock(cf->cachetable);
|
||||
cachetable_unlock(ct);
|
||||
pf_callback(value, p->disk_data, read_extraargs, fd, &attr);
|
||||
cachetable_lock(cf->cachetable);
|
||||
cachetable_lock(ct);
|
||||
nb_mutex_unlock(&p->disk_nb_mutex);
|
||||
cachetable_unlock(cf->cachetable);
|
||||
cachetable_unlock(ct);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1802,7 +1792,7 @@ static void cachetable_fetch_pair(
|
|||
// FIXME this should be enum cachetable_dirty, right?
|
||||
int dirty = 0;
|
||||
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
cachetable_unlock(ct);
|
||||
|
||||
int r;
|
||||
|
@ -1907,7 +1897,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
|
|||
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
|
||||
if (p->key.b==key.b && p->cachefile==cachefile) {
|
||||
// still have the cachetable lock
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
pair_touch(p);
|
||||
if (may_modify_value) {
|
||||
checkpoint_pair_and_dependent_pairs(
|
||||
|
@ -1962,7 +1952,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
|
|||
CACHETABLE_CLEAN
|
||||
);
|
||||
assert(p);
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
if (may_modify_value) {
|
||||
checkpoint_pair_and_dependent_pairs(
|
||||
ct,
|
||||
|
@ -2014,7 +2004,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
|
|||
nb_mutex_users(&p->value_nb_mutex) == 0
|
||||
) {
|
||||
// because nb_mutex_users is 0, this is fast
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
*value = p->value_data;
|
||||
pair_touch(p);
|
||||
r = 0;
|
||||
|
@ -2040,7 +2030,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
|
|||
nb_mutex_users(&p->value_nb_mutex) == 0
|
||||
) {
|
||||
// because nb_mutex_users is 0, this is fast
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
*value = p->value_data;
|
||||
r = 0;
|
||||
}
|
||||
|
@ -2145,7 +2135,7 @@ int toku_cachetable_get_and_pin_nonblocking (
|
|||
if (!nb_mutex_writers(&p->value_nb_mutex) &&
|
||||
(!may_modify_value || resolve_checkpointing_fast(p)))
|
||||
{
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
if (may_modify_value && p->checkpoint_pending) {
|
||||
write_locked_pair_for_checkpoint(ct, p);
|
||||
}
|
||||
|
@ -2178,7 +2168,7 @@ int toku_cachetable_get_and_pin_nonblocking (
|
|||
else {
|
||||
run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
|
||||
// Now wait for the I/O to occur.
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
if (may_modify_value && p->checkpoint_pending) {
|
||||
write_locked_pair_for_checkpoint(ct, p);
|
||||
}
|
||||
|
@ -2202,7 +2192,7 @@ int toku_cachetable_get_and_pin_nonblocking (
|
|||
CACHETABLE_CLEAN
|
||||
);
|
||||
assert(p);
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
run_unlockers(unlockers); // we hold the ct mutex.
|
||||
u_int64_t t0 = get_tnow();
|
||||
cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, FALSE);
|
||||
|
@ -2225,8 +2215,8 @@ struct cachefile_partial_prefetch_args {
|
|||
};
|
||||
|
||||
// Worker thread function to read a pair from a cachefile to memory
|
||||
static void cachetable_reader(WORKITEM wi) {
|
||||
struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args *) workitem_arg(wi);
|
||||
static void cachetable_reader(void* extra) {
|
||||
struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra;
|
||||
CACHEFILE cf = cpargs->p->cachefile;
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
cachetable_lock(ct);
|
||||
|
@ -2243,8 +2233,8 @@ static void cachetable_reader(WORKITEM wi) {
|
|||
toku_free(cpargs);
|
||||
}
|
||||
|
||||
static void cachetable_partial_reader(WORKITEM wi) {
|
||||
struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args *) workitem_arg(wi);
|
||||
static void cachetable_partial_reader(void* extra) {
|
||||
struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra;
|
||||
CACHEFILE cf = cpargs->p->cachefile;
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
cachetable_lock(ct);
|
||||
|
@ -2294,20 +2284,19 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
|
|||
CACHETABLE_CLEAN
|
||||
);
|
||||
assert(p);
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
struct cachefile_prefetch_args *MALLOC(cpargs);
|
||||
cpargs->p = p;
|
||||
cpargs->fetch_callback = fetch_callback;
|
||||
cpargs->read_extraargs = read_extraargs;
|
||||
workitem_init(&p->asyncwork, cachetable_reader, cpargs);
|
||||
workqueue_enq(&ct->wq, &p->asyncwork, 0);
|
||||
toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs);
|
||||
if (doing_prefetch) {
|
||||
*doing_prefetch = TRUE;
|
||||
}
|
||||
}
|
||||
else if (nb_mutex_users(&p->value_nb_mutex)==0) {
|
||||
// nobody else is using the node, so we should go ahead and prefetch
|
||||
nb_mutex_lock(&p->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->value_nb_mutex, &ct->mutex);
|
||||
BOOL partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
|
||||
|
||||
if (partial_fetch_required) {
|
||||
|
@ -2317,8 +2306,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
|
|||
cpargs->p = p;
|
||||
cpargs->pf_callback = pf_callback;
|
||||
cpargs->read_extraargs = read_extraargs;
|
||||
workitem_init(&p->asyncwork, cachetable_partial_reader, cpargs);
|
||||
workqueue_enq(&ct->wq, &p->asyncwork, 0);
|
||||
toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs);
|
||||
if (doing_prefetch) {
|
||||
*doing_prefetch = TRUE;
|
||||
}
|
||||
|
@ -2407,8 +2395,8 @@ struct pair_flush_for_close{
|
|||
BACKGROUND_JOB_MANAGER bjm;
|
||||
};
|
||||
|
||||
static void cachetable_flush_pair_for_close(WORKITEM wi) {
|
||||
struct pair_flush_for_close *args = cast_to_typeof(args) workitem_arg(wi);
|
||||
static void cachetable_flush_pair_for_close(void* extra) {
|
||||
struct pair_flush_for_close *args = (struct pair_flush_for_close*) extra;
|
||||
PAIR p = args->p;
|
||||
CACHEFILE cf = p->cachefile;
|
||||
CACHETABLE ct = cf->cachetable;
|
||||
|
@ -2498,9 +2486,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
|
|||
struct pair_flush_for_close *XMALLOC(args);
|
||||
args->p = p;
|
||||
args->bjm = bjm;
|
||||
workitem_init(&p->asyncwork, cachetable_flush_pair_for_close, args);
|
||||
workqueue_enq(&ct->wq, &p->asyncwork, 0);
|
||||
|
||||
toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
|
||||
}
|
||||
}
|
||||
cachetable_unlock(ct);
|
||||
|
@ -2561,9 +2547,9 @@ toku_cachetable_close (CACHETABLE *ctp) {
|
|||
assert(ct->size_evicting == 0);
|
||||
rwlock_destroy(&ct->pending_lock);
|
||||
cachetable_unlock(ct);
|
||||
toku_destroy_workers(&ct->wq, &ct->threadpool);
|
||||
toku_destroy_workers(&ct->checkpoint_wq, &ct->checkpoint_threadpool);
|
||||
toku_kibbutz_destroy(ct->kibbutz);
|
||||
toku_kibbutz_destroy(ct->client_kibbutz);
|
||||
toku_kibbutz_destroy(ct->ct_kibbutz);
|
||||
toku_kibbutz_destroy(ct->checkpointing_kibbutz);
|
||||
bjm_destroy(ct->checkpoint_clones_bjm);
|
||||
toku_cond_destroy(&ct->flow_control_cond);
|
||||
toku_free(ct->table);
|
||||
|
@ -2592,7 +2578,7 @@ int toku_cachetable_unpin_and_remove (
|
|||
assert(nb_mutex_writers(&p->value_nb_mutex));
|
||||
// grab disk_nb_mutex to ensure any background thread writing
|
||||
// out a cloned value completes
|
||||
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&p->disk_nb_mutex, &ct->mutex);
|
||||
assert(p->cloned_value_data == NULL);
|
||||
|
||||
//
|
||||
|
@ -2684,7 +2670,7 @@ int toku_cachetable_unpin_and_remove (
|
|||
toku_cond_init(&cond, NULL);
|
||||
nb_mutex_wait_for_users(
|
||||
&p->value_nb_mutex,
|
||||
ct->mutex,
|
||||
&ct->mutex,
|
||||
&cond
|
||||
);
|
||||
toku_cond_destroy(&cond);
|
||||
|
@ -2898,7 +2884,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
|
|||
// updated. This is not correct. The result is that the checkpoint does not include the proper
|
||||
// state of this PAIR.
|
||||
//
|
||||
rwlock_write_lock(&ct->pending_lock, ct->mutex);
|
||||
rwlock_write_lock(&ct->pending_lock, &ct->mutex);
|
||||
ct->checkpoint_is_beginning = TRUE; // detect threadsafety bugs, must set checkpoint_is_beginning ...
|
||||
invariant(ct->checkpoint_prohibited == 0); // ... before testing checkpoint_prohibited
|
||||
bjm_reset(ct->checkpoint_clones_bjm);
|
||||
|
@ -3350,7 +3336,7 @@ toku_cleaner_thread (void *cachetable_v)
|
|||
cachetable_unlock(ct);
|
||||
continue;
|
||||
}
|
||||
nb_mutex_lock(&best_pair->value_nb_mutex, ct->mutex);
|
||||
nb_mutex_lock(&best_pair->value_nb_mutex, &ct->mutex);
|
||||
// verify a key assumption.
|
||||
assert(cleaner_thread_rate_pair(best_pair) > 0);
|
||||
if (best_pair->checkpoint_pending) {
|
||||
|
|
Loading…
Add table
Reference in a new issue