diff --git a/src/ydb.c b/src/ydb.c index 2c6a923a1a4..bd9baf714f6 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -91,6 +91,7 @@ static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is /* lightweight cursor methods. */ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); +static int toku_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); static int toku_c_getf_heavi(DBC *c, u_int32_t flags, void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h), @@ -1765,6 +1766,10 @@ static int locked_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, D toku_ydb_lock(); int r = toku_c_getf_next(c, flag, f, extra); toku_ydb_unlock(); return r; } +static int locked_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + toku_ydb_lock(); int r = toku_c_getf_next_dup(c, flag, f, extra); toku_ydb_unlock(); return r; +} + static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { DBT key,val; memset(&key, 0, sizeof(key)); @@ -1823,6 +1828,46 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT return r; } +static int toku_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + HANDLE_PANICKED_DB(c->dbp); + if (toku_c_uninitialized(c)) return EINVAL; + u_int32_t lock_flags = get_prelocked_flags(flag); + flag &= ~lock_flags; + assert(flag==0); + TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; + const DBT *pkey, *pval; + pkey = pval = toku_lt_infinity; + int r; + + DB *db=c->dbp; + toku_lock_tree* lt = db->i->lt; + BOOL do_locking = lt!=NULL && !lock_flags; + + int c_get_result = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_NEXT_DUP, txn); + if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } + int found = c_get_result==0; + if (found) brt_cursor_peek_current(c->i->c, &pkey, &pval); + if (do_locking) { + + DBT *prevkey = found ? brt_cursor_peek_prev_key(c->i->c) : brt_cursor_peek_current_key(c->i->c); + DBT *prevval = found ? brt_cursor_peek_prev_val(c->i->c) : brt_cursor_peek_current_key(c->i->c); + + DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); + r = toku_txn_add_lt(txn_anc, lt); + if (r!=0) goto cleanup; + r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), + prevkey, prevval, + prevkey, pval); + if (r!=0) goto cleanup; + } + if (found) { + f(pkey, pval, extra); + } + r = c_get_result; + cleanup: + return r; +} + static int locked_c_getf_heavi(DBC *c, u_int32_t flags, void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h), void *extra_f, @@ -2258,6 +2303,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int SCRS(c_del); SCRS(c_count); SCRS(c_getf_next); + SCRS(c_getf_next_dup); SCRS(c_getf_heavi); #undef SCRS MALLOC(result->i);