#ident "$Id: cachetable-pin-checkpoint.c 38124 2011-12-22 19:45:52Z bkuszmaul $"
#ident "Copyright (c) 2007-2011 Tokutek Inc.  All rights reserved."
#include "includes.h"
#include "test.h"


//
// This test ensures that get_and_pin with dependent nodes works
// as intended with checkpoints, by having multiple threads changing
// values on elements in data, and ensure that checkpoints always get snapshots 
// such that the sum of all the elements in data are 0.
//

// The arrays

// must be power of 2 minus 1
#define NUM_ELEMENTS 127
// must be (NUM_ELEMENTS +1)/2 - 1
#define NUM_INTERNAL 63
#define NUM_MOVER_THREADS 4

int64_t data[NUM_ELEMENTS];
int64_t checkpointed_data[NUM_ELEMENTS];

u_int32_t time_of_test;
BOOL run_test;


static void
flush (CACHEFILE f __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k  __attribute__((__unused__)),
       void *v     __attribute__((__unused__)),
       void *e     __attribute__((__unused__)),
       PAIR_ATTR s      __attribute__((__unused__)),
       PAIR_ATTR* new_size      __attribute__((__unused__)),
       BOOL write_me,
       BOOL keep_me,
       BOOL checkpoint_me
       ) {
    int64_t val_to_write = *(int64_t *)v;
    size_t data_index = (size_t)k.b;
    if (write_me) {
        usleep(10);
        data[data_index] = val_to_write;
        if (checkpoint_me) checkpointed_data[data_index] = val_to_write;
    }
    if (!keep_me) {
        toku_free(v);
    }
}

static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       int UU(fd),
       CACHEKEY k,
       u_int32_t fullhash __attribute__((__unused__)),
       void **value,
       PAIR_ATTR *sizep,
       int  *dirtyp,
       void *extraargs    __attribute__((__unused__))
       ) {
    *dirtyp = 0;
    size_t data_index = (size_t)k.b;
    // assert that data_index is valid
    // if it is INT64_MAX, then that means
    // the block is not supposed to be in the cachetable
    assert(data[data_index] != INT64_MAX);
    
    int64_t* data_val = toku_malloc(sizeof(int64_t));
    usleep(10);
    *data_val = data[data_index];
    *value = data_val;
    *sizep = make_pair_attr(8);
    return 0;
}

static void *test_time(void *arg) {
    //
    // if num_Seconds is set to 0, run indefinitely
    //
    if (time_of_test != 0) {
        usleep(time_of_test*1000*1000);
        if (verbose) printf("should now end test\n");
        run_test = FALSE;
    }
    if (verbose) printf("should be ending test now\n");
    return arg;
}

CACHETABLE ct;
CACHEFILE f1;

static void move_number_to_child(
    int parent, 
    int64_t* parent_val, 
    enum cachetable_dirty parent_dirty
    ) 
{
    int child = 0;
    int r;
    child = ((random() % 2) == 0) ? (2*parent + 1) : (2*parent + 2); 
    
    void* v1;
    long s1;
    CACHEKEY parent_key;
    parent_key.b = parent;
    u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);

    
    CACHEKEY child_key;
    child_key.b = child;
    u_int32_t child_fullhash = toku_cachetable_hash(f1, child_key);
    r = toku_cachetable_get_and_pin_with_dep_pairs(
        f1,
        child_key,
        child_fullhash,
        &v1,
        &s1,
        flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
        NULL,
        NULL,
        1, //num_dependent_pairs
        &f1,
        &parent_key,
        &parent_fullhash,
        &parent_dirty
        );
    assert(r==0);
    
    int64_t* child_val = (int64_t *)v1;
    assert(child_val != parent_val); // sanity check that we are messing with different vals
    assert(*parent_val != INT64_MAX);
    assert(*child_val != INT64_MAX);
    usleep(10);
    (*parent_val)++;
    (*child_val)--;
    r = toku_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
    assert_zero(r);
    if (child < NUM_INTERNAL) {
        move_number_to_child(child, child_val, CACHETABLE_DIRTY);
    }
    else {
        r = toku_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
        assert_zero(r);
    }
}

static void *move_numbers(void *arg) {
    while (run_test) {
        int parent = 0;
        int r;
        void* v1;
        long s1;
        CACHEKEY parent_key;
        parent_key.b = parent;
        u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
        r = toku_cachetable_get_and_pin_with_dep_pairs(
            f1,
            parent_key,
            parent_fullhash,
            &v1,
            &s1,
            flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
            NULL,
            NULL,
            0, //num_dependent_pairs
            NULL,
            NULL,
            NULL,
            NULL
            );
        assert(r==0);
        int64_t* parent_val = (int64_t *)v1;
        move_number_to_child(parent, parent_val, CACHETABLE_CLEAN);
    }
    return arg;
}

static void remove_data(CACHEKEY* cachekey, BOOL for_checkpoint, void* UU(extra)) {
    assert(cachekey->b < NUM_ELEMENTS);
    data[cachekey->b] = INT64_MAX;
    if (for_checkpoint) {
        checkpointed_data[cachekey->b] = INT64_MAX;
    }
}


static void get_data(CACHEKEY* cachekey, u_int32_t* fullhash, void* extra) {
    int* key = extra;
    cachekey->b = *key;
    *fullhash = toku_cachetable_hash(f1, *cachekey);
    data[*key] = INT64_MAX - 1;
}

static void merge_and_split_child(
    int parent, 
    int64_t* parent_val, 
    enum cachetable_dirty parent_dirty
    ) 
{
    int child = 0;
    int other_child = 0;
    int r;
    BOOL even = (random() % 2) == 0;
    child = (even) ? (2*parent + 1) : (2*parent + 2); 
    other_child = (!even) ? (2*parent + 1) : (2*parent + 2);
    assert(child != other_child);
    
    void* v1;
    long s1;

    CACHEKEY parent_key;
    parent_key.b = parent;
    u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);

    CACHEKEY child_key;
    child_key.b = child;
    u_int32_t child_fullhash = toku_cachetable_hash(f1, child_key);
    enum cachetable_dirty child_dirty = CACHETABLE_CLEAN;
    r = toku_cachetable_get_and_pin_with_dep_pairs(
        f1,
        child_key,
        child_fullhash,
        &v1,
        &s1,
        flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
        NULL,
        NULL,
        1, //num_dependent_pairs
        &f1,
        &parent_key,
        &parent_fullhash,
        &parent_dirty
        );
    assert(r==0);
    int64_t* child_val = (int64_t *)v1;
    
    CACHEKEY other_child_key;
    other_child_key.b = other_child;
    u_int32_t other_child_fullhash = toku_cachetable_hash(f1, other_child_key);
    CACHEFILE cfs[2];
    cfs[0] = f1;
    cfs[1] = f1;
    CACHEKEY keys[2];
    keys[0] = parent_key;
    keys[1] = child_key;
    u_int32_t hashes[2];
    hashes[0] = parent_fullhash;
    hashes[1] = child_fullhash;
    enum cachetable_dirty dirties[2];
    dirties[0] = parent_dirty;
    dirties[1] = child_dirty;
    
    r = toku_cachetable_get_and_pin_with_dep_pairs(
        f1,
        other_child_key,
        other_child_fullhash,
        &v1,
        &s1,
        flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
        NULL,
        NULL,
        2, //num_dependent_pairs
        cfs,
        keys,
        hashes,
        dirties
        );
    assert(r==0);
    int64_t* other_child_val = (int64_t *)v1;
    assert(*parent_val != INT64_MAX);
    assert(*child_val != INT64_MAX);
    assert(*other_child_val != INT64_MAX);
    
    // lets get rid of other_child_val with a merge
    *child_val += *other_child_val;
    *other_child_val = INT64_MAX;        
    toku_cachetable_unpin_and_remove(f1, other_child_key, remove_data, NULL);
    dirties[1] = CACHETABLE_DIRTY;
    child_dirty = CACHETABLE_DIRTY;
    
    // now do a split
    CACHEKEY new_key;
    u_int32_t new_fullhash;
    int64_t* data_val = toku_malloc(sizeof(int64_t));
    r = toku_cachetable_put_with_dep_pairs(
          f1,
          get_data,
          data_val,
          make_pair_attr(8),
          flush, def_pe_est_callback, def_pe_callback, def_cleaner_callback,
          NULL, // parameter for flush_callback, pe_est_callback, pe_callback, and cleaner_callback
          &other_child,
          2, // number of dependent pairs that we may need to checkpoint
          cfs,
          keys,
          hashes,
          dirties,
          &new_key,
          &new_fullhash
          );
    assert(new_key.b == other_child);
    assert(new_fullhash == other_child_fullhash);
    *data_val = 5000;
    *child_val -= 5000;
    
    r = toku_cachetable_unpin(f1, parent_key, parent_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
    assert_zero(r);
    r = toku_cachetable_unpin(f1, other_child_key, other_child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
    assert_zero(r);
    if (child < NUM_INTERNAL) {
        merge_and_split_child(child, child_val, CACHETABLE_DIRTY);
    }
    else {
        r = toku_cachetable_unpin(f1, child_key, child_fullhash, CACHETABLE_DIRTY, make_pair_attr(8));
        assert_zero(r);
    }
}

static void *merge_and_split(void *arg) {
    while (run_test) {
        int parent = 0;
        int r;        
        void* v1;
        long s1;
        CACHEKEY parent_key;
        parent_key.b = parent;
        u_int32_t parent_fullhash = toku_cachetable_hash(f1, parent_key);
        r = toku_cachetable_get_and_pin_with_dep_pairs(
            f1,
            parent_key,
            parent_fullhash,
            &v1,
            &s1,
            flush, fetch, def_pe_est_callback, def_pe_callback, def_pf_req_callback, def_pf_callback, def_cleaner_callback,
            NULL,
            NULL,
            0, //num_dependent_pairs
            NULL,
            NULL,
            NULL,
            NULL
            );
        assert(r==0);
        int64_t* parent_val = (int64_t *)v1;
        merge_and_split_child(parent, parent_val, CACHETABLE_CLEAN);
    }
    return arg;
}

static int num_checkpoints = 0;
static void *checkpoints(void *arg) {
    // first verify that checkpointed_data is correct;
    while(run_test) {
        int64_t sum = 0;
        for (int i = 0; i < NUM_ELEMENTS; i++) {
            if (checkpointed_data[i] != INT64_MAX) {
                sum += checkpointed_data[i];
            }
        }
        assert (sum==0);
    
        //
        // now run a checkpoint
        //
        int r;
        
        r = toku_cachetable_begin_checkpoint(ct, NULL); assert(r == 0);    
        r = toku_cachetable_end_checkpoint(
            ct, 
            NULL, 
            fake_ydb_lock,
            fake_ydb_unlock,
            NULL,
            NULL
            );
        assert(r==0);
        assert (sum==0);
        for (int i = 0; i < NUM_ELEMENTS; i++) {
            if (checkpointed_data[i] != INT64_MAX) {
                sum += checkpointed_data[i];
            }
        }
        assert (sum==0);
        num_checkpoints++;
    }
    return arg;
}

static int
test_begin_checkpoint (
    LSN UU(checkpoint_lsn), 
    void* UU(header_v)) 
{
    memcpy(checkpointed_data, data, sizeof(int64_t)*NUM_ELEMENTS);
    return 0;
}

static int
dummy_int_checkpoint_userdata(CACHEFILE UU(cf), int UU(n), void* UU(extra)) {
    return 0;
}

static void sum_vals(void) {
    int64_t sum = 0;
    for (int i = 0; i < NUM_ELEMENTS; i++) {
        //printf("actual: i %d val %"PRId64" \n", i, data[i]);
        if (data[i] != INT64_MAX) {
            sum += data[i];
        }
    }
    if (verbose) printf("actual sum %"PRId64" \n", sum);
    assert(sum == 0);
    sum = 0;
    for (int i = 0; i < NUM_ELEMENTS; i++) {
        //printf("checkpointed: i %d val %"PRId64" \n", i, checkpointed_data[i]);
        if (checkpointed_data[i] != INT64_MAX) {
            sum += checkpointed_data[i];
        }
    }
    if (verbose) printf("checkpointed sum %"PRId64" \n", sum);
    assert(sum == 0);
}

static void
cachetable_test (void) {
    const int test_limit = NUM_ELEMENTS;

    //
    // let's set up the data
    //
    for (int64_t i = 0; i < NUM_ELEMENTS; i++) {
        data[i] = 0;
        checkpointed_data[i] = 0;
    }
    time_of_test = 60;

    int r;
    
    r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
    char fname1[] = __FILE__ "test-put-checkpoint.dat";
    unlink(fname1);
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
    
    toku_cachefile_set_userdata(
        f1, 
        NULL, 
        NULL, 
        NULL, 
        NULL, 
        dummy_int_checkpoint_userdata, 
        test_begin_checkpoint, // called in begin_checkpoint
        dummy_int_checkpoint_userdata,
        NULL, 
        NULL
        );
    
    toku_pthread_t time_tid;
    toku_pthread_t checkpoint_tid;
    toku_pthread_t move_tid[NUM_MOVER_THREADS];
    toku_pthread_t merge_and_split_tid[NUM_MOVER_THREADS];
    run_test = TRUE;

    for (int i = 0; i < NUM_MOVER_THREADS; i++) {
        r = toku_pthread_create(&move_tid[i], NULL, move_numbers, NULL); 
        assert_zero(r);
    }
    for (int i = 0; i < NUM_MOVER_THREADS; i++) {
        r = toku_pthread_create(&merge_and_split_tid[i], NULL, merge_and_split, NULL); 
        assert_zero(r);
    }
    r = toku_pthread_create(&checkpoint_tid, NULL, checkpoints, NULL); 
    assert_zero(r);    
    r = toku_pthread_create(&time_tid, NULL, test_time, NULL); 
    assert_zero(r);

    
    void *ret;
    r = toku_pthread_join(time_tid, &ret); 
    assert_zero(r);
    r = toku_pthread_join(checkpoint_tid, &ret); 
    assert_zero(r);
    for (int i = 0; i < NUM_MOVER_THREADS; i++) {
        r = toku_pthread_join(merge_and_split_tid[i], &ret); 
        assert_zero(r);
    }
    for (int i = 0; i < NUM_MOVER_THREADS; i++) {
        r = toku_pthread_join(move_tid[i], &ret); 
        assert_zero(r);
    }

    toku_cachetable_verify(ct);
    r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct); lazy_assert_zero(r);
    
    sum_vals();
    if (verbose) printf("num_checkpoints %d\n", num_checkpoints);
    
}

int
test_main(int argc, const char *argv[]) {
  default_parse_args(argc, argv);
  cachetable_test();
  return 0;
}