[t:2350] Added DB_NOOVERWRITE_NO_ERROR flag for DB->put.

Passing this flag to ENV->put_multiple returns EINVAL.

Has the same semantics as INSERT IGNORE in MySQL.
(Equivalently) it has the same semantics as DB_NOOVERWRITE, except that it returns 0
instead of DB_KEYEXIST when the key already exists.

git-svn-id: file:///svn/toku/tokudb@17682 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Yoni Fogel 2013-04-16 23:59:00 -04:00
parent 9a3d6c5dc9
commit adf3e7ae95
16 changed files with 117 additions and 26 deletions

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 4
#define DB_KEYFIRST 16
#define DB_KEYLAST 17
#define DB_NODUPDATA 22
#define DB_NOOVERWRITE 23
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 22
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 8388608
#define DB_INIT_LOCK 4096

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 4
#define DB_KEYFIRST 15
#define DB_KEYLAST 16
#define DB_NODUPDATA 21
#define DB_NOOVERWRITE 22
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 21
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 16777216
#define DB_INIT_LOCK 8192

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 32768
#define DB_KEYFIRST 15
#define DB_KEYLAST 16
#define DB_NODUPDATA 21
#define DB_NOOVERWRITE 22
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 21
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 16777216
#define DB_INIT_LOCK 16384

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 65536
#define DB_KEYFIRST 13
#define DB_KEYLAST 14
#define DB_NODUPDATA 19
#define DB_NOOVERWRITE 20
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 19
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 33554432
#define DB_INIT_LOCK 32768

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 65536
#define DB_KEYFIRST 13
#define DB_KEYLAST 14
#define DB_NODUPDATA 19
#define DB_NOOVERWRITE 20
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 19
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 33554432
#define DB_INIT_LOCK 131072

View file

@ -43,6 +43,21 @@ void print_db_notices (void) {
printf("#define %s %d\n", #name, bit); \
} while (0)
/* Emit "#define <name> <value>" for a flag whose value is already fixed, and mark
 * that value as taken in flags[] so it cannot be handed out a second time.
 * flags: byte table indexed by flag value; must have at least 256 entries.
 * name:  flag constant; its value must be in [0, 255] and not yet marked. */
#define dodefine_track_enum(flags, name) do { \
    assert((unsigned)(name) < 256); /* stay inside the tracking table */ \
    assert(!((flags)[name]));       /* each value may be claimed only once */ \
    (flags)[name] = 1; \
    printf("#define %s %d\n", #name, name); \
} while (0)
/* Emit "#define <name> <value>" where <value> is the lowest value in [1, 255]
 * not yet marked in flags[], then mark it as taken.
 * flags: byte table indexed by flag value; must have at least 256 entries.
 * name:  identifier to define (only its spelling is used).
 * Asserts if every value in [1, 255] is already taken. */
#define dodefine_from_track_enum(flags, name) do { \
    uint32_t which; \
    /* don't use 0 */ \
    for (which = 1; which < 256; which++) { \
        if (!((flags)[which])) break; \
    } \
    assert(which < 256); \
    (flags)[which] = 1; \
    /* which is unsigned: print it with %u rather than %d */ \
    printf("#define %s %u\n", #name, (unsigned)which); \
} while (0)
enum {
TOKUDB_OUT_OF_LOCKS = -100000,
TOKUDB_SUCCEEDED_EARLY = -100001,
@ -82,9 +97,13 @@ void print_defines (void) {
dodefine(DB_KEYFIRST);
dodefine(DB_KEYLAST);
dodefine(DB_NODUPDATA);
dodefine(DB_NOOVERWRITE);
printf("#define DB_YESOVERWRITE 254\n"); // tokudb
{
static uint8_t insert_flags[256];
dodefine_track_enum(insert_flags, DB_NOOVERWRITE);
dodefine_track_enum(insert_flags, DB_NODUPDATA);
dodefine_from_track_enum(insert_flags, DB_YESOVERWRITE);
dodefine_from_track_enum(insert_flags, DB_NOOVERWRITE_NO_ERROR);
}
dodefine(DB_OPFLAGS_MASK);
dodefine(DB_AUTO_COMMIT);

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 65536
#define DB_KEYFIRST 13
#define DB_KEYLAST 14
#define DB_NODUPDATA 19
#define DB_NOOVERWRITE 20
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 19
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 33554432
#define DB_INIT_LOCK 131072

View file

@ -132,9 +132,10 @@ typedef enum {
#define DB_DUPSORT 65536
#define DB_KEYFIRST 13
#define DB_KEYLAST 14
#define DB_NODUPDATA 19
#define DB_NOOVERWRITE 20
#define DB_YESOVERWRITE 254
#define DB_NODUPDATA 19
#define DB_YESOVERWRITE 1
#define DB_NOOVERWRITE_NO_ERROR 2
#define DB_OPFLAGS_MASK 255
#define DB_AUTO_COMMIT 33554432
#define DB_INIT_LOCK 131072

View file

@ -1233,6 +1233,7 @@ should_compare_both_keys (BRTNODE node, BRT_MSG cmd)
// Effect: Return nonzero if we need to compare both the key and the value.
{
switch (cmd->type) {
case BRT_INSERT_NO_OVERWRITE:
case BRT_INSERT:
return node->flags & TOKU_DB_DUPSORT;
case BRT_DELETE_BOTH:
@ -1520,6 +1521,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
node->u.l.seqinsert = 0;
switch (cmd->type) {
case BRT_INSERT_NO_OVERWRITE:
case BRT_INSERT:
if (doing_seqinsert) {
idx = toku_omt_size(node->u.l.buffer);
@ -1909,6 +1911,7 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
//TODO: Accessing type directly
switch (cmd->type) {
case BRT_INSERT_NO_OVERWRITE:
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
@ -2629,7 +2632,7 @@ toku_brt_broadcast_commit_all (BRT brt)
// Effect: Insert the key-val pair into brt.
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_insert(brt, key, val, txn, FALSE, ZERO_LSN, TRUE);
return toku_brt_maybe_insert(brt, key, val, txn, FALSE, ZERO_LSN, TRUE, BRT_INSERT);
}
static void
@ -2660,7 +2663,8 @@ toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, co
return r;
}
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging) {
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging, enum brt_msg_type type) {
assert(type==BRT_INSERT || type==BRT_INSERT_NO_OVERWRITE);
int r = 0;
XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn);
@ -2688,7 +2692,12 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_
if (do_logging && logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
if (type == BRT_INSERT) {
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
}
else {
r = toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
}
if (r!=0) return r;
}
@ -2696,7 +2705,7 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_
if (oplsn_valid && oplsn.lsn <= treelsn.lsn) {
r = 0;
} else {
BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={key,val}};
BRT_MSG_S brtcmd = { type, message_xids, .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd);
}
return r;

View file

@ -61,7 +61,7 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging);
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging, enum brt_msg_type type);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);

View file

@ -112,6 +112,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
switch ((enum brt_msg_type)typ) {
case BRT_NONE: printf("NONE"); goto ok;
case BRT_INSERT: printf("INSERT"); goto ok;
case BRT_INSERT_NO_OVERWRITE: printf("INSERT_NO_OVERWRITE"); goto ok;
case BRT_DELETE_ANY: printf("DELETE_ANY"); goto ok;
case BRT_DELETE_BOTH: printf("DELETE_BOTH"); goto ok;
case BRT_ABORT_ANY: printf("ABORT_ANY"); goto ok;

View file

@ -85,6 +85,7 @@ enum brt_msg_type {
BRT_COMMIT_BROADCAST_ALL = 8, // Broadcast to all leafentries, (commit all transactions).
BRT_COMMIT_BROADCAST_TXN = 9, // Broadcast to all leafentries, (commit specific transaction).
BRT_ABORT_BROADCAST_TXN = 10, // Broadcast to all leafentries, (commit specific transaction).
BRT_INSERT_NO_OVERWRITE = 11,
};
typedef struct xids_t *XIDS;

View file

@ -139,6 +139,11 @@ const struct logtype logtypes[] = {
{"tablelock_on_empty_table", 'L', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}},
{"enq_insert_no_overwrite", 'i', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},

View file

@ -470,7 +470,7 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn, FALSE);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn, FALSE, BRT_INSERT);
assert(r == 0);
return 0;
@ -481,6 +481,36 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
return 0;
}
// Recovery handler for an enq_insert_no_overwrite log entry.
// Looks up the transaction and the brt for the entry's filenum, then replays the
// key/value pair through toku_brt_maybe_insert as a BRT_INSERT_NO_OVERWRITE message
// (oplsn_valid=TRUE with the entry's lsn, do_logging=FALSE).
// Always returns 0; unexpected failures trip asserts.
static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) {
    TOKUTXN ttxn = NULL;
    int rc = toku_txnid2txn(renv->logger, l->xid, &ttxn);
    assert(rc == 0);
    if (ttxn == NULL) {
        // Straddle txn: this cannot happen after checkpoint begin.
        assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN);
        return 0;
    }

    struct file_map_tuple *entry = NULL;
    rc = file_map_find(&renv->fmap, l->filenum, &entry);
    if (rc == 0) {
        // A cachefile is mapped for this filenum, so replay the insert.
        DBT key, val;
        toku_fill_dbt(&key, l->key.data, l->key.len);
        toku_fill_dbt(&val, l->value.data, l->value.len);
        rc = toku_brt_maybe_insert(entry->brt, &key, &val, ttxn, TRUE, l->lsn, FALSE, BRT_INSERT_NO_OVERWRITE);
        assert(rc == 0);
    }
    // If no cachefile was found there is nothing to do.
    return 0;
}
// recover_backward handler for enq_insert_no_overwrite log entries.
// Both arguments are unused (marked UU); the function does no work and returns 0.
static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) {
// nothing to do for this entry type
return 0;
}
static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
@ -521,7 +551,7 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple
DB *db = tuple->brt->db;
r = renv->generate_row_for_put(db, src_db, &dest_key, &dest_val, &src_key, &src_val, NULL);
assert(r==0);
r = toku_brt_maybe_insert(tuple->brt, &dest_key, &dest_val, txn, TRUE, l->lsn, FALSE);
r = toku_brt_maybe_insert(tuple->brt, &dest_key, &dest_val, txn, TRUE, l->lsn, FALSE, BRT_INSERT);
assert(r == 0);
//flags==0 indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.

View file

@ -154,6 +154,13 @@ msg_modify_ule(ULE ule, BRT_MSG msg) {
ule_do_implicit_promotions(ule, xids);
enum brt_msg_type type = brt_msg_get_type(msg);
switch (type) {
case BRT_INSERT_NO_OVERWRITE: {
UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
//If something exists, quit (no overwrite).
if (uxr_is_insert(old_innermost_uxr)) break;
//else it is just an insert, so
//fall through to BRT_INSERT on purpose.
}
case BRT_INSERT: ;
u_int32_t vallen = brt_msg_get_vallen(msg);
void * valp = brt_msg_get_val(msg);

View file

@ -4259,6 +4259,9 @@ db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key, DBT *UU(val),
r = toku_ydb_do_error(db->dbenv, EINVAL, "Tokudb requires that db->put specify DB_YESOVERWRITE or DB_NOOVERWRITE on DB_DUPSORT databases");
}
}
else if (overwrite_flag==DB_NOOVERWRITE_NO_ERROR) {
r = 0;
}
else {
//Other flags are not (yet) supported.
r = EINVAL;
@ -4293,7 +4296,11 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
}
if (r==0) {
//Insert into the brt.
r = toku_brt_insert(db->i->brt, key, val, txn ? db_txn_struct_i(txn)->tokutxn : 0);
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
enum brt_msg_type type = BRT_INSERT;
if (flags==DB_NOOVERWRITE_NO_ERROR)
type = BRT_INSERT_NO_OVERWRITE;
r = toku_brt_maybe_insert(db->i->brt, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type);
}
return r;
}
@ -4326,6 +4333,11 @@ env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
&keys[which_db], &vals[which_db],
lock_flags[which_db], remaining_flags[which_db]);
if (r!=0) goto cleanup;
if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
//put_multiple does not support delaying the no error, since we would
//have to log the flag in the put_multiple.
r = EINVAL; goto cleanup;
}
//Do locking if necessary.
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
@ -4346,7 +4358,7 @@ env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = db_array[which_db];
num_inserts++;
r = toku_brt_maybe_insert(db->i->brt, &keys[which_db], &vals[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
r = toku_brt_maybe_insert(db->i->brt, &keys[which_db], &vals[which_db], ttxn, FALSE, ZERO_LSN, FALSE, BRT_INSERT);
if (r!=0) goto cleanup;
}