diff --git a/ft/le-cursor.cc b/ft/le-cursor.cc index 35b29a650b7..c57ab49879e 100644 --- a/ft/le-cursor.cc +++ b/ft/le-cursor.cc @@ -120,8 +120,8 @@ toku_le_cursor_create(LE_CURSOR *le_cursor_result, FT_HANDLE ft_handle, TOKUTXN if (result == 0) { // TODO move the leaf mode to the ft cursor constructor toku_ft_cursor_set_leaf_mode(le_cursor->ft_cursor); - le_cursor->neg_infinity = true; - le_cursor->pos_infinity = false; + le_cursor->neg_infinity = false; + le_cursor->pos_infinity = true; // zero out the fake DB. this is a rare operation so it's not too slow. memset(&le_cursor->fake_db, 0, sizeof(le_cursor->fake_db)); } @@ -147,21 +147,21 @@ void toku_le_cursor_close(LE_CURSOR le_cursor) { int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) { int result; - if (le_cursor->pos_infinity) { + if (le_cursor->neg_infinity) { result = DB_NOTFOUND; } else { - le_cursor->neg_infinity = false; + le_cursor->pos_infinity = false; // TODO replace this with a non deprecated function. Which? - result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_NEXT); + result = toku_ft_cursor_get(le_cursor->ft_cursor, NULL, getf, getf_v, DB_PREV); if (result == DB_NOTFOUND) { - le_cursor->pos_infinity = true; + le_cursor->neg_infinity = true; } } return result; } bool -toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { +toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key) { bool result; if (le_cursor->neg_infinity) { result = true; // all keys are greater than -infinity @@ -175,7 +175,7 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { // get the current position from the cursor and compare it to the given key. DBT *cursor_key = &le_cursor->ft_cursor->key; int r = keycompare(&le_cursor->fake_db, cursor_key, key); - if (r < 0) { + if (r <= 0) { result = true; // key is right of the cursor key } else { result = false; // key is at or left of the cursor key @@ -183,3 +183,17 @@ toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key) { } return result; } + +void +toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate) { + // don't handle these edge cases, not worth it. + // estimate stays same + if (le_cursor->pos_infinity || le_cursor->neg_infinity) { + return; + } + DBT *cursor_key = &le_cursor->ft_cursor->key; + estimate->data = toku_xrealloc(estimate->data, cursor_key->size); + memcpy(estimate->data, cursor_key->data, cursor_key->size); + estimate->size = cursor_key->size; + estimate->flags = DB_DBT_REALLOC; +} diff --git a/ft/le-cursor.h b/ft/le-cursor.h index 4ad960762d3..4bd24c47d98 100644 --- a/ft/le-cursor.h +++ b/ft/le-cursor.h @@ -121,6 +121,10 @@ int toku_le_cursor_next(LE_CURSOR le_cursor, FT_GET_CALLBACK_FUNCTION getf, void // The LE_CURSOR position is intialized to -infinity. Any key comparision with -infinity returns true. // When the cursor runs off the right edge of the tree, the LE_CURSOR position is set to +infinity. Any key comparision with +infinity // returns false. -bool toku_le_cursor_is_key_greater(LE_CURSOR le_cursor, const DBT *key); +bool toku_le_cursor_is_key_greater_or_equal(LE_CURSOR le_cursor, const DBT *key); + +// extracts position of le_cursor into estimate. Responsibility of caller to handle +// thread safety. Caller (the indexer), does so by ensuring indexer lock is held +void toku_le_cursor_update_estimate(LE_CURSOR le_cursor, DBT* estimate); #endif diff --git a/ft/tests/le-cursor-provdel.cc b/ft/tests/le-cursor-provdel.cc index c71c49847de..a48d07b7e85 100644 --- a/ft/tests/le-cursor-provdel.cc +++ b/ft/tests/le-cursor-provdel.cc @@ -252,7 +252,7 @@ test_provdel(const char *logdir, const char *fname, int n) { assert(le->keylen == sizeof (int)); int ii; memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); - assert((int) toku_htonl(i) == ii); + assert((int) toku_htonl(n-i-1) == ii); } assert(i == n); diff --git a/ft/tests/le-cursor-right.cc b/ft/tests/le-cursor-right.cc index 3db4cd9d812..6744bfdeeb8 100644 --- a/ft/tests/le-cursor-right.cc +++ b/ft/tests/le-cursor-right.cc @@ -188,9 +188,9 @@ create_populate_tree(const char *logdir, const char *fname, int n) { toku_cachetable_close(&ct); } -// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity +// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity static void -test_neg_infinity(const char *fname, int n) { +test_pos_infinity(const char *fname, int n) { if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n); int error; @@ -210,8 +210,8 @@ test_neg_infinity(const char *fname, int n) { int k = toku_htonl(i); DBT key; toku_fill_dbt(&key, &k, sizeof k); - int right = toku_le_cursor_is_key_greater(cursor, &key); - assert(right == true); + int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key); + assert(right == false); } toku_le_cursor_close(cursor); @@ -222,9 +222,9 @@ test_neg_infinity(const char *fname, int n) { toku_cachetable_close(&ct); } -// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at +infinity +// test toku_le_cursor_is_key_greater when the LE_CURSOR is positioned at -infinity static void -test_pos_infinity(const char *fname, int n) { +test_neg_infinity(const char *fname, int n) { if (verbose) fprintf(stderr, "%s %s %d\n", __FUNCTION__, fname, n); int error; @@ -246,7 +246,7 @@ test_pos_infinity(const char *fname, int n) { toku_init_dbt(&val); val.flags = DB_DBT_REALLOC; int i; - for (i = 0; ; i++) { + for (i = n-1; ; i--) { error = le_cursor_get_next(cursor, &val); if (error != 0) break; @@ -259,7 +259,7 @@ test_pos_infinity(const char *fname, int n) { assert((int) toku_htonl(i) == ii); } - assert(i == n); + assert(i == -1); toku_destroy_dbt(&key); toku_destroy_dbt(&val); @@ -268,8 +268,8 @@ test_pos_infinity(const char *fname, int n) { int k = toku_htonl(i); DBT key2; toku_fill_dbt(&key2, &k, sizeof k); - int right = toku_le_cursor_is_key_greater(cursor, &key2); - assert(right == false); + int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2); + assert(right == true); } toku_le_cursor_close(cursor); @@ -315,24 +315,26 @@ test_between(const char *fname, int n) { assert(le->keylen == sizeof (int)); int ii; memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); - assert((int) toku_htonl(i) == ii); + // hot indexer runs in reverse, therefore need + // to check n-i-1 + assert((int) toku_htonl(n-i-1) == ii); - // test that 0 .. i is not right of the cursor + // test 0 .. i-1 for (int j = 0; j <= i; j++) { - int k = toku_htonl(j); + int k = toku_htonl(n-j-1); DBT key2; toku_fill_dbt(&key2, &k, sizeof k); - int right = toku_le_cursor_is_key_greater(cursor, &key2); - assert(right == false); + int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2); + assert(right == true); } - // test that i+1 .. n is left of the cursor - for (int j = i + 1; j <= n; j++) { - int k = toku_htonl(j); + // test i .. n + for (int j = i+1; j < n; j++) { + int k = toku_htonl(n-j-1); DBT key2; toku_fill_dbt(&key2, &k, sizeof k); - int right = toku_le_cursor_is_key_greater(cursor, &key2); - assert(right == true); + int right = toku_le_cursor_is_key_greater_or_equal(cursor, &key2); + assert(right == false); } } @@ -373,8 +375,8 @@ test_main (int argc , const char *argv[]) { const int n = 10; create_populate_tree(".", "ftfile", n); - test_neg_infinity("ftfile", n); test_pos_infinity("ftfile", n); + test_neg_infinity("ftfile", n); test_between("ftfile", n); return 0; diff --git a/ft/tests/le-cursor-walk.cc b/ft/tests/le-cursor-walk.cc index 01104699eae..c0b7866925f 100644 --- a/ft/tests/le-cursor-walk.cc +++ b/ft/tests/le-cursor-walk.cc @@ -208,10 +208,10 @@ walk_tree(const char *fname, int n) { int i; for (i = 0; ; i++) { - error = TOKUDB_TRY_AGAIN; - while (error == TOKUDB_TRY_AGAIN) { - error = le_cursor_get_next(cursor, &val); - } + error = TOKUDB_TRY_AGAIN; + while (error == TOKUDB_TRY_AGAIN) { + error = le_cursor_get_next(cursor, &val); + } if (error != 0) break; @@ -220,7 +220,7 @@ walk_tree(const char *fname, int n) { assert(le->keylen == sizeof (int)); int ii; memcpy(&ii, le->u.mvcc.key_xrs, le->keylen); - assert((int) toku_htonl(i) == ii); + assert((int) toku_htonl(n-i-1) == ii); } assert(i == n); diff --git a/src/indexer-internal.h b/src/indexer-internal.h index aed95d2dd6d..a6b2270801e 100644 --- a/src/indexer-internal.h +++ b/src/indexer-internal.h @@ -123,8 +123,10 @@ struct ule_prov_info { struct __toku_indexer_internal { DB_ENV *env; - DB_TXN *txn; + DB_TXN *txn; toku_mutex_t indexer_lock; + toku_mutex_t indexer_estimate_lock; + DBT position_estimate; DB *src_db; int N; DB **dest_dbs; /* [N] */ diff --git a/src/indexer.cc b/src/indexer.cc index 55a9271d946..c2b5f78eaed 100644 --- a/src/indexer.cc +++ b/src/indexer.cc @@ -189,6 +189,8 @@ static void free_indexer_resources(DB_INDEXER *indexer) { if ( indexer->i ) { toku_mutex_destroy(&indexer->i->indexer_lock); + toku_mutex_destroy(&indexer->i->indexer_estimate_lock); + toku_destroy_dbt(&indexer->i->position_estimate); if ( indexer->i->lec ) { toku_le_cursor_close(indexer->i->lec); } @@ -222,6 +224,49 @@ toku_indexer_unlock(DB_INDEXER* indexer) { toku_mutex_unlock(&indexer->i->indexer_lock); } +// a shortcut call +// +// a cheap(er) call to see if a key must be inserted +// into the DB. If true, then we know we have to insert. +// If false, then we don't know, and have to check again +// after grabbing the indexer lock +bool +toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) { + bool retval = false; + toku_mutex_lock(&indexer->i->indexer_estimate_lock); + // if we have no position estimate, we can't tell, so return false + if (indexer->i->position_estimate.data == NULL) { + retval = false; + } + else { + FT_HANDLE ft_handle = indexer->i->src_db->i->ft_handle; + ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle); + int r = keycompare( + indexer->i->src_db, + &indexer->i->position_estimate, + key + ); + // if key > position_estimate, then we know the indexer cursor + // is past key, and we can safely say that associated values of + // key must be inserted into the indexer's db + if (r < 0) { + retval = true; + } + else { + retval = false; + } + } + toku_mutex_unlock(&indexer->i->indexer_estimate_lock); + return retval; +} + +void +toku_indexer_update_estimate(DB_INDEXER* indexer) { + toku_mutex_lock(&indexer->i->indexer_estimate_lock); + toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate); + toku_mutex_unlock(&indexer->i->indexer_estimate_lock); +} + // forward declare the test-only wrapper function for undo-do static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); @@ -272,6 +317,8 @@ toku_indexer_create_indexer(DB_ENV *env, indexer->abort = abort_indexer; toku_mutex_init(&indexer->i->indexer_lock, NULL); + toku_mutex_init(&indexer->i->indexer_estimate_lock, NULL); + toku_init_dbt(&indexer->i->position_estimate); // // create and close a dummy loader to get redirection going for the hot indexer @@ -356,8 +403,14 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer, // a key is to the right of the indexer's cursor if it compares // greater than the current le cursor position. bool -toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) { - return toku_le_cursor_is_key_greater(indexer->i->lec, key); +toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) { + // the hot indexer runs from the end to the beginning, it gets the largest keys first + // + // if key is less than indexer's position, then we should NOT insert it because + // the indexer will get to it. If it is greater or equal, that means the indexer + // has already processed the key, and will not get to it, therefore, we need + // to handle it + return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key); } // initialize provisional info by allocating enough space to hold provisional diff --git a/src/indexer.h b/src/indexer.h index 46b302676d4..10543598825 100644 --- a/src/indexer.h +++ b/src/indexer.h @@ -97,6 +97,8 @@ PATENT RIGHTS GRANT: void toku_indexer_lock(DB_INDEXER* indexer); void toku_indexer_unlock(DB_INDEXER* indexer); +bool toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key); +void toku_indexer_update_estimate(DB_INDEXER* indexer); // The indexer populates multiple destination db's from the contents of one source db. // While the indexes are being built by the indexer, the application may continue to @@ -146,7 +148,7 @@ int toku_indexer_set_error_callback(DB_INDEXER *indexer, // Is the key right of the indexer's leaf entry cursor? // Returns true if right of le_cursor // Returns false if left or equal to le_cursor -bool toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key); +bool toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key); // Get the indexer's source db DB *toku_indexer_get_src_db(DB_INDEXER *indexer); diff --git a/src/ydb_write.cc b/src/ydb_write.cc index 48d71af2066..08db94c1ca5 100644 --- a/src/ydb_write.cc +++ b/src/ydb_write.cc @@ -440,7 +440,7 @@ lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) { } static int -do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key) { +do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key, bool indexer_shortcut) { int r = 0; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) { @@ -452,7 +452,7 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], // indexers cursor. we have to get the src_db from the indexer and find it in the db_array. int do_delete = true; DB_INDEXER *indexer = toku_db_get_indexer(db); - if (indexer) { // if this db is the index under construction + if (indexer && !indexer_shortcut) { // if this db is the index under construction DB *indexer_src_db = toku_indexer_get_src_db(indexer); invariant(indexer_src_db != NULL); const DBT *indexer_src_key; @@ -465,7 +465,8 @@ do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], invariant(keys[which_src_db].size == 1); indexer_src_key = &keys[which_src_db].dbts[0]; } - do_delete = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key); + do_delete = toku_indexer_should_insert_key(indexer, indexer_src_key); + toku_indexer_update_estimate(indexer); } if (do_delete) { for (uint32_t i = 0; i < keys[which_db].size; i++) { @@ -485,7 +486,9 @@ static int get_indexer_if_exists( uint32_t num_dbs, DB **db_array, - DB_INDEXER** indexerp + DB *src_db, + DB_INDEXER** indexerp, + bool *src_db_is_indexer_src ) { int r = 0; @@ -502,6 +505,13 @@ get_indexer_if_exists( } } if (r == 0) { + if (first_indexer) { + DB* indexer_src_db = toku_indexer_get_src_db(first_indexer); + // we should just make this an invariant + if (src_db == indexer_src_db) { + *src_db_is_indexer_src = true; + } + } *indexerp = first_indexer; } return r; @@ -529,6 +539,9 @@ env_del_multiple( uint32_t lock_flags[num_dbs]; uint32_t remaining_flags[num_dbs]; FT_HANDLE brts[num_dbs]; + bool indexer_lock_taken = false; + bool src_same = false; + bool indexer_shortcut = false; if (!txn) { r = EINVAL; goto cleanup; @@ -539,7 +552,7 @@ env_del_multiple( } HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); - r = get_indexer_if_exists(num_dbs, db_array, &indexer); + r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same); if (r) { goto cleanup; } @@ -584,13 +597,23 @@ env_del_multiple( } if (indexer) { - toku_indexer_lock(indexer); + // do a cheap check + if (src_same) { + bool may_insert = toku_indexer_may_insert(indexer, src_key); + if (!may_insert) { + toku_indexer_lock(indexer); + indexer_lock_taken = true; + } + else { + indexer_shortcut = true; + } + } } toku_multi_operation_client_lock(); log_del_multiple(txn, src_db, src_key, src_val, num_dbs, brts, del_keys); - r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key); + r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key, indexer_shortcut); toku_multi_operation_client_unlock(); - if (indexer) { + if (indexer_lock_taken) { toku_indexer_unlock(indexer); } @@ -612,7 +635,7 @@ log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val } static int -do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key) { +do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], DB *src_db, const DBT *src_key, bool indexer_shortcut) { TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; for (uint32_t which_db = 0; which_db < num_dbs; which_db++) { DB *db = db_array[which_db]; @@ -624,7 +647,7 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], if (keys[which_db].size > 0) { bool do_put = true; DB_INDEXER *indexer = toku_db_get_indexer(db); - if (indexer) { // if this db is the index under construction + if (indexer && !indexer_shortcut) { // if this db is the index under construction DB *indexer_src_db = toku_indexer_get_src_db(indexer); invariant(indexer_src_db != NULL); const DBT *indexer_src_key; @@ -637,7 +660,8 @@ do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], invariant(keys[which_src_db].size == 1); indexer_src_key = &keys[which_src_db].dbts[0]; } - do_put = !toku_indexer_is_key_right_of_le_cursor(indexer, indexer_src_key); + do_put = toku_indexer_should_insert_key(indexer, indexer_src_key); + toku_indexer_update_estimate(indexer); } if (do_put) { for (uint32_t i = 0; i < keys[which_db].size; i++) { @@ -677,6 +701,9 @@ env_put_multiple_internal( uint32_t lock_flags[num_dbs]; uint32_t remaining_flags[num_dbs]; FT_HANDLE brts[num_dbs]; + bool indexer_shortcut = false; + bool indexer_lock_taken = false; + bool src_same = false; if (!txn || !num_dbs) { r = EINVAL; @@ -688,7 +715,7 @@ env_put_multiple_internal( } HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); - r = get_indexer_if_exists(num_dbs, db_array, &indexer); + r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same); if (r) { goto cleanup; } @@ -749,13 +776,23 @@ env_put_multiple_internal( } if (indexer) { - toku_indexer_lock(indexer); + // do a cheap check + if (src_same) { + bool may_insert = toku_indexer_may_insert(indexer, src_key); + if (!may_insert) { + toku_indexer_lock(indexer); + indexer_lock_taken = true; + } + else { + indexer_shortcut = true; + } + } } toku_multi_operation_client_lock(); log_put_multiple(txn, src_db, src_key, src_val, num_dbs, brts); - r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key); + r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, src_db, src_key, indexer_shortcut); toku_multi_operation_client_unlock(); - if (indexer) { + if (indexer_lock_taken) { toku_indexer_unlock(indexer); } @@ -787,6 +824,9 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, HANDLE_PANICKED_ENV(env); DB_INDEXER* indexer = NULL; + bool indexer_shortcut = false; + bool indexer_lock_taken = false; + bool src_same = false; HANDLE_READ_ONLY_TXN(txn); DBT_ARRAY old_key_arrays[num_dbs]; DBT_ARRAY new_key_arrays[num_dbs]; @@ -806,7 +846,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, } HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn); - r = get_indexer_if_exists(num_dbs, db_array, &indexer); + r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same); if (r) { goto cleanup; } @@ -1008,12 +1048,24 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, } } if (indexer) { - toku_indexer_lock(indexer); + // do a cheap check + if (src_same) { + bool may_insert = + toku_indexer_may_insert(indexer, old_src_key) && + toku_indexer_may_insert(indexer, new_src_key); + if (!may_insert) { + toku_indexer_lock(indexer); + indexer_lock_taken = true; + } + else { + indexer_shortcut = true; + } + } } toku_multi_operation_client_lock(); if (r == 0 && n_del_dbs > 0) { log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays); - r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key); + r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key, indexer_shortcut); } if (r == 0 && n_put_dbs > 0) { @@ -1022,10 +1074,10 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, // recovery so we don't end up losing data. // So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry. log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts); - r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key); + r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, src_db, new_src_key, indexer_shortcut); } toku_multi_operation_client_unlock(); - if (indexer) { + if (indexer_lock_taken) { toku_indexer_unlock(indexer); } }