diff --git a/db-benchmark-test/scanscan.c b/db-benchmark-test/scanscan.c index af8f6c05aa2..77b1567c04c 100644 --- a/db-benchmark-test/scanscan.c +++ b/db-benchmark-test/scanscan.c @@ -9,7 +9,8 @@ #include const char *pname; -int verify_lwc=0, lwc=0, hwc=1, prelock=0; +int verify_lwc=0, lwc=0, hwc=1, prelock=0, prelockflag=0; +u_int32_t lock_flag = DB_PRELOCKED; void parse_args (int argc, const char *argv[]) { @@ -21,8 +22,10 @@ void parse_args (int argc, const char *argv[]) { else if (strcmp(*argv, "--lwc")==0) lwc=1; else if (strcmp(*argv, "--nohwc")==0) hwc=0; else if (strcmp(*argv, "--prelock")==0) prelock=1; + else if (strcmp(*argv, "--prelockflag")==0) prelockflag=1; + else if (strcmp(*argv, "--prelockwriteflag")==0) { prelockflag=1; lock_flag = DB_PRELOCKED_WRITE; } else { - printf("Usage:\n%s [--verify-lwc] [--lwc] [--nohwc]\n", pname); + printf("Usage:\n%s [--verify-lwc] [--lwc] [--nohwc] [--prelock] [--prelockflag] [--prelockwriteflag]\n", pname); exit(1); } argc--; @@ -97,7 +100,11 @@ void scanscan (void) { r = db->cursor(db, tid, &dbc, 0); assert(r==0); memset(&k, 0, sizeof(k)); memset(&v, 0, sizeof(v)); - while (0 == (r = dbc->c_get(dbc, &k, &v, DB_NEXT))) { + u_int32_t c_get_flags = DB_NEXT; + if (prelockflag && (counter || prelock)) { + c_get_flags |= lock_flag; + } + while (0 == (r = dbc->c_get(dbc, &k, &v, c_get_flags))) { totalbytes += k.size + v.size; rowcounter++; } @@ -126,7 +133,11 @@ void scanscan_lwc (void) { double prevtime = gettime(); DBC *dbc; r = db->cursor(db, tid, &dbc, 0); assert(r==0); - while (0 == (r = dbc->c_getf_next(dbc, 0, counttotalbytes, &e))); + u_int32_t f_flags = 0; + if (prelockflag && (counter || prelock)) { + f_flags |= lock_flag; + } + while (0 == (r = dbc->c_getf_next(dbc, f_flags, counttotalbytes, &e))); r = dbc->c_close(dbc); assert(r==0); double thistime = gettime(); double tdiff = thistime-prevtime; @@ -165,10 +176,16 @@ void scanscan_verify (void) { r = db->cursor(db, tid, &dbc2, 0); assert(r==0); memset(&v.k, 0, sizeof(v.k)); memset(&v.v, 0, sizeof(v.v)); + u_int32_t f_flags = 0; + u_int32_t c_get_flags = DB_NEXT; + if (prelockflag && (counter || prelock)) { + f_flags |= lock_flag; + c_get_flags |= lock_flag; + } while (1) { int r1,r2; - r2 = dbc1->c_get(dbc1, &v.k, &v.v, DB_NEXT); - r1 = dbc2->c_getf_next(dbc2, 0, checkbytes, &v); + r2 = dbc1->c_get(dbc1, &v.k, &v.v, c_get_flags); + r1 = dbc2->c_getf_next(dbc2, f_flags, checkbytes, &v); assert(r1==r2); if (r1) break; } diff --git a/src/ydb.c b/src/ydb.c index 5ed92822cb9..af1e538b358 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -1055,17 +1055,7 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) //Get the main portion of a cursor flag (excluding the bitwise or'd components). static int get_main_cursor_flag(u_int32_t flag) { -#ifdef DB_READ_UNCOMMITTED - flag &= ~DB_READ_UNCOMMITTED; -#endif -#ifdef DB_MULTIPLE - flag &= ~DB_MULTIPLE; -#endif -#ifdef DB_MULTIPLE_KEY - flag &= ~DB_MULTIPLE_KEY; -#endif - flag &= ~DB_RMW; - return flag; + return flag & DB_OPFLAGS_MASK; } static inline BOOL toku_c_uninitialized(DBC* c) { @@ -1118,6 +1108,7 @@ typedef struct { DBT tmp_dat; // Temporary data val to protect out param u_int32_t flag; // The c_get flag u_int32_t op; // The operation portion of the c_get flag + u_int32_t lock_flags; // The prelock flags. BOOL cursor_is_write; // Whether op can change position of cursor BOOL cursor_was_saved; // Whether we saved the cursor yet. BOOL key_is_read; @@ -1133,6 +1124,10 @@ typedef struct { BOOL tmp_dat_malloced; } C_GET_VARS; +static inline u_int32_t get_prelocked_flags(u_int32_t flags) { + return flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE); +} + static void toku_c_get_fix_flags(C_GET_VARS* g) { g->op = get_main_cursor_flag(g->flag); @@ -1159,6 +1154,8 @@ static void toku_c_get_fix_flags(C_GET_VARS* g) { default: break; } + g->lock_flags = get_prelocked_flags(g->flag); + g->flag &= ~g->lock_flags; } static inline void toku_c_pget_fix_flags(C_GET_VARS* g) { @@ -1168,7 +1165,6 @@ static inline void toku_c_pget_fix_flags(C_GET_VARS* g) { static int toku_c_get_pre_acquire_lock_if_possible(C_GET_VARS* g, DBT* key, DBT* data) { int r = ENOSYS; toku_lock_tree* lt = g->db->i->lt; - if (!lt) { r = 0; goto cleanup; } /* We know what to lock ahead of time. */ if (g->op == DB_GET_BOTH || @@ -1476,6 +1472,13 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) /* Standardize the op flag. */ toku_c_get_fix_flags(&g); + TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; + + if (!g.db->i->lt || g.lock_flags) { + r = toku_brt_cursor_get(c->i->c, key, val, g.flag, txn); + goto cleanup; + } + /* If we know what to lock before the cursor op, lock now. */ if ((r = toku_c_get_pre_acquire_lock_if_possible(&g, key, val))) goto cleanup; /* Determine whether the key and val parameters are read, write, @@ -1486,7 +1489,6 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) /* Save key and value to temporary local versions. */ if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup; /* Run the cursor operation on the brt. */ - TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; r_cursor_op = r = toku_brt_cursor_get(c->i->c, &g.tmp_key, &g.tmp_val, g.flag, txn); /* Only DB_CURRENT should possibly retun DB_KEYEMPTY, * and DB_CURRENT requires no locking. */ @@ -1528,8 +1530,11 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { HANDLE_PANICKED_DB(db); if (toku_c_uninitialized(c)) return EINVAL; + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; + int r; - if (db->i->lt) { + if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) { DBT saved_key; DBT saved_data; r = toku_c_get_current_unconditional(c, &saved_key, &saved_data); @@ -1606,7 +1611,7 @@ delete_silently_and_retry: /* Save the inputs. */ if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup; - if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, flag))) goto cleanup; + if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, g.flag))) goto cleanup; r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0); if (r==DB_NOTFOUND) goto delete_silently_and_retry; @@ -1673,9 +1678,11 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT key,val; memset(&key, 0, sizeof(key)); memset(&val, 0, sizeof(val)); - flag &= ~DB_PRELOCKED; // Get rid of the prelock flag, because c_get doesn't know about it. + + u_int32_t lock_flags = get_prelocked_flags(flag); + flag &= ~lock_flags; assert(flag==0); - int r = toku_c_get_noassociate(c, &key, &val, DB_NEXT | flag); + int r = toku_c_get_noassociate(c, &key, &val, DB_NEXT | flag | lock_flags); if (r==0) f(&key, &val, extra); return r; } @@ -1683,8 +1690,9 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); //int prelocked = flag & DB_PRELOCKED; - flag &= ~DB_PRELOCKED; // Get rid of the prelock flag, because c_get doesn't know about it. if (toku_c_uninitialized(c)) return toku_c_getf_next_old(c, flag, f, extra); //return toku_c_getf_first(c, flag, f, extra); + u_int32_t lock_flags = get_prelocked_flags(flag); + flag &= ~lock_flags; assert(flag==0); TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; DBT key,val; @@ -1693,35 +1701,42 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT DBT prevkey,prevval; memset(&prevkey, 0, sizeof(key)); prevkey.flags = DB_DBT_MALLOC; memset(&prevval, 0, sizeof(val)); prevval.flags = DB_DBT_MALLOC; - int r = brt_cursor_save_key_val(c->i->c, &prevkey, &prevval); - if (r!=0) goto cleanup; + int r; DB *db=c->dbp; + toku_lock_tree* lt = db->i->lt; + BOOL do_locking = lt!=NULL && !lock_flags; + if (do_locking) { + r = brt_cursor_save_key_val(c->i->c, &prevkey, &prevval); + if (r!=0) goto cleanup; + } + unsigned int brtflags; toku_brt_get_flags(db->i->brt, &brtflags); int c_get_result = toku_brt_cursor_get(c->i->c, &key, &val, - (brtflags & TOKU_DB_DUPSORT) ? DB_NEXT : DB_NEXT_NODUP, - txn); + (brtflags & TOKU_DB_DUPSORT) ? DB_NEXT : DB_NEXT_NODUP, + txn); if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } int found = c_get_result==0; - toku_lock_tree* lt = db->i->lt; - DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); - r = toku_txn_add_lt(txn_anc, lt); - if (r!=0) goto cleanup; - r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), - &prevkey, &prevval, - found ? &key : toku_lt_infinity, - found ? &val : toku_lt_infinity); - if (r!=0) goto cleanup; + if (do_locking) { + DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); + r = toku_txn_add_lt(txn_anc, lt); + if (r!=0) goto cleanup; + r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), + &prevkey, &prevval, + found ? &key : toku_lt_infinity, + found ? &val : toku_lt_infinity); + if (r!=0) goto cleanup; + } if (found) { f(&key, &val, extra); } r = c_get_result; cleanup: - toku_free(prevkey.data); - toku_free(prevval.data); + if (prevkey.data) toku_free(prevkey.data); + if (prevval.data) toku_free(prevval.data); return r; } @@ -1781,31 +1796,35 @@ finish: static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { int r; + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; if (flags!=0 && flags!=DB_GET_BOTH) return EINVAL; DBC *dbc; r = toku_db_cursor(db, txn, &dbc, 0, 1); if (r!=0) return r; - r = toku_c_get_noassociate(dbc, key, data, - (flags == 0) ? DB_SET : DB_GET_BOTH); + u_int32_t c_get_flags = (flags == 0) ? DB_SET : DB_GET_BOTH; + r = toku_c_get_noassociate(dbc, key, data, c_get_flags | lock_flags); int r2 = toku_c_close(dbc); return r ? r : r2; } static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { int r; + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; if (flags!=0 && flags!=DB_DELETE_ANY) return EINVAL; //DB_DELETE_ANY supresses the BDB DB->del return value indicating that the key was not found prior to the delete if (!(flags & DB_DELETE_ANY)) { DBT search_val; memset(&search_val, 0, sizeof search_val); search_val.flags = DB_DBT_MALLOC; - r = toku_db_get_noassociate(db, txn, key, &search_val, 0); + r = toku_db_get_noassociate(db, txn, key, &search_val, lock_flags); if (r != 0) return r; toku_free(search_val.data); } //Do the actual deleting. - if (db->i->lt) { + if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) { DB_TXN* txn_anc = toku_txn_ancestor(txn); r = toku_txn_add_lt(txn_anc, db->i->lt); if (r!=0) { return r; } @@ -2156,13 +2175,16 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data)) return EINVAL; + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; if (flags != 0 && flags != DB_GET_BOTH) return EINVAL; // We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW DBC *dbc; r = toku_db_cursor(db, txn, &dbc, 0, 1); if (r!=0) return r; - r = toku_c_get(dbc, key, data, (flags == 0) ? DB_SET : DB_GET_BOTH); + u_int32_t c_get_flags = (flags == 0) ? DB_SET : DB_GET_BOTH; + r = toku_c_get(dbc, key, data, c_get_flags | lock_flags); int r2 = toku_c_close(dbc); return r ? r : r2; } @@ -2434,6 +2456,8 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, if (key->size >= limit || data->size >= limit) return toku_ydb_do_error(db->dbenv, EINVAL, "The largest key or data item allowed is %d bytes", limit); } + u_int32_t lock_flags = get_prelocked_flags(flags); + flags &= ~lock_flags; if (flags == DB_YESOVERWRITE) { /* tokudb does insert or replace */ @@ -2443,7 +2467,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, DBT testfordata; memset(&testfordata, 0, sizeof(testfordata)); testfordata.flags = DB_DBT_MALLOC; - r = toku_db_get_noassociate(db, txn, key, &testfordata, 0); + r = toku_db_get_noassociate(db, txn, key, &testfordata, lock_flags); if (r == 0) { if (testfordata.data) toku_free(testfordata.data); return DB_KEYEXIST; @@ -2456,7 +2480,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, assert(flags == 0); if (brtflags & TOKU_DB_DUPSORT) { #if TDB_EQ_BDB - r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH); + r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH | lock_flags); if (r == 0) return DB_KEYEXIST; if (r != DB_NOTFOUND) return r; @@ -2465,7 +2489,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, #endif } } - if (db->i->lt) { + if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) { DB_TXN* txn_anc = toku_txn_ancestor(txn); r = toku_txn_add_lt(txn_anc, db->i->lt); if (r!=0) { return r; }