diff --git a/newbrt/log.c b/newbrt/log.c index d194c918738..05a71287d10 100644 --- a/newbrt/log.c +++ b/newbrt/log.c @@ -367,7 +367,7 @@ static void cleanup_txn (TOKUTXN txn) { return; } -static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) { +int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item) { int r=0; rolltype_dispatch_assign(item, toku_commit_, r, txn); if (r!=0) return r; @@ -379,7 +379,7 @@ static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) { return 0; } -static int abort_rollback_item (TOKUTXN txn, struct roll_entry *item) { +int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item) { int r=0; rolltype_dispatch_assign(item, toku_rollback_, r, txn); if (r!=0) return r; @@ -399,7 +399,28 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { cleanup_txn(txn); return r; } else if (txn->parent!=0) { - // Append the list to the front. + // First we must put a rollinclude entry into the parent if we have a rollentry file. + if (txn->rollentry_filename) { + int len = strlen(txn->rollentry_filename); + // Don't have to strdup the rollentry_filename because + // we take ownership of it. + BYTESTRING fname = {len, txn->rollentry_filename}; + r = toku_logger_save_rollback_rollinclude(txn->parent, fname); + if (r!=0) { cleanup_txn(txn); return r; } + r = close(txn->rollentry_fd); + if (r!=0) { + // We have to do the unlink ourselves, and then + // set txn->rollentry_filename=0 so that the cleanup + // won't try to close the fd again. + unlink(txn->rollentry_filename); + txn->rollentry_filename = 0; + cleanup_txn(txn); + return r; + } + // Stop the cleanup from closing and unlinking the file. + txn->rollentry_filename = 0; + } + // Append the list to the front of the parent. if (txn->oldest_logentry) { // There are some entries, so link them in. txn->oldest_logentry->prev = txn->parent->newest_logentry; @@ -409,7 +430,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { txn->parent->oldest_logentry = txn->oldest_logentry; } txn->newest_logentry = txn->oldest_logentry = 0; - assert(txn->rollentry_filename==0); // This code isn't ready for this case. ??? When committing a child, we have to get the child's records into the parent. + } else { // do the commit calls and free everything // we do the commit calls in reverse order too. @@ -418,20 +439,14 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { //printf("%s:%d abort\n", __FILE__, __LINE__); while ((item=txn->newest_logentry)) { txn->newest_logentry = item->prev; - r = commit_rollback_item(txn, item); + r = toku_commit_rollback_item(txn, item); if (r!=0) { cleanup_txn(txn); return r; } } } // Read stuff out of the file and execute it. if (txn->rollentry_filename) { - while (txn->rollentry_filesize>0) { - struct roll_entry *item; - r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize); - if (r!=0) { cleanup_txn(txn); return r; } - r = commit_rollback_item(txn, item); - if (r!=0) { cleanup_txn(txn); return r; } - } + r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn); } } cleanup_txn(txn); @@ -761,20 +776,15 @@ int toku_logger_abort(TOKUTXN txn) { struct roll_entry *item; while ((item=txn->newest_logentry)) { txn->newest_logentry = item->prev; - int r=abort_rollback_item(txn, item); + int r = toku_abort_rollback_item(txn, item); if (r!=0) { cleanup_txn(txn); return r; } } } list_remove(&txn->live_txns_link); // Read stuff out of the file and roll it back. if (txn->rollentry_filename) { - while (txn->rollentry_filesize>0) { - struct roll_entry *item; - int r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize); - if (r!=0) { cleanup_txn(txn); return r; } - r=abort_rollback_item(txn, item); - if (r!=0) { cleanup_txn(txn); return r; } - } + int r = toku_rollback_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn); + assert(r==0); } cleanup_txn(txn); return 0; diff --git a/newbrt/log.h b/newbrt/log.h index e2ebbbfb4fc..3a042865029 100644 --- a/newbrt/log.h +++ b/newbrt/log.h @@ -159,4 +159,10 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags); int toku_maybe_spill_rollbacks (TOKUTXN txn); +struct roll_entry; +int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn); +int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn); +int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item); +int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item); + #endif diff --git a/newbrt/logformat.c b/newbrt/logformat.c index 2ace2c3c894..55c6b1a62b9 100644 --- a/newbrt/logformat.c +++ b/newbrt/logformat.c @@ -56,6 +56,8 @@ const struct logtype rollbacks[] = { {"FILENUM", "filenum", 0}, {"BYTESTRING", "key", 0}, NULLFIELD}}, + {"rollinclude", 'I', FA{{"BYTESTRING", "fname", 0}, + NULLFIELD}}, // {"fclose", 'c', FA{{"FILENUM", "filenum", 0}, // {"BYTESTRING", "fname", 0}, // NULLFIELD}}, diff --git a/newbrt/roll.c b/newbrt/roll.c index 5fdc86d229d..c4157542d4b 100644 --- a/newbrt/roll.c +++ b/newbrt/roll.c @@ -5,6 +5,7 @@ #include #include +#include #include #include "log_header.h" @@ -96,3 +97,61 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN if (r!=0) return r; return toku_cachefile_close(&cf, toku_txn_logger(txn)); } + +int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) { + while (filesize>0) { + int r; + struct roll_entry *item; + r = toku_read_rollback_backwards(fd, filesize, &item, &filesize); + if (r!=0) { return r; } + r = toku_commit_rollback_item(txn, item); + if (r!=0) { return r; } + } + return 0; +} + +int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) { + while (filesize>0) { + int r; + struct roll_entry *item; + r = toku_read_rollback_backwards(fd, filesize, &item, &filesize); + if (r!=0) { return r; } + r = toku_abort_rollback_item(txn, item); + if (r!=0) { return r; } + } + return 0; +} + +int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) { + int r; + char *fname = fixup_fname(&bs); + int fd = open(fname, O_RDONLY); + assert(fd>=0); + struct stat statbuf; + r = fstat(fd, &statbuf); + assert(r==0); + r = toku_commit_fileentries(fd, statbuf.st_size, txn); + assert(r==0); + r = close(fd); + assert(r==0); + unlink(fname); + free(fname); + return 0; +} + +int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) { + int r; + char *fname = fixup_fname(&bs); + int fd = open(fname, O_RDONLY); + assert(fd>=0); + struct stat statbuf; + r = fstat(fd, &statbuf); + assert(r==0); + r = toku_rollback_fileentries(fd, statbuf.st_size, txn); + assert(r==0); + r = close(fd); + assert(r==0); + unlink(fname); + free(fname); + return 0; +}