mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 14:54:20 +01:00
631fe49fc1
Fixes windows build issues Add checkpoint-safe fast truncate git-svn-id: file:///svn/toku/tokudb@11023 c7de825b-a66e-492c-adef-691d508d4ae1
1711 lines
54 KiB
C
1711 lines
54 KiB
C
/* -*- mode: C; c-basic-offset: 4 -*- */
|
|
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <malloc.h>
|
|
#include <time.h>
|
|
|
|
#include "toku_portability.h"
|
|
#include "memory.h"
|
|
#include "workqueue.h"
|
|
#include "threadpool.h"
|
|
#include "cachetable.h"
|
|
#include "rwlock.h"
|
|
#include "toku_worker.h"
|
|
#include "log_header.h"
|
|
|
|
// The build must define TOKU_CACHETABLE_DO_EVICT_FROM_WRITER; it is tested
// with #if in flush_and_maybe_remove() to decide which thread finishes an eviction.
#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
#error
#endif

// Hand writes to worker threads: 0->no 1->yes
#define DO_WORKER_THREAD 1
#if DO_WORKER_THREAD
// Work-item entry points run by the worker threads (defined later in this file).
static void cachetable_writer(WORKITEM);
static void cachetable_reader(WORKITEM);
#endif

// Make cachetable_lock()/cachetable_unlock() really take the mutex: 0->no 1->yes
#define DO_CACHETABLE_LOCK 1

// Simulate long latency write operations after the flush callback returns.
// DO_CALLBACK_USLEEP is handed directly to usleep() and DO_CALLBACK_BUSYWAIT is
// compared with a gettimeofday() difference computed in microseconds, so both
// values are in microseconds.  0 disables the delay.
#define DO_CALLBACK_USLEEP 0
#define DO_CALLBACK_BUSYWAIT 0

// Trace output: WHEN_TRACE_CT(x) expands to x when tracing is on, and to a
// no-op expression otherwise.
#define TRACE_CACHETABLE 0
#if TRACE_CACHETABLE
#define WHEN_TRACE_CT(x) x
#else
#define WHEN_TRACE_CT(x) ((void)0)
#endif

// When nonzero, get_and_pin also accumulates the time spent waiting and fetching.
#define TOKU_DO_WAIT_TIME 0
|
|
|
|
// Statistics gathered by toku_cachetable_get_and_pin() and printed by
// toku_cachetable_print_hash_histogram().
static u_int64_t cachetable_hit;          // lookups that found the pair in the table
static u_int64_t cachetable_miss;         // lookups that had to fetch the pair
static u_int64_t cachetable_wait_reading; // hits on a write-locked pair that was in CTPAIR_READING
static u_int64_t cachetable_wait;         // hits on a write-locked pair in any other state
#if TOKU_DO_WAIT_TIME
static u_int64_t cachetable_miss_time;    // microseconds spent in fetches
static u_int64_t cachetable_wait_time;    // microseconds spent waiting for a locked pair
#endif
|
|
|
|
// Life-cycle state of a pair.
enum ctpair_state {
    CTPAIR_INVALID = 0, // not usable (set when a fetch fails)
    CTPAIR_IDLE    = 1, // resident in memory, no I/O in progress
    CTPAIR_READING = 2, // the fetch callback is bringing it into memory
    CTPAIR_WRITING = 3, // the flush callback is writing it out
};
|
|
|
|
typedef struct ctpair *PAIR;
|
|
struct ctpair {
|
|
enum typ_tag tag;
|
|
CACHEFILE cachefile;
|
|
CACHEKEY key;
|
|
void *value;
|
|
long size;
|
|
enum ctpair_state state;
|
|
enum cachetable_dirty dirty;
|
|
|
|
char verify_flag; // Used in verify_cachetable()
|
|
BOOL write_me; // write_pair
|
|
BOOL remove_me; // write_pair
|
|
|
|
u_int32_t fullhash;
|
|
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback;
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback;
|
|
void *extraargs;
|
|
|
|
PAIR next,prev; // In LRU list.
|
|
PAIR hash_chain;
|
|
|
|
LSN modified_lsn; // What was the LSN when modified (undefined if not dirty)
|
|
LSN written_lsn; // What was the LSN when written (we need to get this information when we fetch)
|
|
|
|
BOOL checkpoint_pending; // If this is on, then we have got to write the pair out to disk before modifying it.
|
|
PAIR pending_next;
|
|
PAIR pending_prev;
|
|
|
|
struct rwlock rwlock; // multiple get's, single writer
|
|
struct workqueue *cq; // writers sometimes return ctpair's using this queue
|
|
struct workitem asyncwork; // work item for the worker threads
|
|
u_int32_t refs; //References that prevent descruction
|
|
int already_removed; //If a pair is removed from the cachetable, but cannot be freed because refs>0, this is set.
|
|
};
|
|
|
|
// Placeholder value and size given to a pair that is inserted before its fetch completes.
static void * const zero_value = 0;
static int const zero_size = 0;
|
|
|
|
static inline void
|
|
ctpair_add_ref(PAIR p) {
|
|
assert(!p->already_removed);
|
|
p->refs++;
|
|
}
|
|
|
|
// Drop one reference on a pair.  When the last reference goes away the pair's
// rwlock is destroyed and the pair itself is freed.
static inline void ctpair_destroy(PAIR p) {
    assert(p->refs>0);
    p->refs -= 1;
    if (p->refs != 0)
        return;                 // still referenced elsewhere
    rwlock_destroy(&p->rwlock);
    toku_free(p);
}
|
|
|
|
// The cachetable is as close to an ENV as we get.
|
|
struct cachetable {
|
|
enum typ_tag tag;
|
|
u_int32_t n_in_table; // number of pairs in the hash table
|
|
u_int32_t table_size; // number of buckets in the hash table
|
|
PAIR *table; // hash table
|
|
PAIR head,tail; // of LRU list. head is the most recently used. tail is least recently used.
|
|
CACHEFILE cachefiles; // list of cachefiles that use this cachetable
|
|
CACHEFILE cachefiles_in_checkpoint; //list of cachefiles included in checkpoint in progress
|
|
long size_current; // the sum of the sizes of the pairs in the cachetable
|
|
long size_limit; // the limit to the sum of the pair sizes
|
|
long size_writing; // the sum of the sizes of the pairs being written
|
|
TOKULOGGER logger;
|
|
toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's
|
|
struct workqueue wq; // async work queue
|
|
THREADPOOL threadpool; // pool of worker threads
|
|
LSN lsn_of_checkpoint_in_progress;
|
|
PAIR pending_head; // list of pairs marked with checkpoint_pending
|
|
struct rwlock pending_lock; // multiple writer threads, single checkpoint thread
|
|
};
|
|
|
|
// Lock the cachetable
|
|
static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
|
|
#if DO_CACHETABLE_LOCK
|
|
int r = toku_pthread_mutex_lock(ct->mutex); assert(r == 0);
|
|
#endif
|
|
}
|
|
|
|
// Unlock the cachetable
|
|
static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
|
|
#if DO_CACHETABLE_LOCK
|
|
int r = toku_pthread_mutex_unlock(ct->mutex); assert(r == 0);
|
|
#endif
|
|
}
|
|
|
|
// Wait for cache table space to become available
|
|
static inline void cachetable_wait_write(CACHETABLE ct) {
|
|
while (2*ct->size_writing > ct->size_current) {
|
|
workqueue_wait_write(&ct->wq, 0);
|
|
}
|
|
}
|
|
|
|
struct cachefile {
|
|
CACHEFILE next;
|
|
CACHEFILE next_in_checkpoint;
|
|
BOOL for_checkpoint; //True if part of the in-progress checkpoint
|
|
u_int64_t refcount; /* CACHEFILEs are shared. Use a refcount to decide when to really close it.
|
|
* The reference count is one for every open DB.
|
|
* Plus one for every commit/rollback record. (It would be harder to keep a count for every open transaction,
|
|
* because then we'd have to figure out if the transaction was already counted. If we simply use a count for
|
|
* every record in the transaction, we'll be ok. Hence we use a 64-bit counter to make sure we don't run out.
|
|
*/
|
|
int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
|
|
CACHETABLE cachetable;
|
|
struct fileid fileid;
|
|
FILENUM filenum;
|
|
char *fname;
|
|
|
|
void *userdata;
|
|
int (*close_userdata)(CACHEFILE cf, void *userdata, char **error_string); // when closing the last reference to a cachefile, first call this function.
|
|
int (*begin_checkpoint_userdata)(CACHEFILE cf, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
|
|
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
|
|
int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function.
|
|
};
|
|
|
|
// Create an empty cachetable.
//   result     - receives the new cachetable on success
//   size_limit - limit on the sum of the sizes of the cached pairs
//   logger     - stored in the cachetable
// Returns 0 on success, ENOMEM if the cachetable itself cannot be allocated.
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
#if defined __linux__
    {
        // Done once per process: 64K and larger should be malloced with mmap().
        static int did_mallopt = 0;
        if (did_mallopt == 0) {
            mallopt(M_MMAP_THRESHOLD, 1024*64);
            did_mallopt = 1;
        }
    }
#endif
    TAGMALLOC(CACHETABLE, ct);
    if (ct == 0)
        return ENOMEM;
    memset(ct, 0, sizeof(*ct));

    // Start with a small power-of-two hash table; it is resized as pairs come and go.
    ct->table_size = 4;
    rwlock_init(&ct->pending_lock);
    XCALLOC_N(ct->table_size, ct->table);

    ct->size_limit = size_limit;
    ct->logger = logger;

    // The cachetable shares the work queue's lock as its own mutex.
    toku_init_workers(&ct->wq, &ct->threadpool);
    ct->mutex = workqueue_lock_ref(&ct->wq);

    *result = ct;
    return 0;
}
|
|
|
|
// What cachefile goes with particular fd?
|
|
int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
|
|
CACHEFILE extant;
|
|
for (extant = ct->cachefiles; extant; extant=extant->next) {
|
|
if (extant->filenum.fileid==filenum.fileid) {
|
|
*cf = extant;
|
|
return 0;
|
|
}
|
|
}
|
|
return ENOENT;
|
|
}
|
|
|
|
// Next candidate FILENUM to hand out in toku_cachetable_openfd(); values still in use are skipped there.
static FILENUM next_filenum_to_use = {0};
|
|
|
|
// Record the descriptor, the OS file id, and a private copy of the name
// (or 0 when no name is given) in the cachefile.
static void cachefile_init_filenum(CACHEFILE cf, int fd, const char *fname, struct fileid fileid)
{
    cf->fd = fd;
    cf->fileid = fileid;
    if (fname)
        cf->fname = toku_strdup(fname);
    else
        cf->fname = 0;
}
|
|
|
|
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
|
|
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname) {
|
|
int r;
|
|
CACHEFILE extant;
|
|
struct fileid fileid;
|
|
r = toku_os_get_unique_file_id(fd, &fileid);
|
|
if (r != 0) {
|
|
r=errno; close(fd);
|
|
return r;
|
|
}
|
|
for (extant = ct->cachefiles; extant; extant=extant->next) {
|
|
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
|
|
r = close(fd);
|
|
assert(r == 0);
|
|
extant->refcount++;
|
|
*cfptr = extant;
|
|
return 0;
|
|
}
|
|
}
|
|
try_again:
|
|
for (extant = ct->cachefiles; extant; extant=extant->next) {
|
|
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
|
|
next_filenum_to_use.fileid++;
|
|
goto try_again;
|
|
}
|
|
}
|
|
{
|
|
CACHEFILE XCALLOC(newcf);
|
|
newcf->cachetable = ct;
|
|
newcf->filenum.fileid = next_filenum_to_use.fileid++;
|
|
cachefile_init_filenum(newcf, fd, fname, fileid);
|
|
newcf->refcount = 1;
|
|
newcf->next = ct->cachefiles;
|
|
ct->cachefiles = newcf;
|
|
|
|
newcf->userdata = 0;
|
|
newcf->close_userdata = 0;
|
|
newcf->checkpoint_userdata = 0;
|
|
newcf->begin_checkpoint_userdata = 0;
|
|
newcf->end_checkpoint_userdata = 0;
|
|
|
|
*cfptr = newcf;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Open fname with the given flags and mode (always adding O_BINARY) and attach
// the descriptor to the cachetable with toku_cachetable_openfd().
// Returns errno if open() fails, otherwise whatever toku_cachetable_openfd() returns.
int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname, int flags, mode_t mode) {
    // Combine flag bits with |, not +: adding would carry into an unrelated
    // bit if the caller had already included O_BINARY in flags.
    int fd = open(fname, flags | O_BINARY, mode);
    if (fd<0) return errno;
    return toku_cachetable_openfd (cfptr, ct, fd, fname);
}
|
|
|
|
// Return the cachetable's embedded async work queue.
WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
    WORKQUEUE queue = &ct->wq;
    return queue;
}
|
|
|
|
// Report the load on the work queue of the cachetable that owns cf:
//   *n_in_queue - result of workqueue_n_in_queue() on that queue
//   *n_threads  - current number of threads in the cachetable's thread pool
void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
    CACHETABLE owner = cf->cachetable;
    int queued = workqueue_n_in_queue(&owner->wq, 1);
    *n_in_queue = queued;
    *n_threads = threadpool_get_current_threads(owner->threadpool);
}
|
|
|
|
// Re-point an existing cachefile at a different open descriptor.
// The current userdata is closed through close_userdata (if any), all userdata
// callbacks are cleared, the old descriptor is closed and the old name freed,
// and then fd/fname/fileid are installed.
// Returns 0 on success; errno if the file id of fd cannot be obtained (fd is
// closed in that case); or the nonzero result of close_userdata.
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
    struct fileid new_fileid;
    int r = toku_os_get_unique_file_id(fd, &new_fileid);
    if (r != 0) {
        r=errno; close(fd); return r;
    }

    if (cf->close_userdata) {
        r = cf->close_userdata(cf, cf->userdata, 0);
        if (r)
            return r;
    }
    cf->close_userdata = NULL;
    cf->checkpoint_userdata = NULL;
    cf->begin_checkpoint_userdata = NULL;
    cf->end_checkpoint_userdata = NULL;
    cf->userdata = NULL;

    // Let go of the old descriptor and name before installing the new ones.
    close(cf->fd);
    cf->fd = -1;
    if (cf->fname) {
        toku_free(cf->fname);
        cf->fname = 0;
    }
    cachefile_init_filenum(cf, fd, fname, new_fileid);
    return 0;
}
|
|
|
|
// Return the file descriptor currently stored in the cachefile.
int toku_cachefile_fd (CACHEFILE cf) {
    int fd = cf->fd;
    return fd;
}
|
|
|
|
// Truncate the cachefile's underlying file to zero length.
// Returns 0 on success, errno if ftruncate() fails.
int toku_cachefile_truncate0 (CACHEFILE cf) {
    if (ftruncate(cf->fd, 0) != 0)
        return errno;
    return 0;
}
|
|
|
|
// Unlink cf from the singly linked list that starts at `list` and return the
// (possibly new) head.  A list that does not contain cf is returned unchanged.
// cf->next itself is left untouched.
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
    CACHEFILE head = list;
    CACHEFILE *link = &head;    // the pointer that would have to change to unlink *link
    while (*link != 0) {
        if (*link == cf) {
            *link = cf->next;
            break;
        }
        link = &(*link)->next;
    }
    return head;
}
|
|
|
|
static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf);
|
|
|
|
// Increment the reference count
|
|
void toku_cachefile_refup (CACHEFILE cf) {
|
|
cf->refcount++;
|
|
}
|
|
|
|
// Drop one reference on *cfp.  When the last reference goes away the file's
// pairs are flushed, close_userdata (if any) is called, the cachefile is
// unlinked from its cachetable, the descriptor is closed, and the cachefile
// is freed.
//   cfp          - in: the cachefile; out: always set to 0, because the caller's
//                  reference is gone (and the object may have been freed)
//   logger       - currently unused (close logging is disabled below)
//   error_string - passed through to close_userdata
// Returns 0 on success, or the nonzero result of the flush or of
// close_userdata.  On such an error the cachefile is still torn down and freed.
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string) {
    CACHEFILE cf = *cfp;
    CACHETABLE ct = cf->cachetable;
    int r;

    cachetable_lock(ct);
    assert(!cf->next_in_checkpoint);
    assert(!cf->for_checkpoint);
    assert(cf->refcount>0);
    cf->refcount--;
    if (cf->refcount!=0) {
        // Other users remain; only the caller's handle goes away.
        cachetable_unlock(ct);
        *cfp=0;
        return 0;
    }

    r = cachetable_flush_cachefile(ct, cf);
    if (r == 0 && cf->close_userdata)
        r = cf->close_userdata(cf, cf->userdata, error_string);

    if (r != 0) {
        // Error path: tear everything down while still holding the lock.
        ct->cachefiles = remove_cf_from_list(cf, ct->cachefiles);
        if (cf->fname) toku_free(cf->fname);
        int r2 = close(cf->fd);
        if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno);
        toku_free(cf);
        cachetable_unlock(ct);
        // cf has been freed: do not leave the caller with a dangling pointer.
        *cfp=0;
        return r;
    }

    cf->close_userdata = NULL;
    cf->checkpoint_userdata = NULL;
    cf->begin_checkpoint_userdata = NULL;
    cf->end_checkpoint_userdata = NULL;
    cf->userdata = NULL;
    ct->cachefiles = remove_cf_from_list(cf, ct->cachefiles);
    cachetable_unlock(ct);

    // The cachefile is no longer reachable from the cachetable, so the rest
    // is done without the lock.
    r = close(cf->fd);
    assert(r == 0);
    cf->fd = -1;
    if (logger) {
        //assert(cf->fname);
        //BYTESTRING bs = {.len=strlen(cf->fname), .data=cf->fname};
        //r = toku_log_cfclose(logger, 0, 0, bs, cf->filenum);
    }
    if (cf->fname) toku_free(cf->fname);
    toku_free(cf);
    *cfp=0;
    return r;
}
|
|
|
|
// Flush the pairs of one cachefile, holding the cachetable lock around the
// call to cachetable_flush_cachefile().  Returns that function's result.
int toku_cachefile_flush (CACHEFILE cf) {
    CACHETABLE owner = cf->cachetable;
    int result;
    cachetable_lock(owner);
    result = cachetable_flush_cachefile(owner, cf);
    cachetable_unlock(owner);
    return result;
}
|
|
|
|
// This hash function comes from Jenkins:  http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan

// Rotate the 32-bit value x left by k bits.
// Both shift counts are reduced modulo 32, so k==0 (or any multiple of 32)
// returns x unchanged instead of shifting by the full width of the type,
// which is undefined behavior in C.  For k in 1..31 the result is unchanged.
static inline u_int32_t rot(u_int32_t x, u_int32_t k) {
    k &= 31;
    return (x<<k) | (x>>((32-k)&31));
}
|
|
// Mix three 32-bit words into one hash value: seven rounds of xor,
// rotate, and subtract over a, b, c, after which c is the result.
static inline u_int32_t final (u_int32_t a, u_int32_t b, u_int32_t c) {
    c = (c ^ b) - rot(b,14);
    a = (a ^ c) - rot(c,11);
    b = (b ^ a) - rot(a,25);
    c = (c ^ b) - rot(b,16);
    a = (a ^ c) - rot(c,4);
    b = (b ^ a) - rot(a,14);
    c = (c ^ b) - rot(b,24);
    return c;
}
|
|
|
|
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
// Effect: Return a 32-bit hash key.  The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
// The three words mixed are the cachefile's filenum and the high and low
// 32 bits of the block number.
{
    u_int32_t file_word = cachefile->filenum.fileid;
    u_int32_t key_hi = (u_int32_t)(key.b>>32);
    u_int32_t key_lo = (u_int32_t)key.b;
    return final(file_word, key_hi, key_lo);
}
|
|
|
|
#if 0
// Disabled helper: bucket index of (cachefile, key) in the current table.
// The arguments to toku_cachetable_hash() are passed in its declared order
// (cachefile, key); the previous text had them swapped.
static unsigned int hashit (CACHETABLE ct, CACHEKEY key, CACHEFILE cachefile) {
    assert(0==(ct->table_size & (ct->table_size -1))); // make sure table is power of two
    return (toku_cachetable_hash(cachefile,key))&(ct->table_size-1);
}
#endif
|
|
|
|
// Resize the hash table to newtable_size buckets (a power of two, at least 4)
// and move every pair to the bucket selected by its stored fullhash.
// Allocation failure is fatal (assert).
static void cachetable_rehash (CACHETABLE ct, u_int32_t newtable_size) {
    assert(newtable_size>=4 && ((newtable_size & (newtable_size-1))==0));
    PAIR *newtable = toku_calloc(newtable_size, sizeof(*ct->table));
    assert(newtable!=0);

    u_int32_t oldtable_size = ct->table_size;
    u_int32_t mask = newtable_size-1;
    u_int32_t bucket;

    ct->table_size=newtable_size;
    for (bucket=0; bucket<newtable_size; bucket++) newtable[bucket]=0;

    // Drain each old bucket, pushing its pairs onto the front of their new chains.
    for (bucket=0; bucket<oldtable_size; bucket++) {
        PAIR p = ct->table[bucket];
        while (p!=0) {
            PAIR rest = p->hash_chain;
            unsigned int h = p->fullhash&mask;
            p->hash_chain = newtable[h];
            newtable[h] = p;
            p = rest;
        }
        ct->table[bucket] = 0;
    }
    toku_free(ct->table);
    ct->table=newtable;
}
|
|
|
|
// Unlink p from the LRU list, fixing up the cachetable's head/tail when p was
// at an end, and clear p's own links.
static void lru_remove (CACHETABLE ct, PAIR p) {
    PAIR after = p->next;
    PAIR before = p->prev;

    if (after == 0) {
        assert(ct->tail==p);    // no successor: p must be the tail
        ct->tail = before;
    } else {
        after->prev = before;
    }

    if (before == 0) {
        assert(ct->head==p);    // no predecessor: p must be the head
        ct->head = after;
    } else {
        before->next = after;
    }

    p->prev = p->next = 0;
}
|
|
|
|
// Insert p at the head (most recently used end) of the LRU list.
// Requires that p is not currently linked into the list.
static void lru_add_to_list (CACHETABLE ct, PAIR p) {
    PAIR old_head = ct->head;

    assert(p->prev==0);
    p->prev = 0;
    p->next = old_head;
    if (old_head == 0) {
        // The list was empty, so p is also the tail.
        assert(!ct->tail);
        ct->tail = p;
    } else {
        old_head->prev = p;
    }
    ct->head = p;
}
|
|
|
|
// Mark p as most recently used: take it out of the LRU list and put it back
// at the head.
static void lru_touch (CACHETABLE ct, PAIR p) {
    lru_remove(ct, p);
    lru_add_to_list(ct, p);
}
|
|
|
|
// Unlink remove_me from the hash chain that starts at `list` and return the
// (possibly new) head of the chain.  remove_me must be on the chain.
// remove_me->hash_chain itself is left untouched.
static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
    PAIR head = list;
    PAIR *link = &head;         // the pointer that currently leads to the candidate
    while (*link != remove_me)
        link = &(*link)->hash_chain;
    *link = remove_me->hash_chain;
    return head;
}
|
|
|
|
//Remove a pair from the list of pairs that were marked with the
|
|
//pending bit for the in-progress checkpoint.
|
|
//Requires: cachetable lock is held during duration.
|
|
static void
|
|
pending_pairs_remove (CACHETABLE ct, PAIR p) {
|
|
if (p->pending_next) {
|
|
p->pending_next->pending_prev = p->pending_prev;
|
|
}
|
|
if (p->pending_prev) {
|
|
p->pending_prev->pending_next = p->pending_next;
|
|
}
|
|
else if (ct->pending_head==p) {
|
|
ct->pending_head = p->pending_next;
|
|
}
|
|
p->pending_prev = p->pending_next = NULL;
|
|
}
|
|
|
|
// Remove a pair from the cachetable
|
|
// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
|
|
// The size of the objects in the cachetable is adjusted by the size of the pair being
|
|
// removed.
|
|
|
|
static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
|
|
lru_remove(ct, p);
|
|
pending_pairs_remove(ct, p);
|
|
|
|
assert(ct->n_in_table>0);
|
|
ct->n_in_table--;
|
|
// Remove it from the hash chain.
|
|
{
|
|
unsigned int h = p->fullhash&(ct->table_size-1);
|
|
ct->table[h] = remove_from_hash_chain (p, ct->table[h]);
|
|
}
|
|
ct->size_current -= p->size; assert(ct->size_current >= 0);
|
|
p->already_removed = TRUE;
|
|
}
|
|
|
|
// Maybe remove a pair from the cachetable and free it, depending on whether
|
|
// or not there are any threads interested in the pair. The flush callback
|
|
// is called with write_me and keep_me both false, and the pair is destroyed.
|
|
|
|
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
|
|
if (rwlock_users(&p->rwlock) == 0) {
|
|
cachetable_remove_pair(ct, p);
|
|
|
|
// helgrind
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
|
|
CACHEFILE cachefile = p->cachefile;
|
|
CACHEKEY key = p->key;
|
|
void *value = p->value;
|
|
void *extraargs = p->extraargs;
|
|
long size = p->size;
|
|
|
|
cachetable_unlock(ct);
|
|
|
|
flush_callback(cachefile, key, value, extraargs, size, FALSE, FALSE, TRUE);
|
|
|
|
cachetable_lock(ct);
|
|
|
|
ctpair_destroy(p);
|
|
}
|
|
}
|
|
|
|
// Give up on a pair whose fetch failed: release the write lock held by the
// fetcher and, if no other thread is using the pair's rwlock, drop the pair's
// reference.
static void abort_fetch_pair(PAIR p) {
    rwlock_write_unlock(&p->rwlock);
    if (rwlock_users(&p->rwlock) != 0)
        return;
    ctpair_destroy(p);
}
|
|
|
|
// Read a pair from a cachefile into memory using the pair's fetch callback
|
|
static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
|
|
// helgrind
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback = p->fetch_callback;
|
|
CACHEKEY key = p->key;
|
|
u_int32_t fullhash = p->fullhash;
|
|
void *extraargs = p->extraargs;
|
|
|
|
void *toku_value = 0;
|
|
long size = 0;
|
|
LSN written_lsn = ZERO_LSN;
|
|
|
|
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
|
|
|
|
cachetable_unlock(ct);
|
|
|
|
int r = fetch_callback(cf, key, fullhash, &toku_value, &size, extraargs, &written_lsn);
|
|
|
|
cachetable_lock(ct);
|
|
|
|
if (r) {
|
|
cachetable_remove_pair(ct, p);
|
|
p->state = CTPAIR_INVALID;
|
|
if (p->cq) {
|
|
workqueue_enq(p->cq, &p->asyncwork, 1);
|
|
return r;
|
|
}
|
|
abort_fetch_pair(p);
|
|
return r;
|
|
} else {
|
|
lru_touch(ct, p);
|
|
p->value = toku_value;
|
|
p->written_lsn = written_lsn;
|
|
p->size = size;
|
|
ct->size_current += size;
|
|
if (p->cq) {
|
|
workqueue_enq(p->cq, &p->asyncwork, 1);
|
|
return 0;
|
|
}
|
|
p->state = CTPAIR_IDLE;
|
|
rwlock_write_unlock(&p->rwlock);
|
|
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove);

// Write a pair to storage.
// Called with the cachetable lock held; the callers in this file also hold the
// pair's write lock and have set p->write_me and p->remove_me.
// Effects: the flush callback is called without the cachetable lock, with
// write_me true only when the pair is dirty and a write was requested, and
// with keep_me true, so the pair is not yet evicted from the cachetable.
// Afterwards the dirty state is adjusted and the write is completed, either
// here or by whoever consumes the pair's completion queue.
// The cachetable's pending_lock is read-locked for the duration.
static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
    rwlock_read_lock(&ct->pending_lock, ct->mutex);

    // Copy the callback inputs while the lock is held (helgrind).
    CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
    CACHEFILE cachefile = p->cachefile;
    CACHEKEY key = p->key;
    void *value = p->value;
    void *extraargs = p->extraargs;
    long size = p->size;
    BOOL dowrite = (BOOL)(p->dirty && p->write_me);
    BOOL for_checkpoint = p->checkpoint_pending;

    // Must be cleared before the cachetable lock is released.
    // This is the only place this flag is cleared.
    p->checkpoint_pending = FALSE;
    cachetable_unlock(ct);

    // write callback
    flush_callback(cachefile, key, value, extraargs, size, dowrite, TRUE, for_checkpoint);
#if DO_CALLBACK_USLEEP
    usleep(DO_CALLBACK_USLEEP);
#endif
#if DO_CALLBACK_BUSYWAIT
    struct timeval tstart;
    gettimeofday(&tstart, 0);
    long long ltstart = tstart.tv_sec * 1000000 + tstart.tv_usec;
    while (1) {
        struct timeval t;
        gettimeofday(&t, 0);
        long long lt = t.tv_sec * 1000000 + t.tv_usec;
        if (lt - ltstart > DO_CALLBACK_BUSYWAIT)
            break;
    }
#endif

    cachetable_lock(ct);

    // the pair is no longer dirty once written
    if (p->dirty && p->write_me) {
        p->dirty = CACHETABLE_CLEAN;
    }

    assert(!p->checkpoint_pending);
    rwlock_read_unlock(&ct->pending_lock);

    // Complete the write now, unless the pair has a completion queue, in which
    // case it is stuffed into that queue for delayed completion.
    if (p->cq == 0) {
        cachetable_complete_write_pair(ct, p, p->remove_me);
    } else {
        workqueue_enq(p->cq, &p->asyncwork, 1);
    }
}
|
|
|
|
// complete the write of a pair by reseting the writing flag, adjusting the write
|
|
// pending size, and maybe removing the pair from the cachetable if there are no
|
|
// references to it
|
|
|
|
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove) {
|
|
p->cq = 0;
|
|
p->state = CTPAIR_IDLE;
|
|
|
|
// maybe wakeup any stalled writers when the pending writes fall below
|
|
// 1/8 of the size of the cachetable
|
|
ct->size_writing -= p->size;
|
|
assert(ct->size_writing >= 0);
|
|
if (8*ct->size_writing <= ct->size_current)
|
|
workqueue_wakeup_write(&ct->wq, 0);
|
|
|
|
rwlock_write_unlock(&p->rwlock);
|
|
if (do_remove)
|
|
cachetable_maybe_remove_and_free_pair(ct, p);
|
|
}
|
|
|
|
// flush and remove a pair from the cachetable. the callbacks are run by a thread in
|
|
// a thread pool.
|
|
|
|
static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) {
|
|
rwlock_write_lock(&p->rwlock, ct->mutex);
|
|
p->state = CTPAIR_WRITING;
|
|
assert(ct->size_writing>=0);
|
|
ct->size_writing += p->size;
|
|
assert(ct->size_writing >= 0);
|
|
p->write_me = write_me;
|
|
p->remove_me = TRUE;
|
|
#if DO_WORKER_THREAD
|
|
WORKITEM wi = &p->asyncwork;
|
|
workitem_init(wi, cachetable_writer, p);
|
|
// evictions without a write or unpinned pair's that are clean
|
|
// can be run in the current thread
|
|
if (!p->write_me || (!rwlock_readers(&p->rwlock) && !p->dirty)) {
|
|
cachetable_write_pair(ct, p);
|
|
} else {
|
|
#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
|
|
p->remove_me = FALSE; // run the remove on the main thread
|
|
#endif
|
|
workqueue_enq(&ct->wq, wi, 0);
|
|
}
|
|
#else
|
|
cachetable_write_pair(ct, p);
|
|
#endif
|
|
}
|
|
|
|
// Make room for `size` more bytes.
// While the current size plus `size` exceeds the limit plus what is already
// being written, flush the least recently used pair that is idle and has no
// rwlock users.  The scan restarts from the LRU tail after every flush.
// If every pair is in use, give up quietly and let memory get overfull.
// Once under the limit, halve the hash table if it is less than a quarter full.
// Always returns 0.  Called with the cachetable lock held.
static int maybe_flush_some (CACHETABLE ct, long size) {
    while (size + ct->size_current > ct->size_limit + ct->size_writing) {
        /* Try to remove one. */
        PAIR victim = ct->tail;
        while (victim && !(victim->state == CTPAIR_IDLE && !rwlock_users(&victim->rwlock)))
            victim = victim->prev;
        if (victim == 0) {
            /* All were pinned. */
            return 0; // Don't indicate an error code.  Instead let memory get overfull.
        }
        flush_and_maybe_remove(ct, victim, TRUE);
    }

    if (ct->table_size > 4 && (4 * ct->n_in_table < ct->table_size))
        cachetable_rehash(ct, ct->table_size/2);

    return 0;
}
|
|
|
|
// Public wrapper: run maybe_flush_some() for zero additional bytes while
// holding the cachetable lock.
void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
    cachetable_lock(ct);
    (void) maybe_flush_some(ct, 0);
    cachetable_unlock(ct);
}
|
|
|
|
static PAIR cachetable_insert_at(CACHETABLE ct,
|
|
CACHEFILE cachefile, CACHEKEY key, void *value,
|
|
enum ctpair_state state,
|
|
u_int32_t fullhash,
|
|
long size,
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback,
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback,
|
|
void *extraargs,
|
|
enum cachetable_dirty dirty,
|
|
LSN written_lsn) {
|
|
TAGMALLOC(PAIR, p);
|
|
assert(p);
|
|
memset(p, 0, sizeof *p);
|
|
ctpair_add_ref(p);
|
|
p->cachefile = cachefile;
|
|
p->key = key;
|
|
p->value = value;
|
|
p->fullhash = fullhash;
|
|
p->dirty = dirty;
|
|
p->size = size;
|
|
p->state = state;
|
|
p->flush_callback = flush_callback;
|
|
p->fetch_callback = fetch_callback;
|
|
p->extraargs = extraargs;
|
|
p->modified_lsn.lsn = 0;
|
|
p->written_lsn = written_lsn;
|
|
p->fullhash = fullhash;
|
|
p->next = p->prev = 0;
|
|
rwlock_init(&p->rwlock);
|
|
p->cq = 0;
|
|
lru_add_to_list(ct, p);
|
|
u_int32_t h = fullhash & (ct->table_size-1);
|
|
p->hash_chain = ct->table[h];
|
|
ct->table[h] = p;
|
|
ct->n_in_table++;
|
|
ct->size_current += size;
|
|
if (ct->n_in_table > ct->table_size) {
|
|
cachetable_rehash(ct, ct->table_size*2);
|
|
}
|
|
return p;
|
|
}
|
|
|
|
// Histogram of values passed to note_hash_count(); the last slot also
// collects every value of hash_histogram_max-1 or more.
enum { hash_histogram_max = 100 };
static unsigned long long hash_histogram[hash_histogram_max];
|
|
void toku_cachetable_print_hash_histogram (void) {
|
|
int i;
|
|
for (i=0; i<hash_histogram_max; i++)
|
|
if (hash_histogram[i]) printf("%d:%llu ", i, hash_histogram[i]);
|
|
printf("\n");
|
|
printf("miss=%"PRIu64" hit=%"PRIu64" wait_reading=%"PRIu64" wait=%"PRIu64"\n",
|
|
cachetable_miss, cachetable_hit, cachetable_wait_reading, cachetable_wait);
|
|
}
|
|
|
|
static void
|
|
note_hash_count (int count) {
|
|
if (count>=hash_histogram_max) count=hash_histogram_max-1;
|
|
hash_histogram[count]++;
|
|
}
|
|
|
|
int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size,
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback,
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
|
|
WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
|
|
CACHETABLE ct = cachefile->cachetable;
|
|
int count=0;
|
|
cachetable_lock(ct);
|
|
cachetable_wait_write(ct);
|
|
{
|
|
PAIR p;
|
|
for (p=ct->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
|
|
count++;
|
|
if (p->key.b==key.b && p->cachefile==cachefile) {
|
|
// Semantically, these two asserts are not strictly right. After all, when are two functions eq?
|
|
// In practice, the functions better be the same.
|
|
assert(p->flush_callback==flush_callback);
|
|
assert(p->fetch_callback==fetch_callback);
|
|
rwlock_read_lock(&p->rwlock, ct->mutex);
|
|
note_hash_count(count);
|
|
cachetable_unlock(ct);
|
|
return -1; /* Already present. */
|
|
}
|
|
}
|
|
}
|
|
int r;
|
|
if ((r=maybe_flush_some(ct, size))) {
|
|
cachetable_unlock(ct);
|
|
return r;
|
|
}
|
|
// flushing could change the table size, but wont' change the fullhash
|
|
PAIR p = cachetable_insert_at(ct, cachefile, key, value, CTPAIR_IDLE, fullhash, size, flush_callback, fetch_callback, extraargs, CACHETABLE_DIRTY, ZERO_LSN);
|
|
assert(p);
|
|
rwlock_read_lock(&p->rwlock, ct->mutex);
|
|
note_hash_count(count);
|
|
cachetable_unlock(ct);
|
|
return 0;
|
|
}
|
|
|
|
#if TOKU_DO_WAIT_TIME
|
|
// Microseconds elapsed from *told to *tnew.
static u_int64_t tdelta(struct timeval *tnew, struct timeval *told) {
    u_int64_t new_usec = tnew->tv_sec * 1000000ULL + tnew->tv_usec;
    u_int64_t old_usec = told->tv_sec * 1000000ULL + told->tv_usec;
    return new_usec - old_usec;
}
|
|
#endif
|
|
|
|
// On entry: hold the ct lock and the rwlock_write_lock.
|
|
static void
|
|
write_pair_for_checkpoint (CACHETABLE ct, PAIR p)
|
|
{
|
|
assert(p->state!=CTPAIR_WRITING);
|
|
if (p->checkpoint_pending) {
|
|
// this is essentially a flush_and_maybe_remove except that
|
|
// we already have p->rwlock and we just do the write in our own thread.
|
|
assert(p->dirty); // it must be dirty if its pending.
|
|
#if 0
|
|
// TODO: Determine if this is legal, and/or required. Commented out for now
|
|
// I believe if it has a queue, removing it it will break whatever's waiting for it.
|
|
// p->cq = 0; // I don't want any delay, just do it.
|
|
#endif
|
|
|
|
p->state = CTPAIR_WRITING; //most of this code should run only if NOT ALREADY CTPAIR_WRITING
|
|
assert(ct->size_writing>=0);
|
|
ct->size_writing += p->size;
|
|
assert(ct->size_writing>=0);
|
|
p->write_me = TRUE;
|
|
p->remove_me = FALSE;
|
|
cachetable_write_pair(ct, p); // unlocks the pair
|
|
}
|
|
}
|
|
|
|
|
|
int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback,
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
|
|
CACHETABLE ct = cachefile->cachetable;
|
|
PAIR p;
|
|
int count=0;
|
|
cachetable_lock(ct);
|
|
cachetable_wait_write(ct);
|
|
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
|
|
count++;
|
|
if (p->key.b==key.b && p->cachefile==cachefile) {
|
|
#if TOKU_DO_WAIT_TIME
|
|
struct timeval t0;
|
|
int do_wait_time = 0;
|
|
#endif
|
|
if (p->rwlock.writer || p->rwlock.want_write) {
|
|
if (p->state == CTPAIR_READING)
|
|
cachetable_wait_reading++;
|
|
else
|
|
cachetable_wait++;
|
|
#if TOKU_DO_WAIT_TIME
|
|
do_wait_time = 1;
|
|
gettimeofday(&t0, NULL);
|
|
#endif
|
|
}
|
|
if (p->checkpoint_pending) {
|
|
rwlock_write_lock(&p->rwlock, ct->mutex);
|
|
write_pair_for_checkpoint(ct, p); // releases the pair_write_lock, but not the cachetable lock
|
|
}
|
|
// still have the cachetable lock
|
|
rwlock_read_lock(&p->rwlock, ct->mutex);
|
|
#if TOKU_DO_WAIT_TIME
|
|
if (do_wait_time) {
|
|
struct timeval tnow;
|
|
gettimeofday(&tnow, NULL);
|
|
cachetable_wait_time += tdelta(&tnow, &t0);
|
|
}
|
|
#endif
|
|
if (p->state == CTPAIR_INVALID) {
|
|
rwlock_read_unlock(&p->rwlock);
|
|
if (rwlock_users(&p->rwlock) == 0)
|
|
ctpair_destroy(p);
|
|
cachetable_unlock(ct);
|
|
return ENODEV;
|
|
}
|
|
lru_touch(ct,p);
|
|
*value = p->value;
|
|
if (sizep) *sizep = p->size;
|
|
cachetable_hit++;
|
|
note_hash_count(count);
|
|
cachetable_unlock(ct);
|
|
WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
|
|
return 0;
|
|
}
|
|
}
|
|
note_hash_count(count);
|
|
int r;
|
|
// Note. hashit(t,key) may have changed as a result of flushing. But fullhash won't have changed.
|
|
{
|
|
p = cachetable_insert_at(ct, cachefile, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
|
|
assert(p);
|
|
rwlock_write_lock(&p->rwlock, ct->mutex);
|
|
#if TOKU_DO_WAIT_TIME
|
|
struct timeval t0;
|
|
gettimeofday(&t0, NULL);
|
|
#endif
|
|
r = cachetable_fetch_pair(ct, cachefile, p);
|
|
if (r) {
|
|
cachetable_unlock(ct);
|
|
return r;
|
|
}
|
|
cachetable_miss++;
|
|
#if TOKU_DO_WAIT_TIME
|
|
struct timeval tnow;
|
|
gettimeofday(&tnow, NULL);
|
|
cachetable_miss_time += tdelta(&tnow, &t0);
|
|
#endif
|
|
rwlock_read_lock(&p->rwlock, ct->mutex);
|
|
assert(p->state == CTPAIR_IDLE);
|
|
|
|
*value = p->value;
|
|
if (sizep) *sizep = p->size;
|
|
}
|
|
r = maybe_flush_some(ct, 0);
|
|
cachetable_unlock(ct);
|
|
WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
|
|
return r;
|
|
}
|
|
|
|
// Lookup a key in the cachetable. If it is found and it is not being written, then
|
|
// acquire a read lock on the pair, update the LRU list, and return sucess. However,
|
|
// if it is being written, then allow the writer to evict it. This prevents writers
|
|
// being suspended on a block that was just selected for eviction.
|
|
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
|
|
CACHETABLE ct = cachefile->cachetable;
|
|
PAIR p;
|
|
int count = 0;
|
|
int r = -1;
|
|
cachetable_lock(ct);
|
|
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
|
|
count++;
|
|
if (p->key.b==key.b && p->cachefile==cachefile && p->state == CTPAIR_IDLE) {
|
|
|
|
if (p->checkpoint_pending) {
|
|
rwlock_write_lock(&p->rwlock, ct->mutex);
|
|
write_pair_for_checkpoint(ct, p); // releases the pair_write_lock, but not the cachetable lock
|
|
}
|
|
// still have the cachetable lock
|
|
|
|
*value = p->value;
|
|
rwlock_read_lock(&p->rwlock, ct->mutex);
|
|
lru_touch(ct,p);
|
|
r = 0;
|
|
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
|
|
break;
|
|
}
|
|
}
|
|
note_hash_count(count);
|
|
cachetable_unlock(ct);
|
|
return r;
|
|
}
|
|
|
|
|
|
// Release one read pin on the pair identified by (cachefile, key).
// dirty: when nonzero the pair is marked CACHETABLE_DIRTY (a zero value never
//        clears an existing dirty mark).
// size:  when nonzero it becomes the pair's new size and the cachetable's
//        size accounting is adjusted to match.
// After unpinning, maybe_flush_some() is invoked; a nonzero result from it is
// returned directly.
// Returns 0 if the pair was found and unpinned, -1 if no such pair exists.
int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size) {
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
    int count = 0;
    int r = -1;

    cachetable_lock(ct);
    for (p = ct->table[fullhash & (ct->table_size - 1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b != key.b || p->cachefile != cachefile)
            continue;

        // The caller must currently hold a pin on this pair.
        assert(rwlock_readers(&p->rwlock)>0);
        rwlock_read_unlock(&p->rwlock);

        if (dirty)
            p->dirty = CACHETABLE_DIRTY;

        if (size != 0) {
            // Pairs in the WRITING state are also counted in size_writing,
            // so both totals have to follow the size change.
            int is_writing = (p->state == CTPAIR_WRITING);
            ct->size_current -= p->size;
            if (is_writing) ct->size_writing -= p->size;
            p->size = size;
            ct->size_current += p->size;
            if (is_writing) ct->size_writing += p->size;
        }
        WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));

        r = maybe_flush_some(ct, 0);
        if (r) {
            // Error exit: note that the hash statistics are not updated here.
            cachetable_unlock(ct);
            return r;
        }
        // r is 0 here: we found one
        break;
    }
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
|
|
|
|
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
|
|
CACHETABLE_FLUSH_CALLBACK flush_callback,
|
|
CACHETABLE_FETCH_CALLBACK fetch_callback,
|
|
void *extraargs) {
|
|
if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b);
|
|
CACHETABLE ct = cf->cachetable;
|
|
cachetable_lock(ct);
|
|
// lookup
|
|
PAIR p;
|
|
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain)
|
|
if (p->key.b==key.b && p->cachefile==cf)
|
|
break;
|
|
|
|
// if not found then create a pair in the READING state and fetch it
|
|
if (p == 0) {
|
|
p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
|
|
assert(p);
|
|
rwlock_write_lock(&p->rwlock, ct->mutex);
|
|
#if DO_WORKER_THREAD
|
|
workitem_init(&p->asyncwork, cachetable_reader, p);
|
|
workqueue_enq(&ct->wq, &p->asyncwork, 0);
|
|
#else
|
|
cachetable_fetch_pair(ct, cf, p);
|
|
#endif
|
|
}
|
|
cachetable_unlock(ct);
|
|
return 0;
|
|
}
|
|
|
|
// effect: Move an object from one key to another key.
|
|
// requires: The object is pinned in the table
|
|
int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
|
|
CACHETABLE ct = cachefile->cachetable;
|
|
PAIR *ptr_to_p,p;
|
|
int count = 0;
|
|
u_int32_t fullhash = toku_cachetable_hash(cachefile, oldkey);
|
|
cachetable_lock(ct);
|
|
for (ptr_to_p = &ct->table[fullhash&(ct->table_size-1)], p = *ptr_to_p;
|
|
p;
|
|
ptr_to_p = &p->hash_chain, p = *ptr_to_p) {
|
|
count++;
|
|
if (p->key.b==oldkey.b && p->cachefile==cachefile) {
|
|
note_hash_count(count);
|
|
*ptr_to_p = p->hash_chain;
|
|
p->key = newkey;
|
|
u_int32_t new_fullhash = toku_cachetable_hash(cachefile, newkey);
|
|
u_int32_t nh = new_fullhash&(ct->table_size-1);
|
|
p->fullhash = new_fullhash;
|
|
p->hash_chain = ct->table[nh];
|
|
ct->table[nh] = p;
|
|
cachetable_unlock(ct);
|
|
return 0;
|
|
}
|
|
}
|
|
note_hash_count(count);
|
|
cachetable_unlock(ct);
|
|
return -1;
|
|
}
|
|
|
|
// Run the cachetable consistency check on the cachetable that owns cf.
void toku_cachefile_verify (CACHEFILE cf) {
    CACHETABLE ct = cf->cachetable;
    toku_cachetable_verify(ct);
}
|
|
|
|
// Consistency check: the set of pairs reachable through the hash table must
// be exactly the set reachable through the LRU list (ct->head / next).
// Violations trip an assert. The cachetable lock is held for the whole check.
void toku_cachetable_verify (CACHETABLE ct) {
    u_int32_t bucket;
    PAIR pair;

    cachetable_lock(ct);

    // Pass 1: clear the verify flag on every hashed pair.
    for (bucket = 0; bucket < ct->table_size; bucket++)
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain)
            pair->verify_flag = 0;

    // Pass 2: every LRU entry must be found on the hash chain its fullhash
    // selects; mark each one as seen. A flag that is already set means the
    // entry appears twice on the LRU list.
    for (pair = ct->head; pair; pair = pair->next) {
        PAIR probe;
        assert(pair->verify_flag==0);
        probe = ct->table[pair->fullhash & (ct->table_size - 1)];
        while (probe && probe != pair)
            probe = probe->hash_chain;
        if (!probe) {
            fprintf(stderr, "Something in the LRU chain is not hashed\n");
            assert(0);
        }
        pair->verify_flag = 1;
    }

    // Pass 3: every hashed pair must have been seen on the LRU list.
    for (bucket = 0; bucket < ct->table_size; bucket++)
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain)
            assert(pair->verify_flag);

    cachetable_unlock(ct);
}
|
|
|
|
// Assert that no pair belonging to cf remains in the cachetable.
// The check is made twice, through two independent structures: the hash
// chains and the LRU list.
static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf) {
    PAIR pair;
    u_int32_t bucket;

    // First way: every hash chain.
    for (bucket = 0; bucket < ct->table_size; bucket++)
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain)
            assert(pair->cachefile!=cf);

    // Second way: the LRU list.
    for (pair = ct->head; pair; pair = pair->next)
        assert(pair->cachefile!=cf);
}
|
|
|
|
// Flush all of the pairs that belong to a cachefile (or all pairs if
|
|
// the cachefile is NULL.
|
|
static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
|
|
unsigned nfound = 0;
|
|
struct workqueue cq;
|
|
workqueue_init(&cq);
|
|
|
|
// find all of the pairs owned by a cachefile and redirect their completion
|
|
// to a completion queue. flush and remove pairs in the IDLE state if they
|
|
// are dirty. pairs in the READING or WRITING states are already in the
|
|
// work queue.
|
|
unsigned i;
|
|
|
|
//THIS LOOP IS NOT THREAD SAFE! Has race condition since flush_and_maybe_remove releases cachetable lock
|
|
|
|
unsigned num_pairs = 0;
|
|
unsigned list_size = 16;
|
|
PAIR *list = NULL;
|
|
XMALLOC_N(list_size, list);
|
|
//It is not safe to loop through the table (and hash chains) if you can
|
|
//release the cachetable lock at any point within.
|
|
|
|
//Make a list of pairs that belong to this cachefile.
|
|
//Add a reference to them.
|
|
for (i=0; i < ct->table_size; i++) {
|
|
PAIR p;
|
|
for (p = ct->table[i]; p; p = p->hash_chain) {
|
|
if (cf == 0 || p->cachefile==cf) {
|
|
ctpair_add_ref(p);
|
|
list[num_pairs] = p;
|
|
num_pairs++;
|
|
if (num_pairs == list_size) {
|
|
list_size *= 2;
|
|
XREALLOC_N(list_size, list);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//Loop through the list.
|
|
//It is safe to access the memory (will not have been freed).
|
|
//If 'already_removed' is set, then we should release our reference
|
|
//and go to the next entry.
|
|
for (i=0; i < num_pairs; i++) {
|
|
PAIR p = list[i];
|
|
if (p->already_removed) {
|
|
ctpair_destroy(p); //Release our reference
|
|
continue;
|
|
}
|
|
assert(cf == 0 || p->cachefile==cf);
|
|
nfound++;
|
|
p->cq = &cq;
|
|
if (p->state == CTPAIR_IDLE)
|
|
flush_and_maybe_remove(ct, p, TRUE);
|
|
ctpair_destroy(p); //Release our reference
|
|
}
|
|
toku_free(list);
|
|
|
|
// wait for all of the pairs in the work queue to complete
|
|
for (i=0; i<nfound; i++) {
|
|
cachetable_unlock(ct);
|
|
WORKITEM wi = 0;
|
|
int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
|
|
cachetable_lock(ct);
|
|
PAIR p = workitem_arg(wi);
|
|
p->cq = 0;
|
|
if (p->state == CTPAIR_READING) {
|
|
rwlock_write_unlock(&p->rwlock);
|
|
cachetable_maybe_remove_and_free_pair(ct, p);
|
|
} else if (p->state == CTPAIR_WRITING) {
|
|
cachetable_complete_write_pair(ct, p, TRUE);
|
|
} else if (p->state == CTPAIR_INVALID) {
|
|
abort_fetch_pair(p);
|
|
} else
|
|
assert(0);
|
|
}
|
|
workqueue_destroy(&cq);
|
|
assert_cachefile_is_flushed_and_removed(ct, cf);
|
|
|
|
if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4))
|
|
cachetable_rehash(ct, ct->table_size/2);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Require that it all be flushed. */
|
|
int toku_cachetable_close (CACHETABLE *ctp) {
|
|
CACHETABLE ct=*ctp;
|
|
int r;
|
|
cachetable_lock(ct);
|
|
if ((r=cachetable_flush_cachefile(ct, 0))) {
|
|
cachetable_unlock(ct);
|
|
return r;
|
|
}
|
|
u_int32_t i;
|
|
for (i=0; i<ct->table_size; i++) {
|
|
if (ct->table[i]) return -1;
|
|
}
|
|
assert(ct->size_writing == 0);
|
|
rwlock_destroy(&ct->pending_lock);
|
|
cachetable_unlock(ct);
|
|
toku_destroy_workers(&ct->wq, &ct->threadpool);
|
|
toku_free(ct->table);
|
|
toku_free(ct);
|
|
*ctp = 0;
|
|
return 0;
|
|
}
|
|
|
|
// Unpin the pair for (cachefile, key) and remove it from the cachetable.
// The pair's dirty bit is cleared first, because the pair is only supposed to
// be removed. The caller must hold the only pin on the pair (asserted).
// The function then waits on a private completion queue for the pair and
// completes it with cachetable_complete_write_pair().
// Returns 0 if the pair was found and removed, ENOENT if it was not found.
int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
    int r = ENOENT;
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    int count = 0;

    cachetable_lock(ct);
    u_int32_t fullhash = toku_cachetable_hash(cachefile, key);

    // Locate the pair on its hash chain.
    for (p = ct->table[fullhash & (ct->table_size - 1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b == key.b && p->cachefile == cachefile)
            break;
    }

    if (p) {
        p->dirty = CACHETABLE_CLEAN; // clear the dirty bit.  We're just supposed to remove it.
        assert(rwlock_readers(&p->rwlock)==1);
        rwlock_read_unlock(&p->rwlock);

        // Route this pair's completion to a queue only we are listening on.
        struct workqueue cq;
        workqueue_init(&cq);
        p->cq = &cq;
        if (p->state == CTPAIR_IDLE)
            flush_and_maybe_remove(ct, p, FALSE);

        // Wait for the completion without holding the cachetable lock.
        cachetable_unlock(ct);
        WORKITEM wi = 0;
        r = workqueue_deq(&cq, &wi, 1);
        cachetable_lock(ct);

        PAIR pp = workitem_arg(wi);
        assert(r == 0 && pp == p);
        cachetable_complete_write_pair(ct, p, TRUE);
        workqueue_destroy(&cq);
        r = 0;
    }

    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
|
|
|
|
static int
|
|
log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v))
|
|
{
|
|
if (toku_logger_txn_parent(txn)==NULL) { // only have to log the open root transactions
|
|
int r = toku_log_xstillopen(logger, NULL, 0, toku_txn_get_txnid(txn));
|
|
assert(r==0);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
// TODO: #1510 locking of cachetable is suspect
|
|
// verify correct algorithm overall
|
|
|
|
|
|
|
|
int
|
|
toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
|
|
// Requires: All three checkpoint-relevant locks must be held (see src/checkpoint.c).
|
|
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
|
|
// Use the begin_checkpoint callback to take necessary snapshots (header, btt)
|
|
// Mark every dirty node as "pending." ("Pending" means that the node must be
|
|
// written to disk before it can be modified.)
|
|
|
|
{
|
|
unsigned i;
|
|
cachetable_lock(ct);
|
|
//Make list of cachefiles to be included in checkpoint.
|
|
//If refcount is 0, the cachefile is closing (performing a local checkpoint)
|
|
{
|
|
CACHEFILE cf;
|
|
assert(ct->cachefiles_in_checkpoint==NULL);
|
|
for (cf = ct->cachefiles; cf; cf=cf->next) {
|
|
if (cf->refcount>0) {
|
|
//Incremement reference count of cachefile because we're using it for the checkpoint.
|
|
//This will prevent closing during the checkpoint.
|
|
toku_cachefile_refup(cf);
|
|
cf->next_in_checkpoint = ct->cachefiles_in_checkpoint;
|
|
ct->cachefiles_in_checkpoint = cf;
|
|
cf->for_checkpoint = TRUE;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (logger) {
|
|
// The checkpoint must be performed after the lock is acquired.
|
|
{
|
|
LSN begin_lsn;
|
|
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0);
|
|
ct->lsn_of_checkpoint_in_progress = begin_lsn;
|
|
assert(r==0);
|
|
}
|
|
// Log all the open transactions
|
|
{
|
|
int r = toku_logger_iterate_over_live_txns (logger, log_open_txn, NULL);
|
|
assert(r==0);
|
|
}
|
|
// Log all the open files
|
|
{
|
|
//Must loop through ALL open files (even if not included in checkpoint).
|
|
CACHEFILE cf;
|
|
for (cf = ct->cachefiles; cf; cf=cf->next) {
|
|
BYTESTRING bs = { strlen(cf->fname), // don't include the NUL
|
|
cf->fname };
|
|
int r = toku_log_fassociate(logger, NULL, 0, cf->filenum, bs);
|
|
assert(r==0);
|
|
}
|
|
}
|
|
}
|
|
|
|
rwlock_write_lock(&ct->pending_lock, ct->mutex);
|
|
for (i=0; i < ct->table_size; i++) {
|
|
PAIR p;
|
|
for (p = ct->table[i]; p; p=p->hash_chain) {
|
|
assert(!p->checkpoint_pending);
|
|
//Only include pairs belonging to cachefiles in the checkpoint
|
|
if (!p->cachefile->for_checkpoint) continue;
|
|
if (p->state == CTPAIR_READING)
|
|
continue; // skip pairs being read as they will be clean
|
|
else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) {
|
|
if (p->dirty) {
|
|
p->checkpoint_pending = TRUE;
|
|
if (ct->pending_head)
|
|
ct->pending_head->pending_prev = p;
|
|
p->pending_next = ct->pending_head;
|
|
p->pending_prev = NULL;
|
|
ct->pending_head = p;
|
|
}
|
|
} else
|
|
assert(0);
|
|
}
|
|
}
|
|
rwlock_write_unlock(&ct->pending_lock);
|
|
|
|
//begin_checkpoint_userdata must be called AFTER all the pairs are marked as pending.
|
|
//Once marked as pending, we own write locks on the pairs, which means the writer threads can't conflict.
|
|
{
|
|
CACHEFILE cf;
|
|
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
|
|
if (cf->begin_checkpoint_userdata) {
|
|
int r = cf->begin_checkpoint_userdata(cf, ct->lsn_of_checkpoint_in_progress, cf->userdata);
|
|
assert(r==0);
|
|
}
|
|
}
|
|
}
|
|
|
|
cachetable_unlock(ct);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
int
|
|
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
|
|
// Requires: The big checkpoint lock must be held (see src/checkpoint.c).
|
|
// Algorithm: Write all pending nodes to disk
|
|
// Use checkpoint callback to write snapshot information to disk (header, btt)
|
|
// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
|
|
//
|
|
|
|
int retval = 0;
|
|
cachetable_lock(ct);
|
|
{
|
|
//
|
|
// #TODO: #1424 Long-lived get and pin (held by cursor) will cause a deadlock here.
|
|
// Need some solution (possibly modify requirement for write lock or something else).
|
|
PAIR p;
|
|
while ((p = ct->pending_head)!=0) {
|
|
ct->pending_head = ct->pending_head->pending_next;
|
|
pending_pairs_remove(ct, p);
|
|
|
|
rwlock_write_lock(&p->rwlock, ct->mutex); // grab an exclusive lock on the pair
|
|
// If it is no longer pending we don't have do do anything
|
|
if (p->checkpoint_pending) {
|
|
write_pair_for_checkpoint(ct, p); // clears the pending bit, writes it out, and unlocks the write pair,
|
|
} else {
|
|
// it was previously written, so we just have to release the lock.
|
|
rwlock_write_unlock(&p->rwlock); // didn't call cachetable_write_pair so we have to unlock it ourselves.
|
|
}
|
|
|
|
// Don't need to unlock and lock cachetable, because the cachetable was unlocked and locked while the flush callback ran.
|
|
}
|
|
}
|
|
assert(!ct->pending_head);
|
|
|
|
{ // have just written data blocks, so next write the translation and header for each open dictionary
|
|
CACHEFILE cf;
|
|
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
|
|
if (cf->checkpoint_userdata) {
|
|
int r = cf->checkpoint_userdata(cf, cf->userdata);
|
|
assert(r==0);
|
|
}
|
|
}
|
|
}
|
|
|
|
{ // everything has been written to file (or at least OS internal buffer)...
|
|
// ... so fsync and call checkpoint-end function in block translator
|
|
// to free obsolete blocks on disk used by previous checkpoint
|
|
CACHEFILE cf;
|
|
for (cf = ct->cachefiles_in_checkpoint; cf; cf=cf->next_in_checkpoint) {
|
|
if (cf->end_checkpoint_userdata) {
|
|
int r = cf->end_checkpoint_userdata(cf, cf->userdata);
|
|
assert(r==0);
|
|
}
|
|
}
|
|
}
|
|
|
|
{
|
|
//Delete list of cachefiles in the checkpoint,
|
|
//remove reference
|
|
//clear bit saying they're in checkpoint
|
|
CACHEFILE cf;
|
|
while ((cf = ct->cachefiles_in_checkpoint)) {
|
|
ct->cachefiles_in_checkpoint = cf->next_in_checkpoint;
|
|
cf->next_in_checkpoint = NULL;
|
|
cf->for_checkpoint = FALSE;
|
|
cachetable_unlock(ct);
|
|
int r = toku_cachefile_close(&cf, logger, error_string);
|
|
if (r!=0) {
|
|
retval = r;
|
|
goto panic;
|
|
}
|
|
cachetable_lock(ct);
|
|
}
|
|
}
|
|
cachetable_unlock(ct);
|
|
|
|
if (logger) {
|
|
int r = toku_log_end_checkpoint(logger, NULL, 0, ct->lsn_of_checkpoint_in_progress.lsn);
|
|
assert(r==0);
|
|
toku_logger_note_checkpoint(logger, ct->lsn_of_checkpoint_in_progress);
|
|
}
|
|
|
|
panic:
|
|
return retval;
|
|
}
|
|
|
|
// Return the logger stored in the cachetable that owns cf.
TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
    CACHETABLE ct = cf->cachetable;
    return ct->logger;
}
|
|
|
|
// Return the file number recorded in cf.
FILENUM toku_cachefile_filenum (CACHEFILE cf) {
    FILENUM filenum = cf->filenum;
    return filenum;
}
|
|
|
|
#if DO_WORKER_THREAD
|
|
|
|
// Worker thread function to write a pair from memory to its cachefile
|
|
static void cachetable_writer(WORKITEM wi) {
|
|
PAIR p = workitem_arg(wi);
|
|
CACHETABLE ct = p->cachefile->cachetable;
|
|
cachetable_lock(ct);
|
|
cachetable_write_pair(ct, p);
|
|
cachetable_unlock(ct);
|
|
}
|
|
|
|
// Worker thread function to read a pair from a cachefile to memory
|
|
static void cachetable_reader(WORKITEM wi) {
|
|
PAIR p = workitem_arg(wi);
|
|
CACHETABLE ct = p->cachefile->cachetable;
|
|
cachetable_lock(ct);
|
|
int r = cachetable_fetch_pair(ct, p->cachefile, p);
|
|
#define DO_FLUSH_FROM_READER 0
|
|
if (r == 0) {
|
|
#if DO_FLUSH_FROM_READER
|
|
maybe_flush_some(ct, 0);
|
|
#else
|
|
r = r;
|
|
#endif
|
|
}
|
|
cachetable_unlock(ct);
|
|
}
|
|
|
|
#endif
|
|
|
|
// debug functions
|
|
|
|
// Debug helper: report whether any pair in the cachetable is pinned.
// Despite the name, a pinned pair does not trip an assert; the only assertion
// is that no reader count is negative.
// Returns 1 if at least one pair has readers, 0 otherwise.
int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
    int found_pinned = 0;
    u_int32_t bucket;

    cachetable_lock(ct);
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair = ct->table[bucket];
        while (pair) {
            assert(rwlock_readers(&pair->rwlock)>=0);
            if (rwlock_readers(&pair->rwlock) != 0)
                found_pinned = 1;
            pair = pair->hash_chain;
        }
    }
    cachetable_unlock(ct);
    return found_pinned;
}
|
|
|
|
// Debug helper: count the pinned pairs that belong to cf.
// print_them: when nonzero, a line is printed to stdout for each pinned pair.
// Returns the number of pinned pairs found.
// NOTE(review): the loop treats cf==0 as "all cachefiles", but cf is
// dereferenced to find the cachetable before that test can matter.
int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
    CACHETABLE ct = cf->cachetable;
    int pinned_count = 0;
    u_int32_t bucket;

    cachetable_lock(ct);
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair;
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain) {
            int readers = rwlock_readers(&pair->rwlock);
            assert(readers>=0);
            if (readers == 0)
                continue;
            if (cf != 0 && pair->cachefile != cf)
                continue;
            if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, pair->key.b, pair->value);
            pinned_count++;
        }
    }
    cachetable_unlock(ct);
    return pinned_count;
}
|
|
|
|
// Debug helper: print every non-empty hash bucket to stdout, one line per
// bucket, listing key, cachefile pointer, dirty flag, reader count and size
// for each pair on the chain. Runs under the cachetable lock.
void toku_cachetable_print_state (CACHETABLE ct) {
    u_int32_t bucket;

    cachetable_lock(ct);
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair = ct->table[bucket];
        if (pair == 0)
            continue;   // empty buckets produce no output
        printf("t[%u]=", bucket);
        while (pair) {
            printf(" {%"PRId64", %p, dirty=%d, pin=%d, size=%ld}", pair->key.b, pair->cachefile, (int) pair->dirty, rwlock_readers(&pair->rwlock), pair->size);
            pair = pair->hash_chain;
        }
        printf("\n");
    }
    cachetable_unlock(ct);
}
|
|
|
|
// Debug helper: read the cachetable's counters under the cachetable lock.
// Each output pointer is optional; a NULL pointer skips that value.
//   num_entries_ptr  <- ct->n_in_table
//   hash_size_ptr    <- ct->table_size
//   size_current_ptr <- ct->size_current
//   size_limit_ptr   <- ct->size_limit
void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
    cachetable_lock(ct);
    if (num_entries_ptr)  { *num_entries_ptr  = ct->n_in_table;   }
    if (hash_size_ptr)    { *hash_size_ptr    = ct->table_size;   }
    if (size_current_ptr) { *size_current_ptr = ct->size_current; }
    if (size_limit_ptr)   { *size_limit_ptr   = ct->size_limit;   }
    cachetable_unlock(ct);
}
|
|
|
|
// Debug helper: look up the pair for (cf, key) and report its state.
// Each output pointer is optional; a NULL pointer skips that value.
//   value_ptr <- the pair's value
//   dirty_ptr <- the pair's dirty flag
//   pin_ptr   <- the pair's current reader count
//   size_ptr  <- the pair's size
// The pair is not pinned by this call.
// Returns 0 if the pair was found, -1 otherwise (outputs are then untouched).
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
                                   int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
    PAIR p;
    int count = 0;
    int r = -1;
    u_int32_t fullhash = toku_cachetable_hash(cf, key);
    cachetable_lock(ct);
    for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b == key.b && p->cachefile == cf) {
            if (value_ptr)
                *value_ptr = p->value;
            if (dirty_ptr)
                *dirty_ptr = p->dirty;
            if (pin_ptr)
                *pin_ptr = rwlock_readers(&p->rwlock);
            if (size_ptr)
                *size_ptr = p->size;
            r = 0;
            break;
        }
    }
    // Record the chain length exactly once per lookup, hit or miss. (The hit
    // path used to record it a second time before breaking out of the loop.)
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
|
|
|
|
void
|
|
toku_cachefile_set_userdata (CACHEFILE cf,
|
|
void *userdata,
|
|
int (*close_userdata)(CACHEFILE, void*, char**),
|
|
int (*checkpoint_userdata)(CACHEFILE, void*),
|
|
int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*),
|
|
int (*end_checkpoint_userdata)(CACHEFILE, void*)) {
|
|
cf->userdata = userdata;
|
|
cf->close_userdata = close_userdata;
|
|
cf->checkpoint_userdata = checkpoint_userdata;
|
|
cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
|
|
cf->end_checkpoint_userdata = end_checkpoint_userdata;
|
|
}
|
|
|
|
// Return the opaque user pointer stored in cf.
void *toku_cachefile_get_userdata(CACHEFILE cf) {
    void *userdata = cf->userdata;
    return userdata;
}
|
|
|
|
int
|
|
toku_cachefile_fsync(CACHEFILE cf) {
|
|
int r;
|
|
|
|
if (cf->fname==0) r = 0; //Don't fsync /dev/null
|
|
else r = fsync(cf->fd);
|
|
return r;
|
|
}
|
|
|
|
// Point cf at the null device instead of its real file.
// The null device is opened write-only, the cachefile's current descriptor is
// closed and replaced, any stored file name is freed and cleared, and the
// cachefile's file-number bookkeeping is re-initialised for the new
// descriptor. Failure to open the null device trips an assert.
// Always returns 0.
int toku_cachefile_redirect_nullfd (CACHEFILE cf) {
    struct fileid null_fileid;
    int null_fd = open(DEV_NULL_FILE, O_WRONLY | O_BINARY);
    assert(null_fd>=0);
    toku_os_get_unique_file_id(null_fd, &null_fileid);

    // Swap descriptors.
    close(cf->fd);
    cf->fd = null_fd;

    // The cachefile no longer refers to a named file.
    if (cf->fname) {
        toku_free(cf->fname);
        cf->fname = 0;
    }

    cachefile_init_filenum(cf, null_fd, NULL, null_fileid);
    return 0;
}
|
|
|
|
u_int64_t
|
|
toku_cachefile_size_in_memory(CACHEFILE cf)
|
|
{
|
|
u_int64_t result=0;
|
|
CACHETABLE ct=cf->cachetable;
|
|
unsigned long i;
|
|
for (i=0; i<ct->table_size; i++) {
|
|
PAIR p;
|
|
for (p=ct->table[i]; p; p=p->hash_chain) {
|
|
if (p->cachefile==cf) {
|
|
result += p->size;
|
|
}
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|