From cb204011d668a1c3e9a31607f362de5423def409 Mon Sep 17 00:00:00 2001
From: "Bradley C. Kuszmaul" <bradley@tokutek.com>
Date: Wed, 27 Feb 2008 09:05:58 +0000
Subject: [PATCH] Improvements to rollback.  Addresses #27.

git-svn-id: file:///svn/tokudb@2434 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/brt.c       | 11 ---------
 newbrt/logformat.c | 19 +++++++---------
 newbrt/pma.c       | 57 +++++++++++++++++++++++++++++-----------------
 newbrt/roll.c      | 20 ++--------------
 4 files changed, 46 insertions(+), 61 deletions(-)

diff --git a/newbrt/brt.c b/newbrt/brt.c
index ab898a0973a..58f8d157fa9 100644
--- a/newbrt/brt.c
+++ b/newbrt/brt.c
@@ -1715,12 +1715,6 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
     int r;
     BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
 
-    {
-	const BYTESTRING insertedkey  =  { key->size, toku_memdup(key->data, key->size) };
-	const BYTESTRING inserteddata =  { val->size, toku_memdup(val->data, val->size) };
-	r = toku_logger_save_rollback_insert(txn, toku_cachefile_filenum(brt->cf), insertedkey, inserteddata);
-	if (r!=0) return r;
-    }
     r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
     return r;
 }
@@ -1745,11 +1739,6 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
     DBT val;
     printf("removing\n");
     BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
-    {
-	const BYTESTRING deletedkey  =  { key->size, toku_memdup(key->data, key->size) };
-	r = toku_logger_save_rollback_delete(txn, toku_cachefile_filenum(brt->cf), deletedkey);
-	if (r!=0) return r;
-    }
     r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
     return r;
 }
diff --git a/newbrt/logformat.c b/newbrt/logformat.c
index e55554c14cb..de7adb22a31 100644
--- a/newbrt/logformat.c
+++ b/newbrt/logformat.c
@@ -41,17 +41,14 @@ int logformat_version_number = 0;
 const struct logtype rollbacks[] = {
     {"fcreate", 'F', FA{{"BYTESTRING", "fname", 0},
 			NULLFIELD}},
-    {"delete",    'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
-			  {"BYTESTRING", "key", 0},
-			  NULLFIELD}},
-    {"deleteboth", 'D', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
-			   {"BYTESTRING", "key", 0},
-			   {"BYTESTRING", "data", 0},
-			   NULLFIELD}},
-    {"insert",    'i', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
-			  {"BYTESTRING", "key", 0},
-			  {"BYTESTRING", "data", 0},
-			  NULLFIELD}},
+    {"deleteatleaf", 'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.   The delete takes place in a leaf.
+			     {"BYTESTRING", "key", 0},
+			     {"BYTESTRING", "data", 0},
+			     NULLFIELD}},
+    {"insertatleaf", 'i', FA{{"FILENUM", "filenum", 0}, // Note an insert for rollback.   The insert takes place in a leaf.
+			     {"BYTESTRING", "key", 0},
+			     {"BYTESTRING", "data", 0},
+			     NULLFIELD}},
     {0,0,FA{NULLFIELD}}
 };
 
diff --git a/newbrt/pma.c b/newbrt/pma.c
index 88e3555f662..d19d193da89 100644
--- a/newbrt/pma.c
+++ b/newbrt/pma.c
@@ -716,18 +716,27 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
     pma->n_pairs_present++;
     *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); 
 
-    const struct kv_pair *pair = pma->pairs[idx];
-    const BYTESTRING key  = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
-    const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
-    TOKUTXN txn;
-    int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
-    if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
-    if (r!=0) goto freeit;
-    if (0!=toku_txnid2txn(logger, xid, &txn)) goto freeit;
+    struct kv_pair *pair = pma->pairs[idx];
+    {
+	TOKUTXN txn;
+	int r;
+	if ((r=toku_txnid2txn(logger, xid, &txn))) return r;
+	const BYTESTRING key  = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
+	const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
+	if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) {
+	     toku_free(key.data); toku_free(data.data);
+	     return r;
+	}
+    }
+    {
+	const BYTESTRING key  = { pair->keylen, kv_pair_key(pair) };
+	const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
+	int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
+	if (r!=0) return r;
+	if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
+    }
 
-    /* Don't save rollback info, instead we'll reinsert the command at the root, if the insert fails. */
-    if (0) { freeit: toku_free(key.data); toku_free(data.data); }
-    return r;
+    return 0;
 }    
 
 static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
@@ -866,15 +875,12 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
 		if (r!=0) return r;
 	    }
 	    if (logger) {
-		const BYTESTRING deletedkey  = { kv->keylen, toku_memdup(kv_pair_key(kv), kv->keylen) };
-		const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
 		TOKUTXN txn;
 		if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
-		r=toku_logger_save_rollback_deleteboth(txn, pma->filenum, deletedkey, deleteddata);
-		if (r!=0) {
-		    toku_free(deletedkey.data);
-		    toku_free(deleteddata.data);
-		    return r;
+		const BYTESTRING deletedkey  = { kv->keylen, toku_memdup(kv_pair_key(kv), kv->keylen) };
+		const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
+		r=toku_logger_save_rollback_deleteatleaf(txn, pma->filenum, deletedkey, deleteddata);
+		if (r!=0) { toku_free(deletedkey.data); toku_free(deleteddata.data); return r;
 		}
 	    }
 	}
@@ -904,9 +910,18 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
     //printf("%s:%d txn=%p\n", __FILE__, __LINE__, txn);
  logit_and_update_fingerprint:
     {
-	struct kv_pair *pair = pma->pairs[idx];
-	BYTESTRING key  = { pair->keylen, kv_pair_key(pair) };
-	BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
+	TOKUTXN txn;
+	if ((r=toku_txnid2txn(logger, xid, &txn))) return r;
+	const BYTESTRING key  = { k->size, toku_memdup(k->data, k->size) };
+	const BYTESTRING data = { v->size, toku_memdup(v->data, v->size) };
+	if ((r = toku_logger_save_rollback_insertatleaf(txn, pma->filenum, key, data))) {
+	     toku_free(key.data); toku_free(data.data);
+	     return r;
+	}
+    }
+    {
+	const BYTESTRING key  = { k->size, k->data };
+	const BYTESTRING data = { v->size, v->data }; // value bytes must come from v to match v->size (k->data here would log the key as the data)
 	r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
 	if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
 	if (r!=0) return r;
diff --git a/newbrt/roll.c b/newbrt/roll.c
index 873ff4b840e..92b8e2b28a4 100644
--- a/newbrt/roll.c
+++ b/newbrt/roll.c
@@ -411,8 +411,7 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
     toku_free_BYTESTRING(databs);
 }
 
-int toku_rollback_deleteboth (FILENUM filenum,
-			      BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
+int toku_rollback_deleteatleaf (FILENUM filenum, BYTESTRING key, BYTESTRING data,TOKUTXN txn) {
     CACHEFILE cf;
     BRT brt;
     int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
@@ -425,22 +424,7 @@ int toku_rollback_deleteboth (FILENUM filenum,
     return r;
 }
 
-int toku_rollback_delete (FILENUM filenum,
-			      BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
-    CACHEFILE cf;
-    BRT brt;
-    int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
-    assert(r==0);
-    DBT key_dbt,data_dbt;
-    r = toku_brt_insert(brt,
-			toku_fill_dbt(&key_dbt, key.data, key.len),
-			toku_fill_dbt(&data_dbt, data.data, data.len),
-			txn);
-    return r;
-}
-
-int toku_rollback_insert (FILENUM filenum,
-			  BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
+int toku_rollback_insertatleaf (FILENUM filenum, BYTESTRING key,BYTESTRING data, TOKUTXN txn) {
     CACHEFILE cf;
     BRT brt;
     int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);