/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."

#include "includes.h"

// execute the cachetable callbacks using a writer thread 0->no 1->yes
#define DO_WRITER_THREAD 1
#if DO_WRITER_THREAD
static void *cachetable_writer(void *);
#endif

// we use 4 threads since gunzip is 4 times faster than gzip
#define MAX_WRITER_THREADS 4

// use cachetable locks 0->no 1->yes
#define DO_CACHETABLE_LOCK 1

// unlock the cachetable while executing callbacks 0->no 1->yes
#define DO_CALLBACK_UNLOCK 1

// simulate long latency write operations with usleep. time in milliseconds.
#define DO_CALLBACK_USLEEP 0
#define DO_CALLBACK_BUSYWAIT 0

// tracing hook: compile in the printf trace calls only when TRACE_CACHETABLE is defined
//#define TRACE_CACHETABLE
#ifdef TRACE_CACHETABLE
#define WHEN_TRACE_CT(x) x
#else
#define WHEN_TRACE_CT(x) ((void)0)
#endif
typedef struct ctpair *PAIR;

// A cachetable entry: one cached (key, value) block belonging to a cachefile.
struct ctpair {
    enum typ_tag tag;
    char dirty;                   // nonzero when the in-memory value differs from storage
    char verify_flag;             // Used in verify_cachetable()
    char writing;                 // writing back
    char write_me;                // when writing back, actually flush (vs evict without writing)
    CACHEKEY key;
    void *value;
    long size;                    // in-memory size; accumulated in cachetable's size_current
    PAIR next, prev;              // In LRU list.
    PAIR hash_chain;              // bucket chain in the cachetable's hash table
    CACHEFILE cachefile;          // the file this pair belongs to
    CACHETABLE_FLUSH_CALLBACK flush_callback;
    CACHETABLE_FETCH_CALLBACK fetch_callback;
    void *extraargs;              // opaque client state passed through to the callbacks
    LSN modified_lsn;             // What was the LSN when modified (undefined if not dirty)
    LSN written_lsn;              // What was the LSN when written (we need to get this information when we fetch)
    u_int32_t fullhash;           // cached full hash of (cachefile, key)
    PAIR next_wq;                 // the ctpair's are linked into a write queue when evicted
    struct ctpair_rwlock rwlock;  // reader writer lock used to grant an exclusive lock to the writeback thread
    struct writequeue *cq;        // writers sometimes return ctpair's using this queue
};

#include "cachetable-writequeue.h"

// Free a pair and its reader-writer lock.
// Assumes the pair is already unlinked from all lists -- TODO confirm at call sites.
static inline void ctpair_destroy (PAIR p) {
    ctpair_rwlock_destroy(&p->rwlock);
    toku_free(p);
}
2007-11-14 17:58:38 +00:00
// The cachetable is as close to an ENV as we get.
struct cachetable {
    enum typ_tag tag;
    u_int32_t n_in_table;       // number of pairs currently in the hash table
    u_int32_t table_size;       // number of hash buckets (kept a power of two)
    PAIR *table;                // hash table
    PAIR head, tail;            // of LRU list. head is the most recently used. tail is least recently used.
    CACHEFILE cachefiles;       // list of cachefiles that use this cachetable
    long size_current;          // the sum of the sizes of the pairs in the cachetable
    long size_limit;            // the limit to the sum of the pair sizes
    long size_writing;          // the sum of the sizes of the pairs being written
    LSN lsn_of_checkpoint;      // the most recent checkpoint in the log.
    TOKULOGGER logger;
    toku_pthread_mutex_t mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's
    struct writequeue wq;       // write queue for the writer threads
    THREADPOOL threadpool;      // pool of writer threads
    char checkpointing;         // checkpoint in progress
};
2008-08-25 16:25:30 +00:00
// Acquire the cachetable's coarse mutex (compiled out when DO_CACHETABLE_LOCK is 0).
static inline void cachetable_lock (CACHETABLE ct __attribute__((unused))) {
#if DO_CACHETABLE_LOCK
    int rc = toku_pthread_mutex_lock(&ct->mutex);
    assert(rc == 0);
#endif
}
// Release the cachetable's coarse mutex (compiled out when DO_CACHETABLE_LOCK is 0).
static inline void cachetable_unlock (CACHETABLE ct __attribute__((unused))) {
#if DO_CACHETABLE_LOCK
    int rc = toku_pthread_mutex_unlock(&ct->mutex);
    assert(rc == 0);
#endif
}
2013-04-16 23:57:17 -04:00
// wait for writes to complete if the size in the write queue is 1/2 of
// the cachetable
// Requires: ct->mutex is held. NOTE(review): writequeue_wait_write is passed
// the mutex, so it presumably releases and reacquires it while blocked --
// confirm in cachetable-writequeue.h.
static inline void cachetable_wait_write (CACHETABLE ct) {
    while (2*ct->size_writing > ct->size_current) {
        writequeue_wait_write(&ct->wq, &ct->mutex);
    }
}
2007-07-13 19:37:47 +00:00
// One open file managed by the cachetable.
struct cachefile {
    CACHEFILE next;          // next in the cachetable's cachefiles list
    u_int64_t refcount;      /* CACHEFILEs are shared. Use a refcount to decide when to really close it.
                              * The reference count is one for every open DB.
                              * Plus one for every commit/rollback record. (It would be harder to keep a count for every open transaction,
                              * because then we'd have to figure out if the transaction was already counted. If we simply use a count for
                              * every record in the transaction, we'll be ok. Hence we use a 64-bit counter to make sure we don't run out.
                              */
    int fd;                  /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
    CACHETABLE cachetable;   // the cachetable this file belongs to
    struct fileid fileid;    // unique identity of the underlying OS file
    FILENUM filenum;         // cachetable-assigned file number (unique within the cachetable)
    char *fname;             // heap copy of the file name, or NULL
    void *userdata;          // opaque per-file client state
    int (*close_userdata)(CACHEFILE cf, void *userdata);      // when closing the last reference to a cachefile, first call this function.
    int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
};
2007-11-19 23:47:44 +00:00
// Create a cachetable whose pair sizes are limited to size_limit bytes.
// initial_lsn seeds the checkpoint bookkeeping; logger may be NULL.
// On success *result holds the new cachetable and 0 is returned; on
// allocation failure ENOMEM is returned (previously the table allocation
// was asserted rather than failing gracefully like the TAGMALLOC path).
int toku_create_cachetable (CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
#if defined __linux__
    {
        // Route large allocations through mmap so freeing returns memory to the OS.
        // NOTE(review): this guard is not thread-safe; confirm the first
        // cachetable is created before any concurrent callers exist.
        static int did_mallopt = 0;
        if (!did_mallopt) {
            mallopt(M_MMAP_THRESHOLD, 1024*64); // 64K and larger should be malloced with mmap().
            did_mallopt = 1;
        }
    }
#endif
    TAGMALLOC(CACHETABLE, t);
    if (t == 0) return ENOMEM;
    t->n_in_table = 0;
    t->table_size = 4;
    MALLOC_N(t->table_size, t->table);
    if (t->table == 0) {
        // fail gracefully instead of asserting, matching the TAGMALLOC check above
        toku_free(t);
        return ENOMEM;
    }
    t->head = t->tail = 0;
    u_int32_t i;
    for (i = 0; i < t->table_size; i++) {
        t->table[i] = 0;
    }
    t->cachefiles = 0;
    t->size_current = 0;
    t->size_limit = size_limit;
    t->size_writing = 0;
    t->lsn_of_checkpoint = initial_lsn;
    t->logger = logger;
    t->checkpointing = 0;
    int r;
    writequeue_init(&t->wq);
    r = toku_pthread_mutex_init(&t->mutex, 0); assert(r == 0);
    // set the max number of writeback threads to min(MAX_WRITER_THREADS,nprocs_online)
    int nprocs = toku_os_get_number_active_processors();
    if (nprocs > MAX_WRITER_THREADS) nprocs = MAX_WRITER_THREADS;
    r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
#if DO_WRITER_THREAD
    for (i = 0; i < (u_int32_t) nprocs; i++)
        threadpool_maybe_add(t->threadpool, cachetable_writer, t);
#endif
    *result = t;
    return 0;
}
2008-01-11 22:24:43 +00:00
// What cachefile goes with particular fd?
2008-04-09 02:45:27 +00:00
int toku_cachefile_of_filenum ( CACHETABLE t , FILENUM filenum , CACHEFILE * cf ) {
2008-01-11 22:24:43 +00:00
CACHEFILE extant ;
for ( extant = t - > cachefiles ; extant ; extant = extant - > next ) {
2008-02-26 17:47:40 +00:00
if ( extant - > filenum . fileid = = filenum . fileid ) {
* cf = extant ;
return 0 ;
}
2008-01-11 22:24:43 +00:00
}
return ENOENT ;
}
2008-07-21 02:34:13 +00:00
static FILENUM next_filenum_to_use = {0};

// Fill in the file-identity fields of a cachefile: the fd, the OS-level
// unique file id, and a heap copy of the name (NULL when no name given).
static void cachefile_init_filenum (CACHEFILE newcf, int fd, const char *fname, struct fileid fileid) {
    newcf->fileid = fileid;
    newcf->fd = fd;
    if (fname)
        newcf->fname = toku_strdup(fname);
    else
        newcf->fname = 0;
}
2008-02-29 20:47:11 +00:00
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
2008-04-17 03:11:55 +00:00
int toku_cachetable_openfd ( CACHEFILE * cf , CACHETABLE t , int fd , const char * fname ) {
2007-07-13 19:37:47 +00:00
int r ;
CACHEFILE extant ;
struct fileid fileid ;
2013-04-16 23:57:28 -04:00
r = toku_os_get_unique_file_id ( fd , & fileid ) ;
2013-04-16 23:57:27 -04:00
if ( r ! = 0 ) {
r = errno ; close ( fd ) ;
return r ;
}
2007-07-13 19:37:47 +00:00
for ( extant = t - > cachefiles ; extant ; extant = extant - > next ) {
if ( memcmp ( & extant - > fileid , & fileid , sizeof ( fileid ) ) = = 0 ) {
2007-09-21 17:55:49 +00:00
r = close ( fd ) ;
assert ( r = = 0 ) ;
2007-07-13 19:37:47 +00:00
extant - > refcount + + ;
* cf = extant ;
return 0 ;
}
}
2008-04-18 21:30:39 +00:00
try_again :
for ( extant = t - > cachefiles ; extant ; extant = extant - > next ) {
if ( next_filenum_to_use . fileid = = extant - > filenum . fileid ) {
next_filenum_to_use . fileid + + ;
goto try_again ;
}
}
2007-07-13 19:37:47 +00:00
{
CACHEFILE MALLOC ( newcf ) ;
2008-07-21 02:34:13 +00:00
newcf - > cachetable = t ;
2008-07-21 17:48:26 +00:00
newcf - > filenum . fileid = next_filenum_to_use . fileid + + ;
2008-07-21 02:34:13 +00:00
cachefile_init_filenum ( newcf , fd , fname , fileid ) ;
newcf - > refcount = 1 ;
2007-07-13 19:37:47 +00:00
newcf - > next = t - > cachefiles ;
t - > cachefiles = newcf ;
2013-04-16 23:57:18 -04:00
newcf - > userdata = 0 ;
newcf - > close_userdata = 0 ;
2013-04-16 23:57:29 -04:00
newcf - > checkpoint_userdata = 0 ;
2013-04-16 23:57:18 -04:00
2007-07-13 19:37:47 +00:00
* cf = newcf ;
return 0 ;
}
}
2008-04-09 02:45:27 +00:00
// Open the named file and wrap the resulting fd in a cachefile.
// Returns errno if open fails. O_BINARY is now OR'ed into the flags:
// the previous arithmetic "+" would corrupt the flag word if the caller
// already passed O_BINARY.
int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) {
    int fd = open(fname, flags | O_BINARY, mode);
    if (fd < 0) return errno;
    return toku_cachetable_openfd(cf, t, fd, fname);
}
2008-07-21 02:34:13 +00:00
// Replace the fd (and name) backing cf with a new one, first flushing the
// client's state via close_userdata. On success the old fd is closed and
// the cachefile owns the new fd. On any error the new fd is closed
// (previously it leaked when close_userdata failed).
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
    int r;
    struct fileid fileid;
    r = toku_os_get_unique_file_id(fd, &fileid);
    if (r != 0) {
        r = errno; close(fd); return r;
    }
    if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) {
        close(fd); // fix: don't leak the new fd on callback failure
        return r;
    }
    cf->close_userdata = NULL;
    cf->checkpoint_userdata = NULL;
    cf->userdata = NULL;
    close(cf->fd);
    cf->fd = -1;
    if (cf->fname) {
        toku_free(cf->fname);
        cf->fname = 0;
    }
    cachefile_init_filenum(cf, fd, fname, fileid);
    return 0;
}
// Return the file descriptor currently backing this cachefile.
int toku_cachefile_fd (CACHEFILE cf) {
    int fd = cf->fd;
    return fd;
}
2013-04-16 23:57:27 -04:00
// Truncate the cachefile's underlying file to zero length.
// Returns 0 on success, otherwise the errno from ftruncate.
int toku_cachefile_truncate0 (CACHEFILE cf) {
    int rc = 0;
    if (ftruncate(cf->fd, 0) != 0)
        rc = errno;
    return rc;
}
2007-11-19 23:47:44 +00:00
// Remove the first occurrence of cf from the singly linked cachefile list
// and return the (possibly new) head. Iterative rewrite of the original
// recursive form; a cf not on the list leaves the list unchanged.
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
    if (list == 0)
        return 0;
    if (list == cf)
        return list->next;
    CACHEFILE prev = list;
    while (prev->next != 0 && prev->next != cf)
        prev = prev->next;
    if (prev->next == cf)
        prev->next = cf->next;
    return list;
}
2013-04-16 23:57:18 -04:00
static int cachefile_write_maybe_remove ( CACHETABLE , CACHEFILE cf , BOOL do_remove ) ;
2007-07-13 19:37:47 +00:00
2008-04-09 02:45:27 +00:00
// Take an additional reference on the cachefile.
void toku_cachefile_refup (CACHEFILE cf) {
    cf->refcount += 1;
}
2008-04-17 03:11:55 +00:00
// Release one reference to a cachefile. When the last reference is dropped:
// write back and remove all of this file's pairs, run the client's
// close_userdata callback, unlink the cachefile from the cachetable's list,
// close the fd, and free the cachefile. *cfp is zeroed on every successful
// path. Returns 0 on success or the first flush/callback error.
// NOTE(review): on an error return the refcount has already been decremented
// and is not restored -- confirm callers treat the cachefile as dead then.
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
    CACHEFILE cf = *cfp;
    CACHETABLE ct = cf->cachetable;
    cachetable_lock(ct);
    assert(cf->refcount > 0);
    cf->refcount--;
    if (cf->refcount == 0) {
        int r;
        // flush and evict every pair belonging to this file before teardown
        if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) {
            cachetable_unlock(ct);
            return r;
        }
        // let the client release its per-file state
        if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) {
            cachetable_unlock(ct);
            return r;
        }
        cf->close_userdata = NULL;
        cf->checkpoint_userdata = NULL;
        cf->userdata = NULL;
        cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
        cachetable_unlock(ct);
        r = close(cf->fd);
        assert(r == 0);
        cf->fd = -1;
        if (logger) {
            // logging of cachefile close is currently disabled; kept for reference
            //assert(cf->fname);
            //BYTESTRING bs = {.len=strlen(cf->fname), .data=cf->fname};
            //r = toku_log_cfclose(logger, 0, 0, bs, cf->filenum);
        }
        if (cf->fname)
            toku_free(cf->fname);
        toku_free(cf);
        *cfp = 0;
        return r;
    } else {
        cachetable_unlock(ct);
        *cfp = 0;
        return 0;
    }
}
2008-07-21 02:34:13 +00:00
// Write back and evict every cachetable pair belonging to this cachefile.
int toku_cachefile_flush (CACHEFILE cf) {
    CACHETABLE ct = cf->cachetable;
    int result;
    cachetable_lock(ct);
    result = cachefile_write_maybe_remove(ct, cf, TRUE);
    cachetable_unlock(ct);
    return result;
}
2008-06-14 01:38:53 +00:00
// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan

// Rotate x left by k bits. The "& 31" keeps the right-shift count in [0,31]
// so k == 0 no longer shifts by 32 (undefined behavior in C); results for
// k in 1..31 are unchanged.
static inline u_int32_t rot (u_int32_t x, u_int32_t k) {
    return (x << k) | (x >> ((32 - k) & 31));
}
// Final mixing step of Jenkins' lookup3 hash: thoroughly mixes three 32-bit
// words so the low bits of the result are suitable for a power-of-two bitmask.
// The exact xor/subtract/rotate sequence is copied from
// http://burtleburtle.net/bob/c/lookup3.c and must not be reordered.
static inline u_int32_t final (u_int32_t a, u_int32_t b, u_int32_t c) {
    c ^= b; c -= rot(b, 14);
    a ^= c; a -= rot(c, 11);
    b ^= a; b -= rot(a, 25);
    c ^= b; c -= rot(b, 16);
    a ^= c; a -= rot(c, 4);
    b ^= a; b -= rot(a, 14);
    c ^= b; c -= rot(b, 24);
    return c;
}
2013-04-16 23:57:18 -04:00
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
{
    u_int32_t hi = (u_int32_t) (key.b >> 32);
    u_int32_t lo = (u_int32_t) key.b;
    return final(cachefile->filenum.fileid, hi, lo);
}
#if 0
2008-06-05 22:09:59 +00:00
static unsigned int hashit ( CACHETABLE t , CACHEKEY key , CACHEFILE cachefile ) {
2008-06-17 17:05:19 +00:00
assert ( 0 = = ( t - > table_size & ( t - > table_size - 1 ) ) ) ; // make sure table is power of two
return ( toku_cachetable_hash ( key , cachefile ) ) & ( t - > table_size - 1 ) ;
2007-07-13 19:37:47 +00:00
}
2008-06-17 17:05:19 +00:00
# endif
2007-07-13 19:37:47 +00:00
2008-06-14 01:38:53 +00:00
// Resize the hash table to newtable_size buckets (a power of two, at least 4)
// and redistribute every pair into its new bucket using the cached fullhash.
static void cachetable_rehash (CACHETABLE t, u_int32_t newtable_size) {
    assert(newtable_size >= 4 && ((newtable_size & (newtable_size - 1)) == 0));
    PAIR *newtable = toku_calloc(newtable_size, sizeof(*t->table));
    assert(newtable != 0);
    u_int32_t oldtable_size = t->table_size;
    t->table_size = newtable_size;
    u_int32_t b;
    for (b = 0; b < newtable_size; b++)
        newtable[b] = 0;
    for (b = 0; b < oldtable_size; b++) {
        PAIR p;
        // pop each pair off the old chain and push it onto its new chain
        while ((p = t->table[b]) != 0) {
            unsigned int h = p->fullhash & (newtable_size - 1);
            t->table[b] = p->hash_chain;
            p->hash_chain = newtable[h];
            newtable[h] = p;
        }
    }
    toku_free(t->table);
    t->table = newtable;
}
2007-07-13 19:37:47 +00:00
// Unlink pair p from the cachetable's LRU list, fixing up head/tail as needed.
static void lru_remove (CACHETABLE t, PAIR p) {
    PAIR after = p->next;
    PAIR before = p->prev;
    if (after == 0) {
        assert(t->tail == p);
        t->tail = before;
    } else {
        after->prev = before;
    }
    if (before == 0) {
        assert(t->head == p);
        t->head = after;
    } else {
        before->next = after;
    }
    p->prev = p->next = 0;
}
// Push pair p onto the front (most-recently-used end) of the LRU list.
// Requires: p is not currently linked into the list.
static void lru_add_to_list (CACHETABLE t, PAIR p) {
    assert(p->prev == 0);
    p->prev = 0;
    p->next = t->head;
    if (t->head == 0) {
        assert(!t->tail);
        t->tail = p;
    } else {
        t->head->prev = p;
    }
    t->head = p;
}
// Mark pair p as most recently used: move it to the front of the LRU list.
static void lru_touch (CACHETABLE t, PAIR p) {
    lru_remove(t, p);
    lru_add_to_list(t, p);
}
// Unlink remove_me from a hash-bucket chain and return the (possibly new)
// chain head. Iterative rewrite of the original recursion; like the original,
// it requires remove_me to be present on the chain.
static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
    if (remove_me == list)
        return list->hash_chain;
    PAIR prev = list;
    while (prev->hash_chain != remove_me)
        prev = prev->hash_chain;
    prev->hash_chain = remove_me->hash_chain;
    return list;
}
2007-11-14 17:58:38 +00:00
// Predicate to determine if a node must be renamed. Nodes are renamed on the time they are written
// after a checkpoint.
// Thus we need to rename it if it is dirty,
// if it has been modified within the current checkpoint regime (hence non-strict inequality)
// and the last time it was written was in a previous checkpoint regime (strict inequality)
static BOOL need_to_rename_p (CACHETABLE t, PAIR p) {
    if (!p->dirty)
        return (BOOL) 0;
    if (p->modified_lsn.lsn < t->lsn_of_checkpoint.lsn)   // nonstrict: modified in this regime
        return (BOOL) 0;
    if (p->written_lsn.lsn >= t->lsn_of_checkpoint.lsn)   // strict: last written in an earlier regime
        return (BOOL) 0;
    return (BOOL) 1;
}
2013-04-16 23:57:17 -04:00
// Remove a pair from the cachetable.
// Effects: the pair is unlinked from the LRU list and from its hash bucket,
// and the cachetable's current size is reduced by the pair's size.
static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
    lru_remove(ct, p);
    assert(ct->n_in_table > 0);
    ct->n_in_table--;
    // unlink from the hash bucket chain
    unsigned int bucket = p->fullhash & (ct->table_size - 1);
    ct->table[bucket] = remove_from_hash_chain(p, ct->table[bucket]);
    ct->size_current -= p->size;
    assert(ct->size_current >= 0);
}
// Maybe remove a pair from the cachetable and free it, depending on whether
// or not there are any threads interested in the pair. The flush callback
// is called with write_me and keep_me both false, and the pair is destroyed.
// Requires: the cachetable lock is held on entry and on return.
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
    if (ctpair_users(&p->rwlock) == 0) {
        cachetable_remove_pair(ct, p);
#if DO_CALLBACK_UNLOCK
        // drop the cachetable lock across the client callback
        cachetable_unlock(ct);
#endif
        p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, FALSE, FALSE,
                          ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
#if DO_CALLBACK_UNLOCK
        cachetable_lock(ct);
#endif
        ctpair_destroy(p);
    }
}
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove);

// Write a pair to storage
// Effects: an exclusive lock on the pair is obtained, the write callback is called,
// the pair dirty state is adjusted, and the write is completed. The write_me boolean
// is true when the pair is dirty and the pair is requested to be written. The keep_me
// boolean is true, so the pair is not yet evicted from the cachetable.
// NOTE(review): ctpair_write_lock is passed ct->mutex, so this presumably
// runs with the cachetable lock held on entry and return -- confirm.
static void cachetable_write_pair (CACHETABLE ct, PAIR p) {
    ctpair_write_lock(&p->rwlock, &ct->mutex);
#if DO_CALLBACK_UNLOCK
    // run the (possibly slow) client callback without the cachetable lock
    cachetable_unlock(ct);
#endif
    // write callback
    p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, (BOOL)(p->dirty && p->write_me), TRUE,
                      ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
#if DO_CALLBACK_USLEEP
    // test hook: simulate a long-latency write
    usleep(DO_CALLBACK_USLEEP);
#endif
#if DO_CALLBACK_BUSYWAIT
    // test hook: burn CPU for DO_CALLBACK_BUSYWAIT microseconds
    struct timeval tstart;
    gettimeofday(&tstart, 0);
    long long ltstart = tstart.tv_sec * 1000000 + tstart.tv_usec;
    while (1) {
        struct timeval t;
        gettimeofday(&t, 0);
        long long lt = t.tv_sec * 1000000 + t.tv_usec;
        if (lt - ltstart > DO_CALLBACK_BUSYWAIT)
            break;
    }
#endif
#if DO_CALLBACK_UNLOCK
    cachetable_lock(ct);
#endif
    // the pair is no longer dirty once written
    if (p->dirty && p->write_me)
        p->dirty = FALSE;
    // stuff it into a completion queue for delayed completion if a completion queue exists
    // otherwise complete the write now
    if (p->cq)
        writequeue_enq(p->cq, p);
    else
        cachetable_complete_write_pair(ct, p, TRUE);
}
// complete the write of a pair by reseting the writing flag, adjusting the write
// pending size, and maybe removing the pair from the cachetable if there are no
// references to it
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove) {
    p->cq = 0;
    p->writing = 0;
    // maybe wakeup any stalled writers when the pending writes fall below
    // 1/8 of the size of the cachetable
    ct->size_writing -= p->size;
    assert(ct->size_writing >= 0);
    if (8*ct->size_writing <= ct->size_current)
        writequeue_wakeup_write(&ct->wq);
    // release the exclusive lock taken by cachetable_write_pair
    ctpair_write_unlock(&p->rwlock);
    if (do_remove)
        cachetable_maybe_remove_and_free_pair(ct, p);
}
// flush and remove a pair from the cachetable. the callbacks are run by a thread in
// a thread pool.
static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
    p->writing = 1;
    // the pair's size counts as pending-write until the write completes
    ct->size_writing += p->size; assert(ct->size_writing >= 0);
    p->write_me = (char)(write_me ? 1 : 0);
#if DO_WRITER_THREAD
    if (!p->dirty || !p->write_me) {
        // evictions without a write can be run in the current thread
        cachetable_write_pair(ct, p);
    } else {
        // dirty pairs that must be flushed go to the writer threads
        writequeue_enq(&ct->wq, p);
    }
#else
    cachetable_write_pair(ct, p);
#endif
}
2013-04-16 23:57:17 -04:00
// Evict pairs, least-recently-used first, until the projected size
// (size + size_current) fits under size_limit + size_writing; then shrink
// the hash table if it has become sparse.
// NOTE(review): r is never set nonzero, so this always returns 0; eviction
// failure is deliberately tolerated (see the comment in the pinned case).
static int maybe_flush_some (CACHETABLE t, long size) {
    int r = 0;
again:
    if (size + t->size_current > t->size_limit + t->size_writing) {
        {
            // debugging hooks, disabled
            //unsigned long rss __attribute__((__unused__)) = check_max_rss();
            //printf("this-size=%.6fMB projected size = %.2fMB limit=%2.fMB rss=%2.fMB\n", size/(1024.0*1024.0), (size+t->size_current)/(1024.0*1024.0), t->size_limit/(1024.0*1024.0), rss/256.0);
            //struct mallinfo m = mallinfo();
            //printf(" arena=%d hblks=%d hblkhd=%d\n", m.arena, m.hblks, m.hblkhd);
        }
        /* Try to remove one. */
        PAIR remove_me;
        for (remove_me = t->tail; remove_me; remove_me = remove_me->prev) {
            // only pairs that are unpinned and not already being written are eligible
            if (!ctpair_users(&remove_me->rwlock) && !remove_me->writing) {
                flush_and_remove(t, remove_me, 1);
                goto again;
            }
        }
        /* All were pinned. */
        //printf("All are pinned\n");
        return 0; // Don't indicate an error code. Instead let memory get overfull.
    }
    // shrink the hash table when it is less than 1/4 full
    if ((4*t->n_in_table < t->table_size) && t->table_size > 4)
        cachetable_rehash(t, t->table_size/2);
    return r;
}
2008-06-17 17:05:19 +00:00
static int cachetable_insert_at ( CACHEFILE cachefile , u_int32_t fullhash , CACHEKEY key , void * value , long size ,
2013-04-16 23:57:17 -04:00
CACHETABLE_FLUSH_CALLBACK flush_callback ,
CACHETABLE_FETCH_CALLBACK fetch_callback ,
2007-11-14 17:58:38 +00:00
void * extraargs , int dirty ,
LSN written_lsn ) {
2007-09-17 16:23:05 +00:00
TAGMALLOC ( PAIR , p ) ;
2013-04-16 23:57:17 -04:00
memset ( p , 0 , sizeof * p ) ;
ctpair_rwlock_init ( & p - > rwlock ) ;
2008-06-17 17:05:19 +00:00
p - > fullhash = fullhash ;
2013-04-16 23:57:20 -04:00
p - > dirty = ( char ) ( dirty ? 1 : 0 ) ; //printf("%s:%d p=%p dirty=%d\n", __FILE__, __LINE__, p, p->dirty);
2007-09-21 17:55:49 +00:00
p - > size = size ;
2013-04-16 23:57:17 -04:00
p - > writing = 0 ;
2007-09-17 16:23:05 +00:00
p - > key = key ;
p - > value = value ;
p - > next = p - > prev = 0 ;
p - > cachefile = cachefile ;
p - > flush_callback = flush_callback ;
p - > fetch_callback = fetch_callback ;
p - > extraargs = extraargs ;
2007-11-14 17:58:38 +00:00
p - > modified_lsn . lsn = 0 ;
p - > written_lsn = written_lsn ;
2008-06-17 17:05:19 +00:00
p - > fullhash = fullhash ;
2007-09-21 17:55:49 +00:00
CACHETABLE ct = cachefile - > cachetable ;
2013-04-16 23:57:17 -04:00
ctpair_read_lock ( & p - > rwlock , & ct - > mutex ) ;
p - > cq = 0 ;
2007-09-21 17:55:49 +00:00
lru_add_to_list ( ct , p ) ;
2008-06-17 17:05:19 +00:00
u_int32_t h = fullhash & ( ct - > table_size - 1 ) ;
2007-09-21 17:55:49 +00:00
p - > hash_chain = ct - > table [ h ] ;
ct - > table [ h ] = p ;
ct - > n_in_table + + ;
ct - > size_current + = size ;
2007-10-17 22:10:47 +00:00
if ( ct - > n_in_table > ct - > table_size ) {
2008-06-14 01:38:53 +00:00
cachetable_rehash ( ct , ct - > table_size * 2 ) ;
2007-10-17 22:10:47 +00:00
}
2007-09-17 16:23:05 +00:00
return 0 ;
}
2008-06-14 01:38:53 +00:00
// Histogram of hash-chain walk lengths; the last bucket also counts overflows.
enum { hash_histogram_max = 100 };
static unsigned long long hash_histogram[hash_histogram_max];

// Print the nonzero histogram buckets to stdout as "index:count" pairs.
void print_hash_histogram (void) {
    int b;
    for (b = 0; b < hash_histogram_max; b++) {
        if (hash_histogram[b] != 0)
            printf("%d:%llu ", b, hash_histogram[b]);
    }
    printf("\n");
}
2013-04-16 23:57:20 -04:00
static void
note_hash_count ( int count ) {
2008-06-14 01:38:53 +00:00
if ( count > = hash_histogram_max ) count = hash_histogram_max - 1 ;
hash_histogram [ count ] + + ;
}
2008-06-17 17:05:19 +00:00
// Insert a new (key, value) pair, leaving it pinned with a read lock held on
// behalf of the caller. Returns -1 when the key is already present (the
// existing pair is then pinned instead), otherwise the result of the insert.
int toku_cachetable_put (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void *value, long size,
                         CACHETABLE_FLUSH_CALLBACK flush_callback,
                         CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
    WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
    CACHETABLE ct = cachefile->cachetable;
    int count = 0;
    cachetable_lock(ct);
    // stall if too much write-back is already in flight
    cachetable_wait_write(ct);
    {
        PAIR p;
        for (p = ct->table[fullhash & (cachefile->cachetable->table_size - 1)]; p; p = p->hash_chain) {
            count++;
            if (p->key.b == key.b && p->cachefile == cachefile) {
                // Semantically, these two asserts are not strictly right. After all, when are two functions eq?
                // In practice, the functions better be the same.
                assert(p->flush_callback == flush_callback);
                assert(p->fetch_callback == fetch_callback);
                // pin the existing pair for the caller
                ctpair_read_lock(&p->rwlock, &ct->mutex);
                cachetable_unlock(ct);
                note_hash_count(count);
                return -1; /* Already present. */
            }
        }
    }
    int r;
    // make room for the new pair before inserting it
    if ((r = maybe_flush_some(ct, size))) {
        cachetable_unlock(ct);
        return r;
    }
    // flushing could change the table size, but wont' change the fullhash
    r = cachetable_insert_at(cachefile, fullhash, key, value, size, flush_callback, fetch_callback, extraargs, 1, ZERO_LSN);
    cachetable_unlock(ct);
    note_hash_count(count);
    return r;
}
2008-06-17 17:05:19 +00:00
// Look up (cachefile, key). On a hit: pin the pair with a read lock, touch
// the LRU, and return its value (and size when sizep is non-NULL). On a miss:
// call fetch_callback to read the block, insert it (pinned) into the
// cachetable, then possibly evict other pairs to stay under the size limit.
// Returns 0 on success or the error code from the fetch/flush path.
int toku_cachetable_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep,
                                 CACHETABLE_FLUSH_CALLBACK flush_callback,
                                 CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
    CACHETABLE t = cachefile->cachetable;
    PAIR p;
    int count = 0;
    cachetable_lock(t);
    // stall if too much write-back is already in flight
    cachetable_wait_write(t);
    for (p = t->table[fullhash & (t->table_size - 1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b == key.b && p->cachefile == cachefile) {
            *value = p->value;
            if (sizep) *sizep = p->size;
            // pin for the caller; blocks if a writer holds the pair
            ctpair_read_lock(&p->rwlock, &t->mutex);
            lru_touch(t, p);
            cachetable_unlock(t);
            note_hash_count(count);
            WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
            return 0;
        }
    }
    note_hash_count(count);
    int r;
    // Note. hashit(t,key) may have changed as a result of flushing. But fullhash won't have changed.
    {
        void *toku_value;
        long size = 1; // compat
        LSN written_lsn;
        WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
        // NOTE(review): the fetch callback presumably runs with the cachetable
        // lock held -- confirm whether long fetches should drop it.
        if ((r = fetch_callback(cachefile, key, fullhash, &toku_value, &size, extraargs, &written_lsn))) {
            if (r == DB_BADFORMAT) toku_db_badformat();
            cachetable_unlock(t);
            return r;
        }
        // insert the freshly fetched block, pinned on behalf of the caller
        cachetable_insert_at(cachefile, fullhash, key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn);
        *value = toku_value;
        if (sizep)
            *sizep = size;
    }
    // enforce the size limit now that the new pair is in the table
    r = maybe_flush_some(t, 0);
    if (r == DB_BADFORMAT) toku_db_badformat();
    cachetable_unlock(t);
    WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
    return r;
}
2013-04-16 23:57:18 -04:00
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return sucess. However,
// if it is being written, then allow the writer to evict it. This prevents writers
// being suspended on a block that was just selected for eviction.
2008-06-17 17:05:19 +00:00
int toku_cachetable_maybe_get_and_pin ( CACHEFILE cachefile , CACHEKEY key , u_int32_t fullhash , void * * value ) {
2007-07-13 19:37:47 +00:00
CACHETABLE t = cachefile - > cachetable ;
PAIR p ;
2008-06-14 01:38:53 +00:00
int count = 0 ;
2008-08-25 16:25:30 +00:00
cachetable_lock ( t ) ;
2008-06-17 17:05:19 +00:00
for ( p = t - > table [ fullhash & ( t - > table_size - 1 ) ] ; p ; p = p - > hash_chain ) {
2008-06-14 01:38:53 +00:00
count + + ;
2013-04-16 23:57:18 -04:00
if ( p - > key . b = = key . b & & p - > cachefile = = cachefile & & ! p - > writing ) {
2007-07-13 19:37:47 +00:00
* value = p - > value ;
2013-04-16 23:57:17 -04:00
ctpair_read_lock ( & p - > rwlock , & t - > mutex ) ;
2007-07-13 19:37:47 +00:00
lru_touch ( t , p ) ;
2008-08-25 16:25:30 +00:00
cachetable_unlock ( t ) ;
2013-04-16 23:57:17 -04:00
note_hash_count ( count ) ;
2007-09-12 18:12:31 +00:00
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
2007-07-13 19:37:47 +00:00
return 0 ;
}
}
2008-08-25 16:25:30 +00:00
cachetable_unlock ( t ) ;
2008-06-14 01:38:53 +00:00
note_hash_count ( count ) ;
2007-07-13 19:37:47 +00:00
return - 1 ;
}
2008-06-17 17:05:19 +00:00
// Unpin the pair for (cachefile,key): drop one read-lock reference, optionally
// mark the pair dirty, update the size accounting if a new size is supplied,
// and then give the cachetable a chance to evict.
// Returns 0 on success, -1 if the pair is not in the table, or the error
// from maybe_flush_some().
int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, int dirty, long size) {
    CACHETABLE t = cachefile->cachetable;
    PAIR p;
    WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
    int count = 0;
    //assert(fullhash == toku_cachetable_hash(cachefile, key));
    cachetable_lock(t);
    for (p = t->table[fullhash & (t->table_size - 1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b == key.b && p->cachefile == cachefile) {
            assert(p->rwlock.pinned > 0);
            ctpair_read_unlock(&p->rwlock);
            if (dirty) p->dirty = TRUE;
            if (size != 0) {
                // Keep the global (and in-flight-write) byte counters in sync
                // with the pair's new size.
                t->size_current -= p->size; if (p->writing) t->size_writing -= p->size;
                p->size = size;
                t->size_current += p->size; if (p->writing) t->size_writing += p->size;
            }
            // BUG FIX: the trace used `p->pinned` (no such member; the pin
            // count lives in p->rwlock.pinned) with a mismatched %lld, which
            // broke the build whenever TRACE_CACHETABLE was defined.
            WHEN_TRACE_CT(printf("[count=%d]\n", p->rwlock.pinned));
            {
                int r;
                if ((r = maybe_flush_some(t, 0))) {
                    cachetable_unlock(t);
                    return r;
                }
            }
            cachetable_unlock(t);
            note_hash_count(count);
            return 0;
        }
    }
    cachetable_unlock(t);
    note_hash_count(count);
    return -1;
}
2007-11-14 17:58:38 +00:00
// effect: Move an object from one key to another key.
// requires: The object is pinned in the table
2007-11-19 23:47:44 +00:00
int toku_cachetable_rename ( CACHEFILE cachefile , CACHEKEY oldkey , CACHEKEY newkey ) {
2013-04-16 23:57:17 -04:00
CACHETABLE t = cachefile - > cachetable ;
PAIR * ptr_to_p , p ;
int count = 0 ;
u_int32_t fullhash = toku_cachetable_hash ( cachefile , oldkey ) ;
cachetable_lock ( t ) ;
for ( ptr_to_p = & t - > table [ fullhash & ( t - > table_size - 1 ) ] , p = * ptr_to_p ;
p ;
ptr_to_p = & p - > hash_chain , p = * ptr_to_p ) {
count + + ;
2013-04-16 23:57:18 -04:00
if ( p - > key . b = = oldkey . b & & p - > cachefile = = cachefile ) {
2013-04-16 23:57:17 -04:00
note_hash_count ( count ) ;
* ptr_to_p = p - > hash_chain ;
p - > key = newkey ;
u_int32_t new_fullhash = toku_cachetable_hash ( cachefile , newkey ) ;
u_int32_t nh = new_fullhash & ( t - > table_size - 1 ) ;
p - > fullhash = new_fullhash ;
p - > hash_chain = t - > table [ nh ] ;
t - > table [ nh ] = p ;
cachetable_unlock ( t ) ;
return 0 ;
}
}
cachetable_unlock ( t ) ;
note_hash_count ( count ) ;
return - 1 ;
2007-07-13 19:37:47 +00:00
}
2007-11-19 23:47:44 +00:00
// Verify the consistency of the cachetable that backs this cachefile.
void toku_cachefile_verify(CACHEFILE cf) {
    toku_cachetable_verify(cf->cachetable);
}
2007-11-19 23:47:44 +00:00
// Consistency check: the hash table and the LRU list must index exactly the
// same set of pairs.  Aborts (assert) on any inconsistency.
void toku_cachetable_verify(CACHETABLE t) {
    cachetable_lock(t);
    // Pass 1: clear all the verify flags by walking every hash chain.
    u_int32_t i;
    for (i = 0; i < t->table_size; i++) {
        PAIR p;
        for (p = t->table[i]; p; p = p->hash_chain)
            p->verify_flag = 0;
    }
    // Pass 2: every pair on the LRU list must appear (exactly once) in its
    // hash bucket; mark each one as seen.
    {
        PAIR p;
        for (p = t->head; p; p = p->next) {
            assert(p->verify_flag == 0);
            int hashed = 0;
            PAIR q;
            for (q = t->table[p->fullhash & (t->table_size - 1)]; q; q = q->hash_chain) {
                if (q == p) {
                    hashed = 1;
                    break;
                }
            }
            if (!hashed) {
                fprintf(stderr, "Something in the LRU chain is not hashed\n");
                assert(0);
            }
            p->verify_flag = 1;
        }
    }
    // Pass 3: every hashed pair must have been visited via the LRU list.
    for (i = 0; i < t->table_size; i++) {
        PAIR q;
        for (q = t->table[i]; q; q = q->hash_chain)
            assert(q->verify_flag);
    }
    cachetable_unlock(t);
}
2013-04-16 23:57:17 -04:00
// Assert that no pair belonging to cf remains in the cachetable.
// Checked two ways: via every hash chain, and via the LRU list.
static void assert_cachefile_is_flushed_and_removed(CACHETABLE t, CACHEFILE cf) {
    // First way: look through all the hash chains.
    u_int32_t i;
    for (i = 0; i < t->table_size; i++) {
        PAIR p;
        for (p = t->table[i]; p; p = p->hash_chain)
            assert(p->cachefile != cf);
    }
    // Second way: look through the LRU list.
    PAIR p;
    for (p = t->head; p; p = p->next)
        assert(p->cachefile != cf);
}
2013-04-16 23:57:18 -04:00
// Write all of the pairs associated with a cachefile to storage. Maybe remove
// these pairs from the cachetable after they have been written.
2007-09-12 18:12:31 +00:00
2013-04-16 23:57:18 -04:00
static int cachefile_write_maybe_remove ( CACHETABLE ct , CACHEFILE cf , BOOL do_remove ) {
2013-04-16 23:57:17 -04:00
unsigned nfound = 0 ;
struct writequeue cq ;
writequeue_init ( & cq ) ;
unsigned i ;
for ( i = 0 ; i < ct - > table_size ; i + + ) {
2007-07-13 19:37:47 +00:00
PAIR p ;
2013-04-16 23:57:18 -04:00
for ( p = ct - > table [ i ] ; p ; p = p - > hash_chain ) {
2013-04-16 23:57:17 -04:00
if ( cf = = 0 | | p - > cachefile = = cf ) {
nfound + + ;
p - > cq = & cq ;
if ( ! p - > writing )
flush_and_remove ( ct , p , 1 ) ;
2007-07-13 19:37:47 +00:00
}
}
}
2013-04-16 23:57:17 -04:00
for ( i = 0 ; i < nfound ; i + + ) {
PAIR p = 0 ;
int r = writequeue_deq ( & cq , & ct - > mutex , & p ) ; assert ( r = = 0 ) ;
cachetable_complete_write_pair ( ct , p , do_remove ) ;
}
writequeue_destroy ( & cq ) ;
if ( do_remove )
assert_cachefile_is_flushed_and_removed ( ct , cf ) ;
2007-10-05 14:46:49 +00:00
2013-04-16 23:57:17 -04:00
if ( ( 4 * ct - > n_in_table < ct - > table_size ) & & ( ct - > table_size > 4 ) )
cachetable_rehash ( ct , ct - > table_size / 2 ) ;
2007-10-05 14:46:49 +00:00
2007-07-13 19:37:47 +00:00
return 0 ;
}
/* Require that it all be flushed. */
2007-11-19 23:47:44 +00:00
int toku_cachetable_close ( CACHETABLE * tp ) {
2007-08-01 02:37:21 +00:00
CACHETABLE t = * tp ;
2007-07-13 19:37:47 +00:00
int r ;
2013-04-16 23:57:17 -04:00
cachetable_lock ( t ) ;
2013-04-16 23:57:18 -04:00
if ( ( r = cachefile_write_maybe_remove ( t , 0 , TRUE ) ) ) {
2013-04-16 23:57:17 -04:00
cachetable_unlock ( t ) ;
return r ;
}
u_int32_t i ;
2007-07-13 19:37:47 +00:00
for ( i = 0 ; i < t - > table_size ; i + + ) {
if ( t - > table [ i ] ) return - 1 ;
}
2013-04-16 23:57:17 -04:00
assert ( t - > size_writing = = 0 ) ;
writequeue_set_closed ( & t - > wq ) ;
cachetable_unlock ( t ) ;
threadpool_destroy ( & t - > threadpool ) ;
writequeue_destroy ( & t - > wq ) ;
2013-04-16 23:57:27 -04:00
r = toku_pthread_mutex_destroy ( & t - > mutex ) ; assert ( r = = 0 ) ;
2007-07-20 18:00:14 +00:00
toku_free ( t - > table ) ;
toku_free ( t ) ;
2007-08-01 02:37:21 +00:00
* tp = 0 ;
2007-07-13 19:37:47 +00:00
return 0 ;
}
2013-04-16 23:57:24 -04:00
// Unpin the pair for (cachefile,key) and remove it from the cachetable
// without writing it back (its dirty bit is cleared first).
// Returns 0 if the pair was found and removed, ENOENT otherwise —
// removing something already absent is OK.
int toku_cachetable_unpin_and_remove(CACHEFILE cachefile, CACHEKEY key) {
    int result = ENOENT;
    CACHETABLE ct = cachefile->cachetable;
    int nprobes = 0;
    cachetable_lock(ct);
    u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
    PAIR p;
    for (p = ct->table[fullhash & (ct->table_size - 1)]; p; p = p->hash_chain) {
        nprobes++;
        if (p->key.b == key.b && p->cachefile == cachefile) {
            p->dirty = 0; // clear the dirty bit.  We're just supposed to remove it.
            assert(p->rwlock.pinned == 1);
            ctpair_read_unlock(&p->rwlock);
            // Push the pair through a private write queue so its removal is
            // serialized with any in-flight write.
            struct writequeue cq;
            writequeue_init(&cq);
            p->cq = &cq;
            if (!p->writing)
                flush_and_remove(ct, p, 0);
            PAIR written = 0;
            int r = writequeue_deq(&cq, &ct->mutex, &written);
            assert(r == 0 && written == p);
            cachetable_complete_write_pair(ct, p, TRUE);
            writequeue_destroy(&cq);
            result = 0;
            break;
        }
    }
    cachetable_unlock(ct);
    note_hash_count(nprobes);
    return result;
}
2007-11-14 17:58:38 +00:00
#if 0
// Dead code, compiled out: single-threaded "flush but keep" fsync of all
// pairs, superseded by the write-queue machinery above.
static void flush_and_keep(PAIR flush_me) {
    if (flush_me->dirty) {
        WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
        flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
        flush_me->dirty = 0;
    }
}

// Recursively flush-and-keep every pair on one hash chain.
static int cachetable_fsync_pairs(CACHETABLE t, PAIR p) {
    if (p) {
        int r = cachetable_fsync_pairs(t, p->hash_chain);
        if (r != 0) return r;
        flush_and_keep(p);
    }
    return 0;
}

// Flush-and-keep every pair in the table.
int cachetable_fsync(CACHETABLE t) {
    int i;
    int r;
    for (i = 0; i < t->table_size; i++) {
        r = cachetable_fsync_pairs(t, t->table[i]);
        if (r != 0) return r;
    }
    return 0;
}
#endif
2007-07-13 19:37:47 +00:00
#if 0
// Dead code, compiled out: raw positioned write on the cachefile fd.
// Returns 0 on success, errno on failure; asserts on a short write.
int cachefile_pwrite(CACHEFILE cf, const void *buf, size_t count, toku_off_t offset) {
    ssize_t r = pwrite(cf->fd, buf, count, offset);
    if (r == -1) return errno;
    assert((size_t)r == count);
    return 0;
}

// Dead code, compiled out: raw positioned read on the cachefile fd.
// Returns 0 on success, errno on failure, -1 on EOF.
int cachefile_pread(CACHEFILE cf, void *buf, size_t count, toku_off_t offset) {
    ssize_t r = pread(cf->fd, buf, count, offset);
    if (r == -1) return errno;
    if (r == 0) return -1; /* No error for EOF ??? */
    assert((size_t)r == count);
    return 0;
}
#endif
2013-04-16 23:57:17 -04:00
int toku_cachetable_checkpoint(CACHETABLE ct) {
    // Requires: Everything is unpinned.  (In the multithreaded version we have to wait for things to get unpinned and then
    //  grab them (or else the unpinner has to do something.)
    // Algorithm:  Write a checkpoint record to the log, noting the LSN of that record.
    //  Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
    //  For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
    //      flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
    //  Watch out since evicting the node modifies the hash table.
    //?? This is a skeleton.  It compiles, but doesn't do anything reasonable yet.
    //?? log_the_checkpoint();
    struct writequeue cq;
    writequeue_init(&cq);
    cachetable_lock(ct);
    // set the checkpoint in progress flag.  if already set then just return.
    if (!ct->checkpointing) {
        ct->checkpointing = 1;
        // Queue every candidate pair for writing.  (The `if (1)` stands in for
        // the real dirty/LSN test in the skeleton.)
        unsigned npending = 0;
        unsigned i;
        for (i = 0; i < ct->table_size; i++) {
            PAIR p;
            for (p = ct->table[i]; p; p = p->hash_chain) {
                // p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
                if (1) {
                    npending++;
                    p->cq = &cq;
                    if (!p->writing)
                        flush_and_remove(ct, p, 1);
                }
            }
        }
        // Wait for and complete each queued write (pairs are kept in the table).
        for (i = 0; i < npending; i++) {
            PAIR p = 0;
            int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
            cachetable_complete_write_pair(ct, p, FALSE);
        }
        // Give each open cachefile a chance to checkpoint its user data.
        {
            CACHEFILE cf;
            for (cf = ct->cachefiles; cf; cf = cf->next) {
                if (cf->checkpoint_userdata) {
                    int r = cf->checkpoint_userdata(cf, cf->userdata);
                    assert(r == 0);
                }
            }
        }
        ct->checkpointing = 0; // clear the checkpoint in progress flag
    }
    cachetable_unlock(ct);
    writequeue_destroy(&cq);
    return 0;
}
// Return the logger associated with this cachefile's cachetable.
TOKULOGGER toku_cachefile_logger(CACHEFILE cf) {
    return cf->cachetable->logger;
}
// Return the file number of this cachefile.
FILENUM toku_cachefile_filenum(CACHEFILE cf) {
    return cf->filenum;
}
#if DO_WRITER_THREAD

// The writer thread waits for work in the write queue and writes the pair.
// Runs until the queue is closed (writequeue_deq returns nonzero).
static void *cachetable_writer(void *arg) {
    CACHETABLE ct = arg;
    cachetable_lock(ct);
    for (;;) {
        PAIR p = 0;
        int r = writequeue_deq(&ct->wq, &ct->mutex, &p);
        if (r != 0)
            break; // queue closed: shut this worker down
        cachetable_write_pair(ct, p);
    }
    cachetable_unlock(ct);
    return arg;
}

#endif
// debug functions
2007-09-21 17:55:49 +00:00
2013-04-16 23:57:18 -04:00
// Debug helper: return nonzero iff any pair in the table is still pinned.
int toku_cachetable_assert_all_unpinned(CACHETABLE t) {
    int some_pinned = 0;
    cachetable_lock(t);
    u_int32_t i;
    for (i = 0; i < t->table_size; i++) {
        PAIR p;
        for (p = t->table[i]; p; p = p->hash_chain) {
            assert(ctpair_pinned(&p->rwlock) >= 0);
            if (ctpair_pinned(&p->rwlock))
                some_pinned = 1;
        }
    }
    cachetable_unlock(t);
    return some_pinned;
}
// Debug helper: count the pinned pairs belonging to cf (cf==0 means all),
// optionally printing each one.
int toku_cachefile_count_pinned(CACHEFILE cf, int print_them) {
    int n_pinned = 0;
    CACHETABLE t = cf->cachetable;
    cachetable_lock(t);
    u_int32_t i;
    for (i = 0; i < t->table_size; i++) {
        PAIR p;
        for (p = t->table[i]; p; p = p->hash_chain) {
            assert(ctpair_pinned(&p->rwlock) >= 0);
            if (ctpair_pinned(&p->rwlock) && (cf == 0 || p->cachefile == cf)) {
                if (print_them) printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
                n_pinned++;
            }
        }
    }
    cachetable_unlock(t);
    return n_pinned;
}
2007-11-19 23:47:44 +00:00
// Debug helper: dump every non-empty hash bucket and its chain of pairs.
void toku_cachetable_print_state(CACHETABLE ct) {
    cachetable_lock(ct);
    u_int32_t i;
    for (i = 0; i < ct->table_size; i++) {
        PAIR p = ct->table[i];
        if (p != 0) {
            printf("t[%u]=", i);
            for (p = ct->table[i]; p; p = p->hash_chain) {
                printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, p->dirty, p->rwlock.pinned, p->size);
            }
            printf("\n");
        }
    }
    cachetable_unlock(ct);
}
2007-09-21 17:55:49 +00:00
2007-11-19 23:47:44 +00:00
// Snapshot the cachetable's counters; each out-parameter may be NULL.
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
    cachetable_lock(ct);
    if (num_entries_ptr)  *num_entries_ptr  = ct->n_in_table;
    if (hash_size_ptr)    *hash_size_ptr    = ct->table_size;
    if (size_current_ptr) *size_current_ptr = ct->size_current;
    if (size_limit_ptr)   *size_limit_ptr   = ct->size_limit;
    cachetable_unlock(ct);
}
2008-06-05 22:09:59 +00:00
// Look up (cf,key) and report the pair's value/dirty/pin/size through the
// non-NULL out-parameters.  Returns 0 if found, -1 otherwise.
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
                                  int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
    int result = -1;
    int nprobes = 0;
    u_int32_t fullhash = toku_cachetable_hash(cf, key);
    cachetable_lock(ct);
    PAIR p;
    for (p = ct->table[fullhash & (ct->table_size - 1)]; p; p = p->hash_chain) {
        nprobes++;
        if (p->key.b == key.b && p->cachefile == cf) {
            note_hash_count(nprobes);
            if (value_ptr) *value_ptr = p->value;
            if (dirty_ptr) *dirty_ptr = p->dirty;
            if (pin_ptr)   *pin_ptr   = p->rwlock.pinned;
            if (size_ptr)  *size_ptr  = p->size;
            result = 0;
            break;
        }
    }
    cachetable_unlock(ct);
    note_hash_count(nprobes);
    return result;
}
2013-04-16 23:57:18 -04:00
2013-04-16 23:57:29 -04:00
// Attach opaque user data and its close/checkpoint hooks to a cachefile.
void
toku_cachefile_set_userdata(CACHEFILE cf,
                            void *userdata,
                            int (*close_userdata)(CACHEFILE, void *),
                            int (*checkpoint_userdata)(CACHEFILE, void *))
{
    cf->userdata            = userdata;
    cf->close_userdata      = close_userdata;
    cf->checkpoint_userdata = checkpoint_userdata;
}
2013-04-16 23:57:24 -04:00
2013-04-16 23:57:18 -04:00
// Return the user data previously attached with toku_cachefile_set_userdata.
void *toku_cachefile_get_userdata(CACHEFILE cf) {
    return cf->userdata;
}
2013-04-16 23:57:27 -04:00
// Redirect the cachefile's descriptor to the null device, discarding its
// filename, so subsequent writes go nowhere.  Always returns 0.
int toku_cachefile_redirect_nullfd(CACHEFILE cf) {
    int null_fd;
    struct fileid fileid;
    // Combine open(2) flags with bitwise OR; the original `O_WRONLY + O_BINARY`
    // only worked by accident of the bits being disjoint.
    null_fd = open(DEV_NULL_FILE, O_WRONLY | O_BINARY);
    assert(null_fd >= 0);
    toku_os_get_unique_file_id(null_fd, &fileid);
    close(cf->fd);
    cf->fd = null_fd;
    if (cf->fname) {
        toku_free(cf->fname);
        cf->fname = 0;
    }
    cachefile_init_filenum(cf, null_fd, NULL, fileid);
    return 0;
}