Addresses #1993 refs[t:1993] Added second callback to checkpoint.

git-svn-id: file:///svn/toku/tokudb@14369 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Barry Perlman 2013-04-16 23:58:00 -04:00 committed by Yoni Fogel
parent f44c71dbdb
commit 554596ce59
14 changed files with 66 additions and 38 deletions

View file

@ -12,9 +12,9 @@ extern "C" {
#define TOKUDB_NATIVE_H 0
#define DB_VERSION_MAJOR 4
#define DB_VERSION_MINOR 6
#define DB_VERSION_PATCH 21
#define DB_VERSION_PATCH 19
#ifndef _TOKUDB_WRAP_H
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.21"
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.19"
#else
#define DB_VERSION_STRING_ydb "Tokutek: TokuDB (wrapped bdb)"
#endif
@ -363,6 +363,7 @@ int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t))
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif

View file

@ -452,6 +452,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf("int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE);
printf("void db_env_set_checkpoint_callback (void (*)(void*), void*) %s;\n", VISIBLE);
printf("void db_env_set_checkpoint_callback2 (void (*)(void*), void*) %s;\n", VISIBLE);
printf("#if defined(__cplusplus)\n}\n#endif\n");
printf("#endif\n");
return 0;

View file

@ -12,9 +12,9 @@ extern "C" {
#define TOKUDB_NATIVE_H 0
#define DB_VERSION_MAJOR 4
#define DB_VERSION_MINOR 6
#define DB_VERSION_PATCH 21
#define DB_VERSION_PATCH 19
#ifndef _TOKUDB_WRAP_H
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.21"
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.19"
#else
#define DB_VERSION_STRING_ydb "Tokutek: TokuDB (wrapped bdb)"
#endif
@ -363,6 +363,7 @@ int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, toku_off_t))
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback (void (*)(void*), void*) __attribute__((__visibility__("default")));
void db_env_set_checkpoint_callback2 (void (*)(void*), void*) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif

View file

@ -207,7 +207,7 @@ checkpoint_thread (void *cachetable_v)
{
char *error_string;
CACHETABLE ct = cachetable_v;
int r = toku_checkpoint(ct, ct->logger, &error_string, NULL, NULL);
int r = toku_checkpoint(ct, ct->logger, &error_string, NULL, NULL, NULL, NULL);
if (r) {
if (error_string) {
fprintf(stderr, "%s:%d Got error %d while doing: %s\n", __FILE__, __LINE__, r, error_string);
@ -1637,12 +1637,12 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
int
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string) {
toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*testcallback_f)(void*), void * testextra) {
// Requires: The big checkpoint lock must be held (see checkpoint.c).
// Algorithm: Write all pending nodes to disk
// Use checkpoint callback to write snapshot information to disk (header, btt)
// Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
//
// Note: If testcallback is null (for testing purposes only), call it after writing dictionary but before writing log
int retval = 0;
cachetable_lock(ct);
@ -1704,6 +1704,10 @@ toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_st
}
}
// For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written.
if (testcallback_f)
testcallback_f(testextra);
if (logger) {
int r = toku_log_end_checkpoint(logger, NULL,
1, // want the end_checkpoint to be fsync'd

View file

@ -10,7 +10,7 @@
#include "workqueue.h"
// TODO: #1398 Get rid of this entire straddle_callback hack
// Man is this ugly.
// Man is this ugly.
#ifdef BRT_LEVEL_STRADDLE_CALLBACK_LOGIC_NOT_READY
extern int STRADDLE_HACK_INSIDE_CALLBACK;
#endif
@ -47,7 +47,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
// TODO: #1510 Add comments on how these behave
int toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER);
int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string);
int toku_cachetable_end_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*testcallback_f)(void*), void * testextra);
// Does an fsync of a cachefile.
// Handles the case where cf points to /dev/null

View file

@ -52,12 +52,8 @@
#include "cachetable.h"
#include "checkpoint.h"
// breadcrumbs for debugging
static u_int64_t checkpoint_breadcrumb0 = 0;
static u_int64_t checkpoint_breadcrumb1 = 0;
static u_int64_t checkpoint_breadcrumb2 = 0;
static u_int64_t checkpoint_breadcrumb3 = 0;
static u_int64_t checkpoint_breadcrumb4 = 0;
// footprint for debugging only
static u_int64_t checkpoint_footprint = 0;
static toku_pthread_rwlock_t checkpoint_safe_lock;
static toku_pthread_rwlock_t multi_operation_lock;
@ -182,31 +178,33 @@ toku_checkpoint_destroy(void) {
// Take a checkpoint of all currently open dictionaries
int
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*callback_f)(void*), void * extra) {
toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string,
void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2) {
int r;
checkpoint_breadcrumb0++;
checkpoint_footprint = 0;
assert(initialized);
multi_operation_checkpoint_lock();
checkpoint_safe_checkpoint_lock();
ydb_lock();
checkpoint_breadcrumb1++;
checkpoint_footprint = 1;
r = toku_cachetable_begin_checkpoint(ct, logger);
multi_operation_checkpoint_unlock();
ydb_unlock();
checkpoint_breadcrumb2++;
checkpoint_footprint = 2;
if (r==0) {
if (callback_f)
callback_f(extra); // callback is called with checkpoint_safe_lock still held
r = toku_cachetable_end_checkpoint(ct, logger, error_string);
r = toku_cachetable_end_checkpoint(ct, logger, error_string, callback2_f, extra2);
}
checkpoint_breadcrumb3++;
checkpoint_footprint = 3;
checkpoint_safe_checkpoint_unlock();
checkpoint_breadcrumb4++;
checkpoint_footprint = 4;
return r;
}

View file

@ -52,6 +52,8 @@ int toku_checkpoint_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_call
int toku_checkpoint_destroy(void);
// Take a checkpoint of all currently open dictionaries
// Callback is called during checkpoint procedure while checkpoint_safe lock is still held.
// Callback is primarily intended for use in testing.
int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string, void (*callback_f)(void*), void * extra);
// Callbacks are called during checkpoint procedure while checkpoint_safe lock is still held.
// Callbacks are primarily intended for use in testing.
int toku_checkpoint(CACHETABLE ct, TOKULOGGER logger, char **error_string,
void (*callback_f)(void*), void * extra,
void (*callback2_f)(void*), void * extra2);

View file

@ -727,7 +727,7 @@ static int do_recovery(RECOVER_ENV env, const char *data_dir, const char *log_di
r = toku_cachetable_begin_checkpoint(env->ct, env->logger);
assert(r == 0);
// TODO: what about the error_string?
r = toku_cachetable_end_checkpoint(env->ct, env->logger, NULL);
r = toku_cachetable_end_checkpoint(env->ct, env->logger, NULL, NULL, NULL);
assert(r == 0);
r = chdir(org_wd);

View file

@ -87,7 +87,7 @@ do_update (void *UU(ignore))
static void*
do_checkpoint (void *UU(v))
{
int r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
int r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
return 0;
}
@ -136,14 +136,14 @@ static void checkpoint_pending(void) {
//printf("E43\n");
n_flush = n_write_me = n_keep_me = n_fetch = 0; expect_value = 43;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
assert(n_flush == N && n_write_me == N && n_keep_me == N);
// a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);

View file

@ -29,6 +29,7 @@ static int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void **value, l
}
static int callback_was_called = 0;
static int callback2_was_called = 0;
static void checkpoint_callback(void * extra) {
int * x = (int*) extra;
@ -36,6 +37,12 @@ static void checkpoint_callback(void * extra) {
if (verbose) printf("checkpoint_callback called %d (should be 1-16)\n", *x);
}
static void checkpoint_callback2(void * extra) {
int * x = (int*) extra;
(*x)++;
if (verbose) printf("checkpoint_callback2 called %d (should be 1-16)\n", *x);
}
// put n items into the cachetable, maybe mark them dirty, do a checkpoint, and
// verify that all of the items have been written and are clean.
@ -77,9 +84,10 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
// all items should be kept in the cachetable
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, checkpoint_callback, &callback_was_called);
r = toku_checkpoint(ct, NULL, NULL, checkpoint_callback, &callback_was_called, checkpoint_callback2, &callback2_was_called);
assert(r == 0);
assert(callback_was_called != 0);
assert(callback_was_called != 0);
assert(callback2_was_called != 0);
assert(n_flush == n && n_write_me == n && n_keep_me == n);
// after the checkpoint, all of the items should be clean
@ -108,7 +116,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);

View file

@ -79,7 +79,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
// all items should be kept in the cachetable
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
assert(n_flush == n && n_write_me == n && n_keep_me == n);
@ -108,7 +108,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
// a subsequent checkpoint should cause no flushes, or writes since all of the items are clean
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL);
r = toku_checkpoint(ct, NULL, NULL, NULL, NULL, NULL, NULL);
assert(r == 0);
assert(n_flush == 0 && n_write_me == 0 && n_keep_me == 0);

View file

@ -12,6 +12,7 @@
db_env_set_func_pwrite;
db_env_set_func_write;
db_env_set_checkpoint_callback;
db_env_set_checkpoint_callback2;
toku_os_get_max_rss;

View file

@ -99,6 +99,7 @@ BDB_DONTRUN_TESTS = \
checkpoint_1 \
checkpoint_stress \
checkpoint_truncate_1 \
checkpoint_callback \
test_txn_nested1 \
test_txn_nested2 \
test_txn_nested3 \

View file

@ -469,13 +469,13 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
if (env->i->logger) {
#if 0
// TODO lock problems (see test_db_remove.c). may want to require an environment.
r0 = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL);
r0 = toku_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL, NULL, NULL);
assert(r0 == 0);
#else
// TODO locks?
r0 = toku_cachetable_begin_checkpoint(env->i->cachetable, env->i->logger);
if (r0 == 0)
toku_cachetable_end_checkpoint(env->i->cachetable, env->i->logger, NULL);
toku_cachetable_end_checkpoint(env->i->cachetable, env->i->logger, NULL, NULL, NULL);
assert(r0 == 0);
#endif
r0 = toku_logger_shutdown(env->i->logger); assert(r0 == 0);
@ -726,10 +726,14 @@ static int toku_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
// For test purposes only.
static void (*checkpoint_callback_f)(void*) = NULL;
static void * checkpoint_callback_extra = NULL;
static void (*checkpoint_callback2_f)(void*) = NULL;
static void * checkpoint_callback2_extra = NULL;
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte __attribute__((__unused__)), u_int32_t min __attribute__((__unused__)), u_int32_t flags __attribute__((__unused__))) {
char *error_string = NULL;
int r = toku_checkpoint(env->i->cachetable, env->i->logger, &error_string, checkpoint_callback_f, checkpoint_callback_extra);
int r = toku_checkpoint(env->i->cachetable, env->i->logger, &error_string,
checkpoint_callback_f, checkpoint_callback_extra,
checkpoint_callback2_f, checkpoint_callback2_extra);
if (r) {
env->i->is_panicked = r; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
env->i->panic_string = error_string;
@ -4086,7 +4090,7 @@ void setup_dlmalloc (void) {
}
// For test purposes only.
// With this interface, all checkpoint users get the same callback and the same extra.
// With this interface, all checkpoint users get the same callbacks and the same extras.
void db_env_set_checkpoint_callback (void (*callback_f)(void*), void* extra) {
toku_checkpoint_safe_client_lock();
checkpoint_callback_f = callback_f;
@ -4094,6 +4098,13 @@ void db_env_set_checkpoint_callback (void (*callback_f)(void*), void* extra) {
toku_checkpoint_safe_client_unlock();
//printf("set callback = %p, extra = %p\n", callback_f, extra);
}
void db_env_set_checkpoint_callback2 (void (*callback_f)(void*), void* extra) {
toku_checkpoint_safe_client_lock();
checkpoint_callback2_f = callback_f;
checkpoint_callback2_extra = extra;
toku_checkpoint_safe_client_unlock();
//printf("set callback2 = %p, extra2 = %p\n", callback2_f, extra2);
}
// HACK: To ensure toku_pthread_yield gets included in the .so
// non-static would require a prototype in a header