2007-11-29 14:18:54 +00:00
/* -*- mode: C; c-basic-offset: 4 -*- */
2008-01-24 15:10:32 +00:00
# ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
2007-09-17 16:23:05 +00:00
2013-04-16 23:57:38 -04:00
# include <stdlib.h>
# include <string.h>
# include <malloc.h>
2013-04-16 23:57:39 -04:00
# include <time.h>
2013-04-16 23:57:38 -04:00
# include "toku_portability.h"
# include "memory.h"
# include "workqueue.h"
# include "threadpool.h"
# include "cachetable.h"
# include "cachetable-rwlock.h"
# include "toku_worker.h"
2013-04-16 23:57:47 -04:00
# include "log_header.h"
2013-04-16 23:57:38 -04:00
2013-04-16 23:57:41 -04:00
# if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
# error
# endif
2013-04-16 23:57:38 -04:00
// use worker threads 0->no 1->yes
#define DO_WORKER_THREAD 1
#if DO_WORKER_THREAD
static void cachetable_writer(WORKITEM);
static void cachetable_reader(WORKITEM);
#endif

// use cachetable locks 0->no 1->yes
#define DO_CACHETABLE_LOCK 1

// simulate long latency write operations with usleep. time in milliseconds.
#define DO_CALLBACK_USLEEP 0
#define DO_CALLBACK_BUSYWAIT 0

#define TRACE_CACHETABLE 0
#if TRACE_CACHETABLE
#define WHEN_TRACE_CT(x) x
#else
#define WHEN_TRACE_CT(x) ((void)0)
#endif

#define TOKU_DO_WAIT_TIME 0

// Cachetable hit/miss/wait statistics counters.
static u_int64_t cachetable_hit;
static u_int64_t cachetable_miss;
static u_int64_t cachetable_wait_reading;
static u_int64_t cachetable_wait;

#if TOKU_DO_WAIT_TIME
static u_int64_t cachetable_miss_time;
static u_int64_t cachetable_wait_time;
#endif
2013-04-16 23:57:41 -04:00
// Lifecycle state of a cachetable pair.
enum ctpair_state {
    CTPAIR_INVALID = 0, // invalid
    CTPAIR_IDLE    = 1, // in memory
    CTPAIR_READING = 2, // being read into memory
    CTPAIR_WRITING = 3, // being written from memory
};
2007-07-13 19:37:47 +00:00
typedef struct ctpair * PAIR ;
struct ctpair {
2007-08-01 16:01:52 +00:00
enum typ_tag tag ;
2013-04-16 23:57:38 -04:00
CACHEFILE cachefile ;
2007-07-13 19:37:47 +00:00
CACHEKEY key ;
void * value ;
2013-04-16 23:57:17 -04:00
long size ;
2013-04-16 23:57:38 -04:00
enum ctpair_state state ;
2013-04-16 23:57:38 -04:00
enum cachetable_dirty dirty ;
2013-04-16 23:57:41 -04:00
char verify_flag ; // Used in verify_cachetable()
BOOL write_me ; // write_pair
BOOL remove_me ; // write_pair
2013-04-16 23:57:38 -04:00
u_int32_t fullhash ;
2013-04-16 23:57:17 -04:00
CACHETABLE_FLUSH_CALLBACK flush_callback ;
CACHETABLE_FETCH_CALLBACK fetch_callback ;
2007-11-14 17:58:38 +00:00
void * extraargs ;
2013-04-16 23:57:38 -04:00
2013-04-16 23:57:41 -04:00
PAIR next , prev ; // In LRU list.
2013-04-16 23:57:38 -04:00
PAIR hash_chain ;
2013-04-16 23:57:17 -04:00
LSN modified_lsn ; // What was the LSN when modified (undefined if not dirty)
LSN written_lsn ; // What was the LSN when written (we need to get this information when we fetch)
2013-04-16 23:57:47 -04:00
BOOL checkpoint_pending ; // If this is on, then we have got to write the pair out to disk before modifying it.
PAIR checkpoint_list ;
2013-04-16 23:57:38 -04:00
struct ctpair_rwlock rwlock ; // multiple get's, single writer
struct workqueue * cq ; // writers sometimes return ctpair's using this queue
struct workitem asyncwork ; // work item for the worker threads
2007-07-13 19:37:47 +00:00
} ;
2013-04-16 23:57:38 -04:00
static void * const zero_value = 0 ;
static int const zero_size = 0 ;
2013-04-16 23:57:17 -04:00
static inline void ctpair_destroy ( PAIR p ) {
ctpair_rwlock_destroy ( & p - > rwlock ) ;
toku_free ( p ) ;
}
2007-11-14 17:58:38 +00:00
// The cachetable is as close to an ENV as we get.
2007-07-13 19:37:47 +00:00
struct cachetable {
enum typ_tag tag ;
2013-04-16 23:57:38 -04:00
u_int32_t n_in_table ; // number of pairs in the hash table
u_int32_t table_size ; // number of buckets in the hash table
PAIR * table ; // hash table
PAIR head , tail ; // of LRU list. head is the most recently used. tail is least recently used.
CACHEFILE cachefiles ; // list of cachefiles that use this cachetable
long size_current ; // the sum of the sizes of the pairs in the cachetable
long size_limit ; // the limit to the sum of the pair sizes
long size_writing ; // the sum of the sizes of the pairs being written
LSN lsn_of_checkpoint ; // the most recent checkpoint in the log.
2007-11-14 17:58:38 +00:00
TOKULOGGER logger ;
2013-04-16 23:57:38 -04:00
toku_pthread_mutex_t * mutex ; // coarse lock that protects the cachetable, the cachefiles, and the pair's
2013-04-16 23:57:38 -04:00
struct workqueue wq ; // async work queue
THREADPOOL threadpool ; // pool of worker threads
char checkpointing ; // checkpoint in progress
2007-07-13 19:37:47 +00:00
} ;
2013-04-16 23:57:38 -04:00
// Lock the cachetable
2008-08-25 16:25:30 +00:00
static inline void cachetable_lock ( CACHETABLE ct __attribute__ ( ( unused ) ) ) {
# if DO_CACHETABLE_LOCK
2013-04-16 23:57:38 -04:00
int r = toku_pthread_mutex_lock ( ct - > mutex ) ; assert ( r = = 0 ) ;
2008-08-25 16:25:30 +00:00
# endif
}
2013-04-16 23:57:38 -04:00
// Unlock the cachetable
2008-08-25 16:25:30 +00:00
static inline void cachetable_unlock ( CACHETABLE ct __attribute__ ( ( unused ) ) ) {
# if DO_CACHETABLE_LOCK
2013-04-16 23:57:38 -04:00
int r = toku_pthread_mutex_unlock ( ct - > mutex ) ; assert ( r = = 0 ) ;
2008-08-25 16:25:30 +00:00
# endif
}
2013-04-16 23:57:44 -04:00
// Wait for cache table space to become available
2013-04-16 23:57:17 -04:00
static inline void cachetable_wait_write ( CACHETABLE ct ) {
while ( 2 * ct - > size_writing > ct - > size_current ) {
2013-04-16 23:57:38 -04:00
workqueue_wait_write ( & ct - > wq , 0 ) ;
2013-04-16 23:57:17 -04:00
}
}
2007-07-13 19:37:47 +00:00
struct cachefile {
CACHEFILE next ;
2008-04-07 01:30:25 +00:00
u_int64_t refcount ; /* CACHEFILEs are shared. Use a refcount to decide when to really close it.
* The reference count is one for every open DB .
* Plus one for every commit / rollback record . ( It would be harder to keep a count for every open transaction ,
* because then we ' d have to figure out if the transaction was already counted . If we simply use a count for
* every record in the transaction , we ' ll be ok . Hence we use a 64 - bit counter to make sure we don ' t run out .
*/
2007-07-13 19:37:47 +00:00
int fd ; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
CACHETABLE cachetable ;
struct fileid fileid ;
2007-11-14 17:58:38 +00:00
FILENUM filenum ;
2008-04-17 03:11:55 +00:00
char * fname ;
2013-04-16 23:57:18 -04:00
void * userdata ;
2013-04-16 23:57:47 -04:00
int ( * close_userdata ) ( CACHEFILE cf , void * userdata , char * * error_string ) ; // when closing the last reference to a cachefile, first call this function.
int ( * begin_checkpoint_userdata ) ( CACHEFILE cf , LSN lsn_of_checkpoint , void * userdata ) ; // before checkpointing cachefiles call this function.
2013-04-16 23:57:29 -04:00
int ( * checkpoint_userdata ) ( CACHEFILE cf , void * userdata ) ; // when checkpointing a cachefile, call this function.
2013-04-16 23:57:47 -04:00
int ( * end_checkpoint_userdata ) ( CACHEFILE cf , void * userdata ) ; // after checkpointing cachefiles call this function.
2007-07-13 19:37:47 +00:00
} ;
2007-11-19 23:47:44 +00:00
int toku_create_cachetable ( CACHETABLE * result , long size_limit , LSN initial_lsn , TOKULOGGER logger ) {
2013-04-16 23:57:27 -04:00
# if defined __linux__
2008-05-28 01:22:51 +00:00
{
static int did_mallopt = 0 ;
if ( ! did_mallopt ) {
mallopt ( M_MMAP_THRESHOLD , 1024 * 64 ) ; // 64K and larger should be malloced with mmap().
did_mallopt = 1 ;
}
}
2013-04-16 23:57:27 -04:00
# endif
2013-04-16 23:57:38 -04:00
TAGMALLOC ( CACHETABLE , ct ) ;
if ( ct = = 0 ) return ENOMEM ;
ct - > n_in_table = 0 ;
ct - > table_size = 4 ;
MALLOC_N ( ct - > table_size , ct - > table ) ;
assert ( ct - > table ) ;
ct - > head = ct - > tail = 0 ;
2013-04-16 23:57:17 -04:00
u_int32_t i ;
2013-04-16 23:57:38 -04:00
for ( i = 0 ; i < ct - > table_size ; i + + ) {
ct - > table [ i ] = 0 ;
2007-07-13 19:37:47 +00:00
}
2013-04-16 23:57:38 -04:00
ct - > cachefiles = 0 ;
ct - > size_current = 0 ;
ct - > size_limit = size_limit ;
ct - > size_writing = 0 ;
ct - > lsn_of_checkpoint = initial_lsn ;
ct - > logger = logger ;
ct - > checkpointing = 0 ;
toku_init_workers ( & ct - > wq , & ct - > threadpool ) ;
ct - > mutex = workqueue_lock_ref ( & ct - > wq ) ;
* result = ct ;
2007-07-13 19:37:47 +00:00
return 0 ;
}
2008-01-11 22:24:43 +00:00
// What cachefile goes with particular fd?
2013-04-16 23:57:38 -04:00
int toku_cachefile_of_filenum ( CACHETABLE ct , FILENUM filenum , CACHEFILE * cf ) {
2008-01-11 22:24:43 +00:00
CACHEFILE extant ;
2013-04-16 23:57:38 -04:00
for ( extant = ct - > cachefiles ; extant ; extant = extant - > next ) {
2008-02-26 17:47:40 +00:00
if ( extant - > filenum . fileid = = filenum . fileid ) {
* cf = extant ;
return 0 ;
}
2008-01-11 22:24:43 +00:00
}
return ENOENT ;
}
2008-07-21 02:34:13 +00:00
static FILENUM next_filenum_to_use = { 0 } ;
2013-04-16 23:57:38 -04:00
static void cachefile_init_filenum ( CACHEFILE cf , int fd , const char * fname , struct fileid fileid ) \
2008-07-21 02:34:13 +00:00
{
2013-04-16 23:57:38 -04:00
cf - > fd = fd ;
cf - > fileid = fileid ;
cf - > fname = fname ? toku_strdup ( fname ) : 0 ;
2008-07-21 02:34:13 +00:00
}
2008-02-29 20:47:11 +00:00
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
2013-04-16 23:57:38 -04:00
int toku_cachetable_openfd ( CACHEFILE * cfptr , CACHETABLE ct , int fd , const char * fname ) {
2007-07-13 19:37:47 +00:00
int r ;
CACHEFILE extant ;
struct fileid fileid ;
2013-04-16 23:57:28 -04:00
r = toku_os_get_unique_file_id ( fd , & fileid ) ;
2013-04-16 23:57:27 -04:00
if ( r ! = 0 ) {
r = errno ; close ( fd ) ;
return r ;
}
2013-04-16 23:57:38 -04:00
for ( extant = ct - > cachefiles ; extant ; extant = extant - > next ) {
2007-07-13 19:37:47 +00:00
if ( memcmp ( & extant - > fileid , & fileid , sizeof ( fileid ) ) = = 0 ) {
2007-09-21 17:55:49 +00:00
r = close ( fd ) ;
assert ( r = = 0 ) ;
2007-07-13 19:37:47 +00:00
extant - > refcount + + ;
2013-04-16 23:57:38 -04:00
* cfptr = extant ;
2007-07-13 19:37:47 +00:00
return 0 ;
}
}
2008-04-18 21:30:39 +00:00
try_again :
2013-04-16 23:57:38 -04:00
for ( extant = ct - > cachefiles ; extant ; extant = extant - > next ) {
2008-04-18 21:30:39 +00:00
if ( next_filenum_to_use . fileid = = extant - > filenum . fileid ) {
next_filenum_to_use . fileid + + ;
goto try_again ;
}
}
2007-07-13 19:37:47 +00:00
{
2013-04-16 23:57:39 -04:00
CACHEFILE MALLOC ( newcf ) ;
2013-04-16 23:57:38 -04:00
newcf - > cachetable = ct ;
2008-07-21 17:48:26 +00:00
newcf - > filenum . fileid = next_filenum_to_use . fileid + + ;
2008-07-21 02:34:13 +00:00
cachefile_init_filenum ( newcf , fd , fname , fileid ) ;
newcf - > refcount = 1 ;
2013-04-16 23:57:38 -04:00
newcf - > next = ct - > cachefiles ;
ct - > cachefiles = newcf ;
2013-04-16 23:57:18 -04:00
newcf - > userdata = 0 ;
newcf - > close_userdata = 0 ;
2013-04-16 23:57:29 -04:00
newcf - > checkpoint_userdata = 0 ;
2013-04-16 23:57:47 -04:00
newcf - > begin_checkpoint_userdata = 0 ;
newcf - > end_checkpoint_userdata = 0 ;
2013-04-16 23:57:18 -04:00
2013-04-16 23:57:38 -04:00
* cfptr = newcf ;
2007-07-13 19:37:47 +00:00
return 0 ;
}
}
2013-04-16 23:57:38 -04:00
int toku_cachetable_openf ( CACHEFILE * cfptr , CACHETABLE ct , const char * fname , int flags , mode_t mode ) {
2013-04-16 23:57:27 -04:00
int fd = open ( fname , flags + O_BINARY , mode ) ;
2007-11-18 12:48:36 +00:00
if ( fd < 0 ) return errno ;
2013-04-16 23:57:38 -04:00
return toku_cachetable_openfd ( cfptr , ct , fd , fname ) ;
}
// Expose the cachetable's async work queue.
WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
    return &ct->wq;
}
2013-04-16 23:57:38 -04:00
void toku_cachefile_get_workqueue_load ( CACHEFILE cf , int * n_in_queue , int * n_threads ) {
CACHETABLE ct = cf - > cachetable ;
* n_in_queue = workqueue_n_in_queue ( & ct - > wq , 1 ) ;
* n_threads = threadpool_get_current_threads ( ct - > threadpool ) ;
}
2008-07-21 02:34:13 +00:00
int toku_cachefile_set_fd ( CACHEFILE cf , int fd , const char * fname ) {
int r ;
2013-04-16 23:57:30 -04:00
struct fileid fileid ;
r = toku_os_get_unique_file_id ( fd , & fileid ) ;
2008-07-21 02:34:13 +00:00
if ( r ! = 0 ) {
r = errno ; close ( fd ) ; return r ;
}
2013-04-16 23:57:38 -04:00
if ( cf - > close_userdata & & ( r = cf - > close_userdata ( cf , cf - > userdata , 0 ) ) ) {
2013-04-16 23:57:18 -04:00
return r ;
}
cf - > close_userdata = NULL ;
2013-04-16 23:57:29 -04:00
cf - > checkpoint_userdata = NULL ;
2013-04-16 23:57:47 -04:00
cf - > begin_checkpoint_userdata = NULL ;
cf - > end_checkpoint_userdata = NULL ;
2013-04-16 23:57:18 -04:00
cf - > userdata = NULL ;
2008-07-21 02:34:13 +00:00
close ( cf - > fd ) ;
cf - > fd = - 1 ;
if ( cf - > fname ) {
toku_free ( cf - > fname ) ;
cf - > fname = 0 ;
}
cachefile_init_filenum ( cf , fd , fname , fileid ) ;
return 0 ;
}
// Return the file descriptor currently backing cf.
int toku_cachefile_fd(CACHEFILE cf) {
    return cf->fd;
}
2013-04-16 23:57:27 -04:00
int toku_cachefile_truncate0 ( CACHEFILE cf ) {
2013-04-16 23:57:39 -04:00
int r ;
r = ftruncate ( cf - > fd , 0 ) ;
2013-04-16 23:57:27 -04:00
if ( r ! = 0 )
r = errno ;
return r ;
}
2007-11-19 23:47:44 +00:00
static CACHEFILE remove_cf_from_list ( CACHEFILE cf , CACHEFILE list ) {
2007-07-13 19:37:47 +00:00
if ( list = = 0 ) return 0 ;
else if ( list = = cf ) {
return list - > next ;
} else {
list - > next = remove_cf_from_list ( cf , list - > next ) ;
return list ;
}
}
2013-04-16 23:57:38 -04:00
static int cachetable_flush_cachefile ( CACHETABLE , CACHEFILE cf ) ;
2007-07-13 19:37:47 +00:00
2008-04-09 02:45:27 +00:00
// Increment the reference count on cf.
void toku_cachefile_refup(CACHEFILE cf) {
    cf->refcount++;
}
2013-04-16 23:57:38 -04:00
int toku_cachefile_close ( CACHEFILE * cfp , TOKULOGGER logger , char * * error_string ) {
2007-08-01 02:37:21 +00:00
CACHEFILE cf = * cfp ;
2013-04-16 23:57:17 -04:00
CACHETABLE ct = cf - > cachetable ;
cachetable_lock ( ct ) ;
2007-07-13 19:37:47 +00:00
assert ( cf - > refcount > 0 ) ;
cf - > refcount - - ;
if ( cf - > refcount = = 0 ) {
2013-04-16 23:57:38 -04:00
int r ;
if ( ( r = cachetable_flush_cachefile ( ct , cf ) ) ) {
2013-04-16 23:57:47 -04:00
error :
2013-04-16 23:57:44 -04:00
cf - > cachetable - > cachefiles = remove_cf_from_list ( cf , cf - > cachetable - > cachefiles ) ;
if ( cf - > fname ) toku_free ( cf - > fname ) ;
int r2 = close ( cf - > fd ) ;
if ( r2 ! = 0 ) fprintf ( stderr , " %s:%d During error handling, could not close file r=%d errno=%d \n " , __FILE__ , __LINE__ , r2 , errno ) ;
//assert(r == 0);
toku_free ( cf ) ;
2013-04-16 23:57:38 -04:00
cachetable_unlock ( ct ) ;
return r ;
2013-04-16 23:57:44 -04:00
}
if ( cf - > close_userdata & & ( r = cf - > close_userdata ( cf , cf - > userdata , error_string ) ) ) {
2013-04-16 23:57:47 -04:00
goto error ;
2013-04-16 23:57:18 -04:00
}
2013-04-16 23:57:47 -04:00
cf - > close_userdata = NULL ;
2013-04-16 23:57:29 -04:00
cf - > checkpoint_userdata = NULL ;
2013-04-16 23:57:47 -04:00
cf - > begin_checkpoint_userdata = NULL ;
cf - > end_checkpoint_userdata = NULL ;
2013-04-16 23:57:18 -04:00
cf - > userdata = NULL ;
2013-04-16 23:57:17 -04:00
cf - > cachetable - > cachefiles = remove_cf_from_list ( cf , cf - > cachetable - > cachefiles ) ;
cachetable_unlock ( ct ) ;
2013-04-16 23:57:38 -04:00
r = close ( cf - > fd ) ;
assert ( r = = 0 ) ;
2007-09-21 17:55:49 +00:00
cf - > fd = - 1 ;
2008-04-17 03:11:55 +00:00
if ( logger ) {
2013-04-16 23:57:20 -04:00
//assert(cf->fname);
//BYTESTRING bs = {.len=strlen(cf->fname), .data=cf->fname};
//r = toku_log_cfclose(logger, 0, 0, bs, cf->filenum);
2008-04-17 03:11:55 +00:00
}
2013-04-16 23:57:39 -04:00
if ( cf - > fname ) toku_free ( cf - > fname ) ;
2007-07-20 18:00:14 +00:00
toku_free ( cf ) ;
2007-08-01 02:37:21 +00:00
* cfp = 0 ;
2007-07-13 19:37:47 +00:00
return r ;
} else {
2013-04-16 23:57:17 -04:00
cachetable_unlock ( ct ) ;
2007-08-01 02:37:21 +00:00
* cfp = 0 ;
2007-07-13 19:37:47 +00:00
return 0 ;
}
}
2008-07-21 02:34:13 +00:00
int toku_cachefile_flush ( CACHEFILE cf ) {
2013-04-16 23:57:17 -04:00
CACHETABLE ct = cf - > cachetable ;
cachetable_lock ( ct ) ;
2013-04-16 23:57:38 -04:00
int r = cachetable_flush_cachefile ( ct , cf ) ;
2013-04-16 23:57:17 -04:00
cachetable_unlock ( ct ) ;
return r ;
2008-07-21 02:34:13 +00:00
}
2008-06-14 01:38:53 +00:00
// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan
static inline u_int32_t rot ( u_int32_t x , u_int32_t k ) {
return ( x < < k ) | ( x > > ( 32 - k ) ) ;
}
// Final mixing step of Jenkins' lookup3: irreversibly mix three 32-bit words
// and return the fully mixed c.
static inline u_int32_t final(u_int32_t a, u_int32_t b, u_int32_t c) {
    c ^= b;  c -= rot(b, 14);
    a ^= c;  a -= rot(c, 11);
    b ^= a;  b -= rot(a, 25);
    c ^= b;  c -= rot(b, 16);
    a ^= c;  a -= rot(c, 4);
    b ^= a;  b -= rot(a, 14);
    c ^= b;  c -= rot(b, 24);
    return c;
}
2013-04-16 23:57:18 -04:00
u_int32_t toku_cachetable_hash ( CACHEFILE cachefile , BLOCKNUM key )
2008-06-17 17:05:19 +00:00
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
{
2013-04-16 23:57:18 -04:00
return final ( cachefile - > filenum . fileid , ( u_int32_t ) ( key . b > > 32 ) , ( u_int32_t ) key . b ) ;
2008-06-17 17:05:19 +00:00
}
#if 0
2013-04-16 23:57:38 -04:00
static unsigned int hashit ( CACHETABLE ct , CACHEKEY key , CACHEFILE cachefile ) {
assert ( 0 = = ( ct - > table_size & ( ct - > table_size - 1 ) ) ) ; // make sure table is power of two
return ( toku_cachetable_hash ( key , cachefile ) ) & ( ct - > table_size - 1 ) ;
2007-07-13 19:37:47 +00:00
}
2008-06-17 17:05:19 +00:00
# endif
2007-07-13 19:37:47 +00:00
2013-04-16 23:57:38 -04:00
static void cachetable_rehash ( CACHETABLE ct , u_int32_t newtable_size ) {
// printf("rehash %p %d %d %d\n", t, primeindexdelta, ct->n_in_table, ct->table_size);
2007-10-05 14:46:49 +00:00
2008-06-14 01:38:53 +00:00
assert ( newtable_size > = 4 & & ( ( newtable_size & ( newtable_size - 1 ) ) = = 0 ) ) ;
2013-04-16 23:57:38 -04:00
PAIR * newtable = toku_calloc ( newtable_size , sizeof ( * ct - > table ) ) ;
2008-06-14 01:38:53 +00:00
u_int32_t i ;
2007-10-05 14:46:49 +00:00
//printf("%s:%d newtable_size=%d\n", __FILE__, __LINE__, newtable_size);
assert ( newtable ! = 0 ) ;
2013-04-16 23:57:38 -04:00
u_int32_t oldtable_size = ct - > table_size ;
ct - > table_size = newtable_size ;
2007-10-05 14:46:49 +00:00
for ( i = 0 ; i < newtable_size ; i + + ) newtable [ i ] = 0 ;
2008-06-05 22:09:59 +00:00
for ( i = 0 ; i < oldtable_size ; i + + ) {
2007-10-05 14:46:49 +00:00
PAIR p ;
2013-04-16 23:57:38 -04:00
while ( ( p = ct - > table [ i ] ) ! = 0 ) {
2008-06-17 17:05:19 +00:00
unsigned int h = p - > fullhash & ( newtable_size - 1 ) ;
2013-04-16 23:57:38 -04:00
ct - > table [ i ] = p - > hash_chain ;
2007-10-05 14:46:49 +00:00
p - > hash_chain = newtable [ h ] ;
newtable [ h ] = p ;
}
}
2013-04-16 23:57:38 -04:00
toku_free ( ct - > table ) ;
2007-10-05 14:46:49 +00:00
// printf("Freed\n");
2013-04-16 23:57:38 -04:00
ct - > table = newtable ;
2007-10-05 14:46:49 +00:00
//printf("Done growing or shrinking\n");
}
2007-07-13 19:37:47 +00:00
2013-04-16 23:57:38 -04:00
static void lru_remove ( CACHETABLE ct , PAIR p ) {
2007-07-13 19:37:47 +00:00
if ( p - > next ) {
p - > next - > prev = p - > prev ;
} else {
2013-04-16 23:57:38 -04:00
assert ( ct - > tail = = p ) ;
ct - > tail = p - > prev ;
2007-07-13 19:37:47 +00:00
}
if ( p - > prev ) {
p - > prev - > next = p - > next ;
} else {
2013-04-16 23:57:38 -04:00
assert ( ct - > head = = p ) ;
ct - > head = p - > next ;
2007-07-13 19:37:47 +00:00
}
p - > prev = p - > next = 0 ;
}
2013-04-16 23:57:38 -04:00
static void lru_add_to_list ( CACHETABLE ct , PAIR p ) {
2007-07-13 19:37:47 +00:00
// requires that touch_me is not currently in the table.
assert ( p - > prev = = 0 ) ;
p - > prev = 0 ;
2013-04-16 23:57:38 -04:00
p - > next = ct - > head ;
if ( ct - > head ) {
ct - > head - > prev = p ;
2007-07-13 19:37:47 +00:00
} else {
2013-04-16 23:57:38 -04:00
assert ( ! ct - > tail ) ;
ct - > tail = p ;
2007-07-13 19:37:47 +00:00
}
2013-04-16 23:57:38 -04:00
ct - > head = p ;
2007-07-13 19:37:47 +00:00
}
2013-04-16 23:57:38 -04:00
static void lru_touch ( CACHETABLE ct , PAIR p ) {
lru_remove ( ct , p ) ;
lru_add_to_list ( ct , p ) ;
2007-07-13 19:37:47 +00:00
}
// Unlink remove_me from a hash chain and return the new chain head.
// Requires: remove_me is on the chain (like the original recursive version,
// this dereferences through the chain and does not tolerate absence).
static PAIR remove_from_hash_chain(PAIR remove_me, PAIR list) {
    PAIR head = list;
    PAIR *pp = &head;
    while (*pp != remove_me)
        pp = &(*pp)->hash_chain;
    *pp = remove_me->hash_chain;
    return head;
}
2007-11-14 17:58:38 +00:00
// Predicate to determine if a node must be renamed. Nodes are renamed on the time they are written
// after a checkpoint.
// Thus we need to rename it if it is dirty,
// if it has been modified within the current checkpoint regime (hence non-strict inequality)
// and the last time it was written was in a previous checkpoint regime (strict inequality)
2013-04-16 23:57:38 -04:00
2013-04-16 23:57:38 -04:00
static BOOL need_to_rename_p ( CACHETABLE ct , PAIR p ) {
2013-04-16 23:57:20 -04:00
return ( BOOL ) ( p - > dirty
2013-04-16 23:57:38 -04:00
& & p - > modified_lsn . lsn > = ct - > lsn_of_checkpoint . lsn // nonstrict
& & p - > written_lsn . lsn < ct - > lsn_of_checkpoint . lsn ) ; // strict
2007-11-14 17:58:38 +00:00
}
2013-04-16 23:57:17 -04:00
// Remove a pair from the cachetable
// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
// The size of the objects in the cachetable is adjusted by the size of the pair being
// removed.
static void cachetable_remove_pair ( CACHETABLE ct , PAIR p ) {
lru_remove ( ct , p ) ;
assert ( ct - > n_in_table > 0 ) ;
ct - > n_in_table - - ;
2007-07-13 19:37:47 +00:00
// Remove it from the hash chain.
2007-10-17 22:10:47 +00:00
{
2013-04-16 23:57:17 -04:00
unsigned int h = p - > fullhash & ( ct - > table_size - 1 ) ;
ct - > table [ h ] = remove_from_hash_chain ( p , ct - > table [ h ] ) ;
}
ct - > size_current - = p - > size ; assert ( ct - > size_current > = 0 ) ;
}
// Maybe remove a pair from the cachetable and free it, depending on whether
// or not there are any threads interested in the pair. The flush callback
// is called with write_me and keep_me both false, and the pair is destroyed.
static void cachetable_maybe_remove_and_free_pair(CACHETABLE ct, PAIR p) {
    if (ctpair_users(&p->rwlock) == 0) {
        cachetable_remove_pair(ct, p);

        // Snapshot everything the callback needs before dropping the lock (helgrind).
        CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
        CACHEFILE cachefile = p->cachefile;
        CACHEKEY key = p->key;
        void *value = p->value;
        void *extraargs = p->extraargs;
        long size = p->size;
        LSN lsn_of_checkpoint = ct->lsn_of_checkpoint;
        BOOL need_to_rename = need_to_rename_p(ct, p);

        cachetable_unlock(ct);
        // write_me=FALSE, keep_me=FALSE: evict without writing.
        flush_callback(cachefile, key, value, extraargs, size, FALSE, FALSE,
                       lsn_of_checkpoint, need_to_rename, TRUE);
        cachetable_lock(ct);

        ctpair_destroy(p);
    }
}
2013-04-16 23:57:41 -04:00
static void abort_fetch_pair ( PAIR p ) {
2013-04-16 23:57:38 -04:00
ctpair_write_unlock ( & p - > rwlock ) ;
if ( ctpair_users ( & p - > rwlock ) = = 0 )
ctpair_destroy ( p ) ;
}
2013-04-16 23:57:38 -04:00
// Read a pair from a cachefile into memory using the pair's fetch callback
static int cachetable_fetch_pair ( CACHETABLE ct , CACHEFILE cf , PAIR p ) {
2013-04-16 23:57:38 -04:00
// helgrind
CACHETABLE_FETCH_CALLBACK fetch_callback = p - > fetch_callback ;
CACHEKEY key = p - > key ;
u_int32_t fullhash = p - > fullhash ;
void * extraargs = p - > extraargs ;
2013-04-16 23:57:38 -04:00
void * toku_value = 0 ;
long size = 0 ;
LSN written_lsn = ZERO_LSN ;
WHEN_TRACE_CT ( printf ( " %s:%d CT: fetch_callback(%lld...) \n " , __FILE__ , __LINE__ , key ) ) ;
2013-04-16 23:57:38 -04:00
cachetable_unlock ( ct ) ;
int r = fetch_callback ( cf , key , fullhash , & toku_value , & size , extraargs , & written_lsn ) ;
2013-04-16 23:57:38 -04:00
cachetable_lock ( ct ) ;
if ( r ) {
2013-04-16 23:57:41 -04:00
cachetable_remove_pair ( ct , p ) ;
p - > state = CTPAIR_INVALID ;
2013-04-16 23:57:38 -04:00
if ( p - > cq ) {
workqueue_enq ( p - > cq , & p - > asyncwork , 1 ) ;
return r ;
}
2013-04-16 23:57:41 -04:00
abort_fetch_pair ( p ) ;
2013-04-16 23:57:38 -04:00
return r ;
2013-04-16 23:57:41 -04:00
} else {
lru_touch ( ct , p ) ;
p - > value = toku_value ;
p - > written_lsn = written_lsn ;
p - > size = size ;
ct - > size_current + = size ;
if ( p - > cq ) {
workqueue_enq ( p - > cq , & p - > asyncwork , 1 ) ;
return 0 ;
}
p - > state = CTPAIR_IDLE ;
ctpair_write_unlock ( & p - > rwlock ) ;
if ( 0 ) printf ( " %s:%d % " PRId64 " complete \n " , __FUNCTION__ , __LINE__ , key . b ) ;
2013-04-16 23:57:38 -04:00
return 0 ;
}
2013-04-16 23:57:38 -04:00
}
2013-04-16 23:57:17 -04:00
static void cachetable_complete_write_pair ( CACHETABLE ct , PAIR p , BOOL do_remove ) ;
// Write a pair to storage
// Effects: an exclusive lock on the pair is obtained, the write callback is called,
// the pair dirty state is adjusted, and the write is completed. The write_me boolean
// is true when the pair is dirty and the pair is requested to be written. The keep_me
// boolean is true, so the pair is not yet evicted from the cachetable.
static void cachetable_write_pair ( CACHETABLE ct , PAIR p ) {
2013-04-16 23:57:38 -04:00
// helgrind
CACHETABLE_FLUSH_CALLBACK flush_callback = p - > flush_callback ;
CACHEFILE cachefile = p - > cachefile ;
CACHEKEY key = p - > key ;
void * value = p - > value ;
void * extraargs = p - > extraargs ;
long size = p - > size ;
BOOL dowrite = ( BOOL ) ( p - > dirty & & p - > write_me ) ;
LSN lsn_of_checkpoint = ct - > lsn_of_checkpoint ;
BOOL need_to_rename = need_to_rename_p ( ct , p ) ;
2013-04-16 23:57:47 -04:00
BOOL for_checkpoint = p - > checkpoint_pending ;
//Must set to FALSE before releasing cachetable lock
p - > checkpoint_pending = FALSE ;
2013-04-16 23:57:17 -04:00
cachetable_unlock ( ct ) ;
2013-04-16 23:57:38 -04:00
2013-04-16 23:57:17 -04:00
// write callback
2013-04-16 23:57:47 -04:00
flush_callback ( cachefile , key , value , extraargs , size , dowrite , TRUE , lsn_of_checkpoint , need_to_rename , for_checkpoint ) ;
2013-04-16 23:57:17 -04:00
# if DO_CALLBACK_USLEEP
usleep ( DO_CALLBACK_USLEEP ) ;
# endif
# if DO_CALLBACK_BUSYWAIT
struct timeval tstart ;
gettimeofday ( & tstart , 0 ) ;
long long ltstart = tstart . tv_sec * 1000000 + tstart . tv_usec ;
while ( 1 ) {
struct timeval t ;
gettimeofday ( & t , 0 ) ;
long long lt = t . tv_sec * 1000000 + t . tv_usec ;
if ( lt - ltstart > DO_CALLBACK_BUSYWAIT )
break ;
}
# endif
2013-04-16 23:57:38 -04:00
2013-04-16 23:57:17 -04:00
cachetable_lock ( ct ) ;
// the pair is no longer dirty once written
if ( p - > dirty & & p - > write_me )
2013-04-16 23:57:38 -04:00
p - > dirty = CACHETABLE_CLEAN ;
2013-04-16 23:57:17 -04:00
// stuff it into a completion queue for delayed completion if a completion queue exists
// otherwise complete the write now
if ( p - > cq )
2013-04-16 23:57:38 -04:00
workqueue_enq ( p - > cq , & p - > asyncwork , 1 ) ;
2013-04-16 23:57:17 -04:00
else
2013-04-16 23:57:41 -04:00
cachetable_complete_write_pair ( ct , p , p - > remove_me ) ;
2013-04-16 23:57:17 -04:00
}
// Complete the write of a pair by resetting the writing state, adjusting the
// pending-write size, and maybe removing the pair from the cachetable if there
// are no references to it.
static void cachetable_complete_write_pair(CACHETABLE ct, PAIR p, BOOL do_remove) {
    p->cq = 0;
    p->state = CTPAIR_IDLE;
    ct->size_writing -= p->size;
    assert(ct->size_writing >= 0);
    // maybe wakeup any stalled writers when the pending writes fall below
    // 1/8 of the size of the cachetable
    if (8*ct->size_writing <= ct->size_current)
        workqueue_wakeup_write(&ct->wq, 0);
    ctpair_write_unlock(&p->rwlock);
    if (do_remove)
        cachetable_maybe_remove_and_free_pair(ct, p);
}
// flush and remove a pair from the cachetable. the callbacks are run by a thread in
// a thread pool.
2013-04-16 23:57:38 -04:00
static void flush_and_maybe_remove ( CACHETABLE ct , PAIR p , BOOL write_me ) {
2013-04-16 23:57:44 -04:00
ctpair_write_lock ( & p - > rwlock , ct - > mutex ) ;
2013-04-16 23:57:38 -04:00
p - > state = CTPAIR_WRITING ;
2013-04-16 23:57:17 -04:00
ct - > size_writing + = p - > size ; assert ( ct - > size_writing > = 0 ) ;
2013-04-16 23:57:38 -04:00
p - > write_me = write_me ;
2013-04-16 23:57:41 -04:00
p - > remove_me = TRUE ;
2013-04-16 23:57:38 -04:00
# if DO_WORKER_THREAD
WORKITEM wi = & p - > asyncwork ;
workitem_init ( wi , cachetable_writer , p ) ;
2013-04-16 23:57:44 -04:00
// evictions without a write or unpinned pair's that are clean
2013-04-16 23:57:38 -04:00
// can be run in the current thread
if ( ! p - > write_me | | ( ! ctpair_pinned ( & p - > rwlock ) & & ! p - > dirty ) ) {
2013-04-16 23:57:18 -04:00
cachetable_write_pair ( ct , p ) ;
} else {
2013-04-16 23:57:41 -04:00
# if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
p - > remove_me = FALSE ; // run the remove on the main thread
# endif
2013-04-16 23:57:38 -04:00
workqueue_enq ( & ct - > wq , wi , 0 ) ;
2013-04-16 23:57:18 -04:00
}
2013-04-16 23:57:17 -04:00
# else
cachetable_write_pair ( ct , p ) ;
# endif
2007-07-13 19:37:47 +00:00
}
2013-04-16 23:57:38 -04:00
// If adding `size` bytes would push the cachetable over its limit (net of
// writes already in flight), evict idle, unreferenced pairs from the LRU tail
// until the projection fits or everything left is pinned. May also shrink the
// hash table when occupancy drops low. Always returns 0: when all pairs are
// pinned we let memory get overfull rather than report an error.
static int maybe_flush_some(CACHETABLE ct, long size) {
    int r = 0;
 again:
    if (size + ct->size_current > ct->size_limit + ct->size_writing) {
        {
            //unsigned long rss __attribute__((__unused__)) = check_max_rss();
            //printf("this-size=%.6fMB projected size = %.2fMB limit=%2.fMB rss=%2.fMB\n", size/(1024.0*1024.0), (size+t->size_current)/(1024.0*1024.0), t->size_limit/(1024.0*1024.0), rss/256.0);
            //struct mallinfo m = mallinfo();
            //printf(" arena=%d hblks=%d hblkhd=%d\n", m.arena, m.hblks, m.hblkhd);
        }
        /* Try to remove one. */
        PAIR remove_me;
        for (remove_me = ct->tail; remove_me; remove_me = remove_me->prev) {
            if (remove_me->state == CTPAIR_IDLE && !ctpair_users(&remove_me->rwlock)) {
                flush_and_maybe_remove(ct, remove_me, TRUE);
                goto again;
            }
        }
        /* All were pinned. */
        //printf("All are pinned\n");
        return 0; // Don't indicate an error code. Instead let memory get overfull.
    }

    // Shrink the hash table when it is less than a quarter full.
    if ((4*ct->n_in_table < ct->table_size) && ct->table_size > 4)
        cachetable_rehash(ct, ct->table_size/2);
    return r;
}
2013-04-16 23:57:41 -04:00
// Public entry point: take the cachetable lock and opportunistically evict
// pairs until the cache is back under its size limit (0 = no pending
// allocation on top of the current size).
void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
    cachetable_lock(ct);
    maybe_flush_some(ct, 0);
    cachetable_unlock(ct);
}
2013-04-16 23:57:38 -04:00
static PAIR cachetable_insert_at ( CACHETABLE ct ,
CACHEFILE cachefile , CACHEKEY key , void * value ,
enum ctpair_state state ,
u_int32_t fullhash ,
long size ,
CACHETABLE_FLUSH_CALLBACK flush_callback ,
CACHETABLE_FETCH_CALLBACK fetch_callback ,
void * extraargs ,
2013-04-16 23:57:38 -04:00
enum cachetable_dirty dirty ,
2013-04-16 23:57:38 -04:00
LSN written_lsn ) {
2007-09-17 16:23:05 +00:00
TAGMALLOC ( PAIR , p ) ;
2013-04-16 23:57:38 -04:00
assert ( p ) ;
2013-04-16 23:57:17 -04:00
memset ( p , 0 , sizeof * p ) ;
2013-04-16 23:57:38 -04:00
p - > cachefile = cachefile ;
p - > key = key ;
p - > value = value ;
2008-06-17 17:05:19 +00:00
p - > fullhash = fullhash ;
2013-04-16 23:57:38 -04:00
p - > dirty = dirty ;
2007-09-21 17:55:49 +00:00
p - > size = size ;
2013-04-16 23:57:38 -04:00
p - > state = state ;
2007-09-17 16:23:05 +00:00
p - > flush_callback = flush_callback ;
p - > fetch_callback = fetch_callback ;
p - > extraargs = extraargs ;
2007-11-14 17:58:38 +00:00
p - > modified_lsn . lsn = 0 ;
p - > written_lsn = written_lsn ;
2008-06-17 17:05:19 +00:00
p - > fullhash = fullhash ;
2013-04-16 23:57:38 -04:00
p - > next = p - > prev = 0 ;
ctpair_rwlock_init ( & p - > rwlock ) ;
2013-04-16 23:57:17 -04:00
p - > cq = 0 ;
2007-09-21 17:55:49 +00:00
lru_add_to_list ( ct , p ) ;
2008-06-17 17:05:19 +00:00
u_int32_t h = fullhash & ( ct - > table_size - 1 ) ;
2007-09-21 17:55:49 +00:00
p - > hash_chain = ct - > table [ h ] ;
ct - > table [ h ] = p ;
ct - > n_in_table + + ;
ct - > size_current + = size ;
2007-10-17 22:10:47 +00:00
if ( ct - > n_in_table > ct - > table_size ) {
2008-06-14 01:38:53 +00:00
cachetable_rehash ( ct , ct - > table_size * 2 ) ;
2007-10-17 22:10:47 +00:00
}
2013-04-16 23:57:38 -04:00
return p ;
2007-09-17 16:23:05 +00:00
}
2008-06-14 01:38:53 +00:00
enum { hash_histogram_max = 100 } ;
static unsigned long long hash_histogram [ hash_histogram_max ] ;
2013-04-16 23:57:41 -04:00
void toku_cachetable_print_hash_histogram ( void ) {
2008-06-14 01:38:53 +00:00
int i ;
for ( i = 0 ; i < hash_histogram_max ; i + + )
2013-04-16 23:57:20 -04:00
if ( hash_histogram [ i ] ) printf ( " %d:%llu " , i , hash_histogram [ i ] ) ;
2008-06-14 01:38:53 +00:00
printf ( " \n " ) ;
2013-04-16 23:57:41 -04:00
printf ( " miss=% " PRIu64 " hit=% " PRIu64 " wait_reading=% " PRIu64 " wait=% " PRIu64 " \n " ,
2013-04-16 23:57:41 -04:00
cachetable_miss , cachetable_hit , cachetable_wait_reading , cachetable_wait ) ;
2008-06-14 01:38:53 +00:00
}
2013-04-16 23:57:20 -04:00
static void
note_hash_count ( int count ) {
2008-06-14 01:38:53 +00:00
if ( count > = hash_histogram_max ) count = hash_histogram_max - 1 ;
hash_histogram [ count ] + + ;
}
2008-06-17 17:05:19 +00:00
// Insert `value` into the cachetable under (cachefile, key) and leave it
// pinned (read-locked) for the caller.
// Returns 0 on success; -1 if the key is already present (in which case the
// EXISTING pair is pinned instead); or an error from maybe_flush_some.
// fullhash must equal toku_cachetable_hash(cachefile, key).
int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size,
                        CACHETABLE_FLUSH_CALLBACK flush_callback,
                        CACHETABLE_FETCH_CALLBACK fetch_callback, void*extraargs) {
    WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
    CACHETABLE ct = cachefile->cachetable;
    int count = 0;
    cachetable_lock(ct);
    // NOTE(review): cachetable_wait_write presumably throttles while too many
    // writes are in flight — confirm against its definition.
    cachetable_wait_write(ct);
    {
        PAIR p;
        // Search the hash chain for an existing pair with this key.
        for (p=ct->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
            count++;
            if (p->key.b==key.b && p->cachefile==cachefile) {
                // Semantically, these two asserts are not strictly right.  After all, when are two functions eq?
                // In practice, the functions better be the same.
                assert(p->flush_callback==flush_callback);
                assert(p->fetch_callback==fetch_callback);
                // Pin the existing pair before reporting the collision.
                ctpair_read_lock(&p->rwlock, ct->mutex);
                note_hash_count(count);
                cachetable_unlock(ct);
                return -1; /* Already present. */
            }
        }
    }
    int r;
    // Make room for the new pair before inserting it.
    if ((r=maybe_flush_some(ct, size))) {
        cachetable_unlock(ct);
        return r;
    }
    // flushing could change the table size, but wont' change the fullhash
    PAIR p = cachetable_insert_at(ct, cachefile, key, value, CTPAIR_IDLE, fullhash, size, flush_callback, fetch_callback, extraargs, CACHETABLE_DIRTY, ZERO_LSN);
    assert(p);
    // Pin the new pair for the caller.
    ctpair_read_lock(&p->rwlock, ct->mutex);
    note_hash_count(count);
    cachetable_unlock(ct);
    return 0;
}
2013-04-16 23:57:42 -04:00
#if TOKU_DO_WAIT_TIME
// Return the elapsed time from *told to *tnew, in microseconds.
static u_int64_t tdelta(struct timeval *tnew, struct timeval *told) {
    u_int64_t new_us = tnew->tv_sec * 1000000ULL + tnew->tv_usec;
    u_int64_t old_us = told->tv_sec * 1000000ULL + told->tv_usec;
    return new_us - old_us;
}
#endif
2013-04-16 23:57:47 -04:00
// On entry: hold the ct lock and the ctpair_write_lock.
// Synchronously write out a checkpoint-pending pair on the calling thread.
// This is essentially a flush_and_maybe_remove except that we already have
// p->rwlock and we just do the write in our own thread.
// On return the pair's write lock has been released by cachetable_write_pair.
static void
write_pair_for_checkpoint(CACHETABLE ct, PAIR p)
{
    assert(p->dirty);   // it must be dirty if its pending.
    assert(p->checkpoint_pending);
    p->cq = 0;   // I don't want any delay, just do it.  (No completion queue.)
    p->state = CTPAIR_WRITING;
    assert(ct->size_writing >= 0);
    ct->size_writing += p->size;   // account for the in-flight write
    p->write_me = TRUE;
    p->remove_me = FALSE;          // keep the pair in memory after the write
    cachetable_write_pair(ct, p);  // unlocks the pair
}
2008-06-17 17:05:19 +00:00
// Get and pin (read-lock) the block for (cachefile, key).
// Hit: any pending checkpoint write is performed first, then the pair is
//   read-locked; if the pair turned INVALID while we waited (its fetch was
//   aborted — see abort_fetch_pair elsewhere in this file), return ENODEV.
// Miss: create a pair in the READING state, fetch it (may block, counted as
//   a miss), then read-lock it for the caller.
// fullhash must equal toku_cachetable_hash(cachefile, key).
int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
                                CACHETABLE_FLUSH_CALLBACK flush_callback,
                                CACHETABLE_FETCH_CALLBACK fetch_callback, void*extraargs) {
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    int count = 0;
    cachetable_lock(ct);
    cachetable_wait_write(ct);
    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
        count++;
        if (p->key.b==key.b && p->cachefile==cachefile) {
#if TOKU_DO_WAIT_TIME
            struct timeval t0;
            int do_wait_time = 0;
#endif
            // A writer holds (or wants) the pair: count the wait by cause.
            if (p->rwlock.writer || p->rwlock.want_write) {
                if (p->state == CTPAIR_READING)
                    cachetable_wait_reading++;
                else
                    cachetable_wait++;
#if TOKU_DO_WAIT_TIME
                do_wait_time = 1;
                gettimeofday(&t0, NULL);
#endif
            }
            if (p->checkpoint_pending) {
                ctpair_write_lock(&p->rwlock, ct->mutex);
                write_pair_for_checkpoint(ct, p); // releases the pair_write_lock, but not the cachetable lock
            }
            // still have the cachetable lock
            ctpair_read_lock(&p->rwlock, ct->mutex);
#if TOKU_DO_WAIT_TIME
            if (do_wait_time) {
                struct timeval tnow;
                gettimeofday(&tnow, NULL);
                cachetable_wait_time += tdelta(&tnow, &t0);
            }
#endif
            // The pair may have become INVALID while we waited for the lock.
            if (p->state == CTPAIR_INVALID) {
                ctpair_read_unlock(&p->rwlock);
                if (ctpair_users(&p->rwlock) == 0)
                    ctpair_destroy(p);
                cachetable_unlock(ct);
                return ENODEV;
            }
            lru_touch(ct, p);
            *value = p->value;
            if (sizep) *sizep = p->size;
            cachetable_hit++;
            note_hash_count(count);
            cachetable_unlock(ct);
            WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
            return 0;
        }
    }
    note_hash_count(count);
    int r;
    // Note. hashit(t,key) may have changed as a result of flushing.  But fullhash won't have changed.
    {
        // Miss: create the pair, write-lock it, and fetch its contents.
        p = cachetable_insert_at(ct, cachefile, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
        assert(p);
        ctpair_write_lock(&p->rwlock, ct->mutex);
#if TOKU_DO_WAIT_TIME
        struct timeval t0;
        gettimeofday(&t0, NULL);
#endif
        r = cachetable_fetch_pair(ct, cachefile, p);
        if (r) {
            cachetable_unlock(ct);
            return r;
        }
        cachetable_miss++;
#if TOKU_DO_WAIT_TIME
        struct timeval tnow;
        gettimeofday(&tnow, NULL);
        cachetable_miss_time += tdelta(&tnow, &t0);
#endif
        // Downgrade to the read pin the caller expects.
        ctpair_read_lock(&p->rwlock, ct->mutex);
        assert(p->state == CTPAIR_IDLE);
        *value = p->value;
        if (sizep) *sizep = p->size;
    }
    // The fetch grew the cache; evict if we are now over the limit.
    r = maybe_flush_some(ct, 0);
    cachetable_unlock(ct);
    WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
    return r;
}
2013-04-16 23:57:18 -04:00
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return sucess. However,
// if it is being written, then allow the writer to evict it. This prevents writers
// being suspended on a block that was just selected for eviction.
2008-06-17 17:05:19 +00:00
// Lookup (cachefile, key); if found in the IDLE state, perform any pending
// checkpoint write, pin (read-lock) the pair, touch the LRU, and return 0.
// Pairs being read or written are treated as not found (return -1) so that
// writers are not suspended on a block that was just selected for eviction.
int toku_cachetable_maybe_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    int count = 0;
    int r = -1;
    cachetable_lock(ct);
    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
        count++;
        if (p->key.b==key.b && p->cachefile==cachefile && p->state == CTPAIR_IDLE) {
            if (p->checkpoint_pending) {
                ctpair_write_lock(&p->rwlock, ct->mutex);
                write_pair_for_checkpoint(ct, p); // releases the pair_write_lock, but not the cachetable lock
            }
            // still have the cachetable lock
            ctpair_read_lock(&p->rwlock, ct->mutex);
            // Fix: read p->value only AFTER the read lock is acquired.  The
            // original copied it first; acquiring the lock may block (it takes
            // ct->mutex as a parameter), during which the pair's value could
            // change.  This matches the order used in toku_cachetable_get_and_pin.
            *value = p->value;
            lru_touch(ct, p);
            r = 0;
            //printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
            break;
        }
    }
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
2013-04-16 23:57:38 -04:00
// Release one pin (read lock) on the pair for (cachefile, key).
// `dirty` marks the pair dirty (a clean mark never cleans a dirty pair);
// a nonzero `size` updates the pair's size and the cache's size accounting.
// Returns 0 if the pair was found, -1 if not, or an error from
// maybe_flush_some.
int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size) {
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
    //printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty);
    int count = 0;
    int r = -1;
    //assert(fullhash == toku_cachetable_hash(cachefile, key));
    cachetable_lock(ct);
    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
        count++;
        if (p->key.b==key.b && p->cachefile==cachefile) {
            assert(p->rwlock.pinned>0);
            ctpair_read_unlock(&p->rwlock);
            if (dirty) p->dirty = CACHETABLE_DIRTY;
            if (size != 0) {
                // Swap the old size out of, and the new size into, the cache
                // totals; mirror the change in size_writing if a write of this
                // pair is in flight.
                ct->size_current -= p->size; if (p->state == CTPAIR_WRITING) ct->size_writing -= p->size;
                p->size = size;
                ct->size_current += p->size; if (p->state == CTPAIR_WRITING) ct->size_writing += p->size;
            }
            WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
            {
                // The unpin may have made the pair evictable; try to get back
                // under the size limit.
                if ((r=maybe_flush_some(ct, 0))) {
                    cachetable_unlock(ct);
                    return r;
                }
            }
            r = 0; // we found one
            break;
        }
    }
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
2013-04-16 23:57:38 -04:00
// Prefetch (cf, key): if the block is not already cached (in any state),
// create a pair in the READING state and hand the fetch to a reader worker
// thread (or fetch synchronously when worker threads are disabled).
// Always returns 0.
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
                            CACHETABLE_FLUSH_CALLBACK flush_callback,
                            CACHETABLE_FETCH_CALLBACK fetch_callback,
                            void *extraargs) {
    if (0) printf("%s:%d %" PRId64 "\n", __FUNCTION__, __LINE__, key.b);
    CACHETABLE ct = cf->cachetable;
    cachetable_lock(ct);
    // Is the block already present?
    int found = 0;
    PAIR p;
    for (p = ct->table[fullhash & (ct->table_size - 1)]; p; p = p->hash_chain) {
        if (p->key.b == key.b && p->cachefile == cf) {
            found = 1;
            break;
        }
    }
    if (!found) {
        // Not cached: create a READING pair and kick off the fetch.
        p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size,
                                 flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
        assert(p);
        ctpair_write_lock(&p->rwlock, ct->mutex);
#if DO_WORKER_THREAD
        workitem_init(&p->asyncwork, cachetable_reader, p);
        workqueue_enq(&ct->wq, &p->asyncwork, 0);
#else
        cachetable_fetch_pair(ct, cf, p);
#endif
    }
    cachetable_unlock(ct);
    return 0;
}
2007-11-14 17:58:38 +00:00
// effect: Move an object from one key to another key.
// requires: The object is pinned in the table
2007-11-19 23:47:44 +00:00
// effect: Move an object from one key to another key.
// requires: The object is pinned in the table.
// Returns 0 if the old key was found and renamed, -1 otherwise.
int toku_cachetable_rename(CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
    CACHETABLE ct = cachefile->cachetable;
    u_int32_t oldhash = toku_cachetable_hash(cachefile, oldkey);
    int chain_len = 0;
    cachetable_lock(ct);
    // Walk the old hash chain keeping a pointer to the link that points at
    // the current pair, so the pair can be unlinked in place when found.
    PAIR *link = &ct->table[oldhash & (ct->table_size - 1)];
    PAIR pair;
    while ((pair = *link) != 0) {
        chain_len++;
        if (pair->key.b == oldkey.b && pair->cachefile == cachefile) {
            note_hash_count(chain_len);
            *link = pair->hash_chain;               // unlink from the old chain
            pair->key = newkey;
            u_int32_t newhash = toku_cachetable_hash(cachefile, newkey);
            u_int32_t bucket = newhash & (ct->table_size - 1);
            pair->fullhash = newhash;
            pair->hash_chain = ct->table[bucket];   // push onto the new chain
            ct->table[bucket] = pair;
            cachetable_unlock(ct);
            return 0;
        }
        link = &pair->hash_chain;
    }
    note_hash_count(chain_len);
    cachetable_unlock(ct);
    return -1;
}
2007-11-19 23:47:44 +00:00
// Verify the consistency of the cachetable this cachefile belongs to.
void toku_cachefile_verify(CACHEFILE cf) {
    toku_cachetable_verify(cf->cachetable);
}
2013-04-16 23:57:38 -04:00
// Consistency check: the LRU list and the hash table must contain exactly
// the same set of pairs.  Uses each pair's verify_flag as scratch space.
void toku_cachetable_verify(CACHETABLE ct) {
    cachetable_lock(ct);

    u_int32_t i;
    PAIR p;

    // Pass 1: clear every verify flag by walking the hash chains.
    for (i = 0; i < ct->table_size; i++)
        for (p = ct->table[i]; p; p = p->hash_chain)
            p->verify_flag = 0;

    // Pass 2: every pair on the LRU chain must also be hashed; mark each one.
    for (p = ct->head; p; p = p->next) {
        assert(p->verify_flag == 0);   // no pair appears twice on the LRU list
        u_int32_t fullhash = p->fullhash;
        //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
        PAIR q;
        for (q = ct->table[fullhash & (ct->table_size - 1)]; q; q = q->hash_chain)
            if (q == p)
                break;   // found it in its hash chain
        if (q == 0) {
            fprintf(stderr, "Something in the LRU chain is not hashed\n");
            assert(0);
        }
        p->verify_flag = 1;
    }

    // Pass 3: every hashed pair must have been seen on the LRU chain.
    for (i = 0; i < ct->table_size; i++)
        for (p = ct->table[i]; p; p = p->hash_chain)
            assert(p->verify_flag);

    cachetable_unlock(ct);
}
2013-04-16 23:57:38 -04:00
// Assert that no pair belonging to cachefile cf remains in the cachetable,
// checking both the hash chains and the LRU list.
static void assert_cachefile_is_flushed_and_removed(CACHETABLE ct, CACHEFILE cf) {
    // First way: walk every hash chain.
    u_int32_t i;
    for (i = 0; i < ct->table_size; i++) {
        PAIR p;
        for (p = ct->table[i]; p; p = p->hash_chain)
            assert(p->cachefile != cf);
    }
    // Second way: walk the LRU list.
    PAIR p;
    for (p = ct->head; p; p = p->next)
        assert(p->cachefile != cf);
}
2013-04-16 23:57:38 -04:00
// Flush all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL).
// Requires: the cachetable lock is held on entry; it is released and
// reacquired around each blocking wait on the completion queue.
static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
    unsigned nfound = 0;
    struct workqueue cq;
    workqueue_init(&cq);
    // find all of the pairs owned by a cachefile and redirect their completion
    // to a completion queue.  flush and remove pairs in the IDLE state if they
    // are dirty.  pairs in the READING or WRITING states are already in the
    // work queue.
    unsigned i;
    for (i=0; i < ct->table_size; i++) {
        PAIR p;
        for (p = ct->table[i]; p; p = p->hash_chain) {
            if (cf == 0 || p->cachefile==cf) {
                nfound++;
                p->cq = &cq;   // completion will be posted to our private queue
                if (p->state == CTPAIR_IDLE)
                    flush_and_maybe_remove(ct, p, TRUE);
            }
        }
    }
    // wait for all of the pairs in the work queue to complete
    for (i=0; i<nfound; i++) {
        // Drop the cachetable lock while blocked on the completion queue.
        cachetable_unlock(ct);
        WORKITEM wi = 0;
        int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
        cachetable_lock(ct);
        PAIR p = workitem_arg(wi);
        p->cq = 0;
        if (p->state == CTPAIR_READING) {
            // A fetch completed: drop its write lock and free the pair.
            ctpair_write_unlock(&p->rwlock);
            cachetable_maybe_remove_and_free_pair(ct, p);
        } else if (p->state == CTPAIR_WRITING) {
            cachetable_complete_write_pair(ct, p, TRUE);
        } else if (p->state == CTPAIR_INVALID) {
            abort_fetch_pair(p);
        } else
            assert(0);   // no other state should reach the completion queue
    }
    workqueue_destroy(&cq);
    assert_cachefile_is_flushed_and_removed(ct, cf);

    // Shrink the table if removals left it under a quarter full.
    if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4))
        cachetable_rehash(ct, ct->table_size/2);

    return 0;
}
/* Require that it all be flushed. */
2013-04-16 23:57:38 -04:00
// Flush everything out and destroy the cachetable.  Requires that all pairs
// are unpinned.  Returns 0 on success; on failure the cachetable is left
// intact (and the lock released).
int toku_cachetable_close(CACHETABLE *ctp) {
    CACHETABLE ct = *ctp;
    int r;
    cachetable_lock(ct);
    if ((r = cachetable_flush_cachefile(ct, 0))) {
        cachetable_unlock(ct);
        return r;
    }
    u_int32_t i;
    for (i = 0; i < ct->table_size; i++) {
        if (ct->table[i]) {
            // Fix: the original returned -1 here without releasing the
            // cachetable lock, leaving the mutex held on this error path.
            cachetable_unlock(ct);
            return -1;
        }
    }
    assert(ct->size_writing == 0);
    cachetable_unlock(ct);
    toku_destroy_workers(&ct->wq, &ct->threadpool);
    toku_free(ct->table);
    toku_free(ct);
    *ctp = 0;
    return 0;
}
2013-04-16 23:57:24 -04:00
// Unpin the pair for (cachefile, key) and remove it from the cachetable
// without writing it (its dirty bit is cleared first).  Returns 0 if the
// pair was found and removed, ENOENT otherwise.
// Requires: the caller holds exactly one pin (read lock) on the pair.
int toku_cachetable_unpin_and_remove(CACHEFILE cachefile, CACHEKEY key) {
    int r = ENOENT;
    // Removing something already present is OK.
    CACHETABLE ct = cachefile->cachetable;
    PAIR p;
    int count = 0;
    cachetable_lock(ct);
    u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
        count++;
        if (p->key.b==key.b && p->cachefile==cachefile) {
            p->dirty = CACHETABLE_CLEAN; // clear the dirty bit.  We're just supposed to remove it.
            assert(p->rwlock.pinned==1);
            ctpair_read_unlock(&p->rwlock);
            // Redirect the pair's completion to a private queue so we can
            // wait for its flush to finish.
            struct workqueue cq;
            workqueue_init(&cq);
            p->cq = &cq;
            if (p->state == CTPAIR_IDLE)
                flush_and_maybe_remove(ct, p, FALSE);
            // Drop the cachetable lock while blocked on the completion queue.
            cachetable_unlock(ct);
            WORKITEM wi = 0;
            r = workqueue_deq(&cq, &wi, 1);
            cachetable_lock(ct);
            PAIR pp = workitem_arg(wi);
            assert(r == 0 && pp == p);
            // TRUE presumably means "remove/free the pair" — confirm against
            // cachetable_complete_write_pair's definition.
            cachetable_complete_write_pair(ct, p, TRUE);
            workqueue_destroy(&cq);
            r = 0;
            goto done;
        }
    }
 done:
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
2013-04-16 23:57:47 -04:00
// Log an XSTILLOPEN record for txn if it is a root transaction (children
// are not logged individually).  Used as a callback for
// toku_logger_iterate_over_live_txns during a checkpoint.
static int
log_open_txn(TOKULOGGER logger, TOKUTXN txn, void *UU(v))
{
    if (toku_logger_txn_parent(txn) != NULL)
        return 0;   // only the open root transactions have to be logged
    int r = toku_log_xstillopen(logger, NULL, 0, toku_txn_get_txnid(txn));
    assert(r == 0);
    return 0;
}
// Checkpoint the cachetable.
// Requires: Everything is unpinned.  (In the multithreaded version we have to wait for things to get unpinned and then
//  grab them (or else the unpinner has to do something.)
// Algorithm:  Write a checkpoint record to the log, noting the LSN of that record.
//             Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
//             For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
//                flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
//             Watch out since evicting the node modifies the hash table.
int toku_cachetable_checkpoint(CACHETABLE ct, TOKULOGGER logger) {
    // Fix: begin_lsn was uninitialized when logger==NULL, yet it is still
    // passed to the begin_checkpoint_userdata callbacks below (undefined
    // behavior).  Initialize it defensively.
    LSN begin_lsn = ZERO_LSN;
    {
        PAIR pendings = 0;   // list of dirty pairs that must be written for this checkpoint
        unsigned i;
        cachetable_lock(ct);
        if (logger) {
            // The checkpoint must be performed after the lock is acquired.
            {
                int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0);
                assert(r==0);
            }
            // Log all the open transactions
            {
                int r = toku_logger_iterate_over_live_txns(logger, log_open_txn, NULL);
                assert(r==0);
            }
            // Log all the open files
            {
                CACHEFILE cf;
                for (cf = ct->cachefiles; cf; cf=cf->next) {
                    BYTESTRING bs = { strlen(cf->fname),   // don't include the NUL
                                      cf->fname };
                    int r = toku_log_fassociate(logger, NULL, 0, cf->filenum, bs);
                    assert(r==0);
                }
            }
        }
        // Let each open dictionary note the start of the checkpoint.
        {
            CACHEFILE cf;
            for (cf = ct->cachefiles; cf; cf=cf->next) {
                if (cf->begin_checkpoint_userdata) {
                    int r = cf->begin_checkpoint_userdata(cf, begin_lsn, cf->userdata);
                    assert(r==0);
                }
            }
        }
        // Mark every dirty in-memory pair as checkpoint-pending and collect
        // them on the pendings list.
        for (i=0; i < ct->table_size; i++) {
            PAIR p;
            for (p = ct->table[i]; p; p=p->hash_chain) {
                p->checkpoint_pending = FALSE;
                // p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
                if (p->state == CTPAIR_READING)
                    continue;   // skip pairs being read as they will be clean
                else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) {
                    if (p->dirty) {
                        p->checkpoint_pending = TRUE;
                        p->checkpoint_list = pendings;
                        pendings = p;
                    }
                } else
                    assert(0);
            }
        }
        // Now anything that is pending should be written.  If anyone else comes across a pending object, they should make sure it's
        // written too.  And no one better deallocate a pending PAIR either.
        PAIR p;
        while ((p = pendings) != 0) {
            pendings = p->checkpoint_list;
            ctpair_write_lock(&p->rwlock, ct->mutex);   // grab an exclusive lock on the pair
            // If it is no longer pending we don't have do do anything
            if (p->checkpoint_pending) {
                write_pair_for_checkpoint(ct, p);   // clears the pending bit, writes it out, and unlocks the write pair
            } else {
                // it was previously written, so we just have to release the lock.
                ctpair_write_unlock(&p->rwlock);    // didn't call cachetable_write_pair so we have to unlock it ourselves.
            }
            // Don't need to unlock and lock cachetable, because the cachetable was unlocked and locked while the flush callback ran.
        }
        cachetable_unlock(ct);
        { // have just written data blocks, so next write the translation and header for each open dictionary
            CACHEFILE cf;
            for (cf = ct->cachefiles; cf; cf=cf->next) {
                if (cf->checkpoint_userdata) {
                    int r = cf->checkpoint_userdata(cf, cf->userdata);
                    assert(r==0);
                }
            }
        }
        { // everything has been written to file (or at least OS internal buffer)...
          // ... so fsync and call checkpoint-end function in block translator
            CACHEFILE cf;
            for (cf = ct->cachefiles; cf; cf=cf->next) {
                if (cf->end_checkpoint_userdata) {
                    int r = cf->end_checkpoint_userdata(cf, cf->userdata);
                    assert(r==0);
                }
            }
        }
    }
    if (logger) {
        int r = toku_log_end_checkpoint(logger, NULL, 0, begin_lsn.lsn);
        assert(r==0);
        toku_logger_note_checkpoint(logger, begin_lsn);
    }
    return 0;
}
// Return the logger associated with the cachefile's cachetable.
TOKULOGGER toku_cachefile_logger(CACHEFILE cf) {
    return cf->cachetable->logger;
}
// Return the file number that identifies this cachefile (e.g. in log records).
FILENUM toku_cachefile_filenum(CACHEFILE cf) {
    return cf->filenum;
}
2013-04-16 23:57:38 -04:00
# if DO_WORKER_THREAD
2013-04-16 23:57:17 -04:00
2013-04-16 23:57:38 -04:00
// Worker thread function to write a pair from memory to its cachefile
// (cachetable_write_pair unlocks the pair — see write_pair_for_checkpoint).
static void cachetable_writer(WORKITEM wi) {
    PAIR p = workitem_arg(wi);
    CACHETABLE ct = p->cachefile->cachetable;
    cachetable_lock(ct);
    cachetable_write_pair(ct, p);
    cachetable_unlock(ct);
}
2013-04-16 23:57:17 -04:00
2013-04-16 23:57:38 -04:00
// Worker thread function to read a pair from a cachefile to memory.
// After a successful fetch, optionally evicts to stay under the size limit
// (disabled by default via DO_FLUSH_FROM_READER).
static void cachetable_reader(WORKITEM wi) {
    PAIR p = workitem_arg(wi);
    CACHETABLE ct = p->cachefile->cachetable;
    cachetable_lock(ct);
    int r = cachetable_fetch_pair(ct, p->cachefile, p);
#define DO_FLUSH_FROM_READER 0
#if DO_FLUSH_FROM_READER
    if (r == 0)
        maybe_flush_some(ct, 0);
#else
    (void) r;   // fix: replaced the original's dead self-assignment `r = r;`
#endif
    cachetable_unlock(ct);
}
# endif
// debug functions
2007-09-21 17:55:49 +00:00
2013-04-16 23:57:38 -04:00
// Scan the whole table and report (return nonzero) whether any pair is
// currently pinned.  Despite the name, pinned pairs are reported, not
// asserted on.
int toku_cachetable_assert_all_unpinned(CACHETABLE ct) {
    int any_pinned = 0;
    cachetable_lock(ct);
    u_int32_t bucket;
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair;
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain) {
            assert(ctpair_pinned(&pair->rwlock) >= 0);
            if (ctpair_pinned(&pair->rwlock) != 0) {
                //printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
                any_pinned = 1;
            }
        }
    }
    cachetable_unlock(ct);
    return any_pinned;
}
// Count the pairs pinned in cachefile cf (or in every cachefile when cf is
// NULL), optionally printing each one.
int toku_cachefile_count_pinned(CACHEFILE cf, int print_them) {
    CACHETABLE ct = cf->cachetable;
    int n_pinned = 0;
    cachetable_lock(ct);
    u_int32_t bucket;
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair;
        for (pair = ct->table[bucket]; pair; pair = pair->hash_chain) {
            assert(ctpair_pinned(&pair->rwlock) >= 0);
            int matches = (cf == 0 || pair->cachefile == cf);
            if (ctpair_pinned(&pair->rwlock) && matches) {
                if (print_them)
                    printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, pair->key.b, pair->value);
                n_pinned++;
            }
        }
    }
    cachetable_unlock(ct);
    return n_pinned;
}
2007-11-19 23:47:44 +00:00
// Print every non-empty hash bucket with the pairs on its chain.
void toku_cachetable_print_state(CACHETABLE ct) {
    cachetable_lock(ct);
    u_int32_t bucket;
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair = ct->table[bucket];
        if (pair == 0)
            continue;   // skip empty buckets entirely
        printf("t[%u]=", bucket);
        for (; pair; pair = pair->hash_chain) {
            printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", pair->key.b, pair->cachefile, (int) pair->dirty, pair->rwlock.pinned, pair->size);
        }
        printf("\n");
    }
    cachetable_unlock(ct);
}
2007-09-21 17:55:49 +00:00
2007-11-19 23:47:44 +00:00
// Report aggregate cachetable statistics under the cachetable lock.
// Each out-parameter is optional: pass NULL to skip it.
//   num_entries_ptr  - number of pairs currently in the table
//   hash_size_ptr    - number of hash buckets
//   size_current_ptr - total bytes currently cached
//   size_limit_ptr   - configured cache size limit in bytes
void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
    cachetable_lock(ct);
    if (num_entries_ptr) {
        *num_entries_ptr = ct->n_in_table;
    }
    if (hash_size_ptr) {
        *hash_size_ptr = ct->table_size;
    }
    if (size_current_ptr) {
        *size_current_ptr = ct->size_current;
    }
    if (size_limit_ptr) {
        *size_limit_ptr = ct->size_limit;
    }
    cachetable_unlock(ct);
}
2008-06-05 22:09:59 +00:00
// Look up (key, cf) in the cachetable and report the pair's state.
// Each out-parameter is optional (pass NULL to skip it):
//   value_ptr - the cached value pointer
//   dirty_ptr - nonzero if the pair is dirty
//   pin_ptr   - the pair's current pin count
//   size_ptr  - the pair's size in bytes
// Returns 0 if the pair was found, -1 otherwise.
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
                                   int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
    PAIR p;
    int count = 0;
    int r = -1;
    u_int32_t fullhash = toku_cachetable_hash(cf, key);
    cachetable_lock(ct);
    for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
        count++;
        if (p->key.b == key.b && p->cachefile == cf) {
            if (value_ptr)
                *value_ptr = p->value;
            if (dirty_ptr)
                *dirty_ptr = p->dirty;
            if (pin_ptr)
                *pin_ptr = p->rwlock.pinned;
            if (size_ptr)
                *size_ptr = p->size;
            r = 0;
            break;
        }
    }
    // Record the chain length exactly once, hit or miss.  (Previously
    // note_hash_count also ran inside the loop on a hit, so hits were
    // double-counted in the hash statistics.)
    note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
2013-04-16 23:57:18 -04:00
2013-04-16 23:57:29 -04:00
// Attach client userdata and its lifecycle callbacks to a cachefile.
//   userdata                  - opaque client pointer, retrievable via
//                               toku_cachefile_get_userdata
//   close_userdata            - invoked when the cachefile is closed
//   checkpoint_userdata       - invoked during a checkpoint
//   begin_checkpoint_userdata - invoked when a checkpoint begins (with LSN)
//   end_checkpoint_userdata   - invoked when a checkpoint completes
void
toku_cachefile_set_userdata (CACHEFILE cf,
                             void *userdata,
                             int (*close_userdata)(CACHEFILE, void*, char**),
                             int (*checkpoint_userdata)(CACHEFILE, void*),
                             int (*begin_checkpoint_userdata)(CACHEFILE, LSN, void*),
                             int (*end_checkpoint_userdata)(CACHEFILE, void*)) {
    cf->userdata                  = userdata;
    cf->close_userdata            = close_userdata;
    cf->checkpoint_userdata       = checkpoint_userdata;
    cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
    cf->end_checkpoint_userdata   = end_checkpoint_userdata;
}
2013-04-16 23:57:24 -04:00
2013-04-16 23:57:18 -04:00
// Return the opaque client pointer previously stored with
// toku_cachefile_set_userdata (NULL if none was set).
void *toku_cachefile_get_userdata (CACHEFILE cf) {
    void *result = cf->userdata;
    return result;
}
2013-04-16 23:57:27 -04:00
2013-04-16 23:57:47 -04:00
// Flush the cachefile's underlying fd to stable storage.
// A cachefile whose fname is 0 has been redirected to /dev/null
// (see toku_cachefile_redirect_nullfd), so there is nothing to sync.
// Returns 0 on success, or fsync's return value on failure.
int
toku_cachefile_fsync (CACHEFILE cf) {
    if (cf->fname == 0)
        return 0;   // Don't fsync /dev/null
    return fsync(cf->fd);
}
2013-04-16 23:57:27 -04:00
// Redirect a cachefile to /dev/null: open a write-only null fd, swap it in
// for the cachefile's current fd, release the stored filename, and
// re-register the cachefile under the new fd's unique file id.
// Always returns 0; aborts (assert) if the null device cannot be opened.
int toku_cachefile_redirect_nullfd (CACHEFILE cf) {
    int null_fd;
    struct fileid fileid;

    // Open flags are bit masks and must be combined with |, not +.
    // (+ happens to work here only because the two bits are distinct.)
    null_fd = open(DEV_NULL_FILE, O_WRONLY | O_BINARY);
    assert(null_fd >= 0);
    toku_os_get_unique_file_id(null_fd, &fileid);
    close(cf->fd);
    cf->fd = null_fd;
    if (cf->fname) {
        toku_free(cf->fname);
        cf->fname = 0;
    }
    cachefile_init_filenum(cf, null_fd, NULL, fileid);
    return 0;
}
2013-04-16 23:57:47 -04:00
// Sum the in-memory sizes of all cachetable pairs that belong to cf.
// Walks every hash bucket of cf's cachetable; returns the total in bytes.
// NOTE(review): no cachetable lock is taken here — presumably callers
// already hold it or tolerate a racy total; confirm before relying on
// an exact value.
u_int64_t
toku_cachefile_size_in_memory (CACHEFILE cf)
{
    u_int64_t total = 0;
    CACHETABLE ct = cf->cachetable;
    unsigned long bucket;
    for (bucket = 0; bucket < ct->table_size; bucket++) {
        PAIR pair = ct->table[bucket];
        while (pair) {
            if (pair->cachefile == cf)
                total += pair->size;
            pair = pair->hash_chain;
        }
    }
    return total;
}
2013-04-16 23:57:47 -04:00