bradley fixed the child transaction commit with a rollback log in an external file. closes #730

git-svn-id: file:///svn/tokudb@3566 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Rich Prohaska 2008-04-22 17:09:24 +00:00
parent 23efff4b4e
commit cc0fc35141
4 changed files with 97 additions and 20 deletions

View file

@ -367,7 +367,7 @@ static void cleanup_txn (TOKUTXN txn) {
return;
}
static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
if (r!=0) return r;
@ -379,7 +379,7 @@ static int commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
return 0;
}
static int abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r;
@ -399,7 +399,28 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
cleanup_txn(txn);
return r;
} else if (txn->parent!=0) {
// Append the list to the front.
// First we must put a rollinclude entry into the parent if we have a rollentry file.
if (txn->rollentry_filename) {
int len = strlen(txn->rollentry_filename);
// Don't have to strdup the rollentry_filename because
// we take ownership of it.
BYTESTRING fname = {len, txn->rollentry_filename};
r = toku_logger_save_rollback_rollinclude(txn->parent, fname);
if (r!=0) { cleanup_txn(txn); return r; }
r = close(txn->rollentry_fd);
if (r!=0) {
// We have to do the unlink ourselves, and then
// set txn->rollentry_filename=0 so that the cleanup
// won't try to close the fd again.
unlink(txn->rollentry_filename);
txn->rollentry_filename = 0;
cleanup_txn(txn);
return r;
}
// Stop the cleanup from closing and unlinking the file.
txn->rollentry_filename = 0;
}
// Append the list to the front of the parent.
if (txn->oldest_logentry) {
// There are some entries, so link them in.
txn->oldest_logentry->prev = txn->parent->newest_logentry;
@ -409,7 +430,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->parent->oldest_logentry = txn->oldest_logentry;
}
txn->newest_logentry = txn->oldest_logentry = 0;
assert(txn->rollentry_filename==0); // This code isn't ready for this case. ??? When committing a child, we have to get the child's records into the parent.
} else {
// do the commit calls and free everything
// we do the commit calls in reverse order too.
@ -418,20 +439,14 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
//printf("%s:%d abort\n", __FILE__, __LINE__);
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
r = commit_rollback_item(txn, item);
r = toku_commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
// Read stuff out of the file and execute it.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r = commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn);
}
}
cleanup_txn(txn);
@ -761,20 +776,15 @@ int toku_logger_abort(TOKUTXN txn) {
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
int r=abort_rollback_item(txn, item);
int r = toku_abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
}
list_remove(&txn->live_txns_link);
// Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
while (txn->rollentry_filesize>0) {
struct roll_entry *item;
int r = toku_read_rollback_backwards(txn->rollentry_fd, txn->rollentry_filesize, &item, &txn->rollentry_filesize);
if (r!=0) { cleanup_txn(txn); return r; }
r=abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
}
int r = toku_rollback_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn);
assert(r==0);
}
cleanup_txn(txn);
return 0;

View file

@ -159,4 +159,10 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_maybe_spill_rollbacks (TOKUTXN txn);
struct roll_entry;
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn);
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
#endif

View file

@ -56,6 +56,8 @@ const struct logtype rollbacks[] = {
{"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
{"rollinclude", 'I', FA{{"BYTESTRING", "fname", 0},
NULLFIELD}},
// {"fclose", 'c', FA{{"FILENUM", "filenum", 0},
// {"BYTESTRING", "fname", 0},
// NULLFIELD}},

View file

@ -5,6 +5,7 @@
#include <stdlib.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <unistd.h>
#include "log_header.h"
@ -96,3 +97,61 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
if (r!=0) return r;
return toku_cachefile_close(&cf, toku_txn_logger(txn));
}
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
while (filesize>0) {
int r;
struct roll_entry *item;
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
if (r!=0) { return r; }
r = toku_commit_rollback_item(txn, item);
if (r!=0) { return r; }
}
return 0;
}
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) {
while (filesize>0) {
int r;
struct roll_entry *item;
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
if (r!=0) { return r; }
r = toku_abort_rollback_item(txn, item);
if (r!=0) { return r; }
}
return 0;
}
int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY);
assert(fd>=0);
struct stat statbuf;
r = fstat(fd, &statbuf);
assert(r==0);
r = toku_commit_fileentries(fd, statbuf.st_size, txn);
assert(r==0);
r = close(fd);
assert(r==0);
unlink(fname);
free(fname);
return 0;
}
int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY);
assert(fd>=0);
struct stat statbuf;
r = fstat(fd, &statbuf);
assert(r==0);
r = toku_rollback_fileentries(fd, statbuf.st_size, txn);
assert(r==0);
r = close(fd);
assert(r==0);
unlink(fname);
free(fname);
return 0;
}