From 1f796663cde9f7c7d4fbebf7845d27adb3f45ccc Mon Sep 17 00:00:00 2001 From: "Bradley C. Kuszmaul" Date: Tue, 16 Apr 2013 23:57:38 -0400 Subject: [PATCH] Check in the code for compressing the rolltmp file. This was a tricky merge. In the 1332a directory I did: {{{ svn merge https://svn.tokutek.com/tokudb/toku/tokudb.1032b+1332@8415 https://svn.tokutek.com/tokudb/toku/tokudb.1032b+1332@8416 }}} Then I was able to resolve the conflicts. Then in the main line I did: {{{ svn merge -r9042:9046 https://svn.tokutek.com/tokudb/toku/tokudb.1332a }}} Fixes #1332. git-svn-id: file:///svn/toku/tokudb@9047 c7de825b-a66e-492c-adef-691d508d4ae1 --- newbrt/bread.c | 54 ++++++++++++++++++++++++------------ newbrt/bread.h | 10 ++++--- newbrt/log-internal.h | 2 +- newbrt/log.c | 33 ++++++++++++++++++---- newbrt/log.h | 2 -- newbrt/roll.c | 17 +++--------- newbrt/tests/bread-test.c | 58 ++++++++++++++++++++------------------- newbrt/tests/test1305.c | 32 +++++++++++++++++++-- 8 files changed, 134 insertions(+), 74 deletions(-) diff --git a/newbrt/bread.c b/newbrt/bread.c index cba444098d0..9b7ac4485d1 100644 --- a/newbrt/bread.c +++ b/newbrt/bread.c @@ -3,20 +3,19 @@ #include "includes.h" struct bread { - toku_off_t current_offset; // The current offset to be read (in the file). That offset includes anything that is unread in the buffer. + int64_t fileoff; // The byte before this offset is the next byte we will read (since we are reading backward) int fd; - size_t bufsize; - char *buf; // A buffer of size bufsize; - int bufoff; // The current offset buf. + int bufoff; // The current offset in the buf. The next byte we will read is buf[bufoff-1] (assuming that bufoff>0). + char *buf; // A buffer with at least bufoff bytes in it. }; -BREAD create_bread_from_fd_initialize_at(int fd, toku_off_t filesize, size_t bufsize) { - BREAD MALLOC(result); - result->current_offset=filesize; +BREAD create_bread_from_fd_initialize_at(int fd) { + BREAD XMALLOC(result); + int r = toku_os_get_file_size(fd, &result->fileoff); + assert(r==0); result->fd=fd; result->bufoff=0; - result->bufsize=bufsize; - MALLOC_N(bufsize, result->buf); + result->buf = 0; return result; } @@ -30,29 +29,48 @@ int close_bread_without_closing_fd(BREAD br) { ssize_t bread_backwards(BREAD br, void *vbuf, size_t nbytes) { char *buf=vbuf; ssize_t result=0; + const int i4 = sizeof(u_int32_t); while (nbytes > 0) { - assert(br->current_offset >= (toku_off_t)nbytes); // read whatever we can out of the buffer. - { + if (br->bufoff>0) { size_t to_copy = ((size_t)br->bufoff >= nbytes) ? nbytes : (size_t)br->bufoff; memcpy(buf+nbytes-to_copy, &br->buf[br->bufoff-to_copy], to_copy); nbytes -= to_copy; - br->current_offset -= to_copy; result += to_copy; br->bufoff -= to_copy; } if (nbytes>0) { assert(br->bufoff==0); - size_t to_read = ((u_int64_t)br->current_offset >= (u_int64_t)br->bufsize) ? br->bufsize : (size_t)br->current_offset; - assert(to_read>0 && to_read<=br->bufsize); - ssize_t r = pread(br->fd, br->buf, to_read, br->current_offset-to_read); - assert(r==(ssize_t)to_read); - br->bufoff = to_read; + u_int32_t compressed_length_n, uncompressed_length_n; + assert(br->fileoff>=i4); // there better be the three lengths plus the compressed data. + { ssize_t r = pread(br->fd, &compressed_length_n, i4, br->fileoff- i4); assert(r==i4); } + u_int32_t compressed_length = ntohl(compressed_length_n); + assert(br->fileoff >= compressed_length + 3*i4); + { ssize_t r = pread(br->fd, &uncompressed_length_n, i4, br->fileoff-2*i4); assert(r==i4); } + u_int32_t uncompressed_length = ntohl(uncompressed_length_n); + char *XMALLOC_N(compressed_length, zbuf); + { + ssize_t r = pread(br->fd, zbuf, compressed_length, br->fileoff- compressed_length -2*i4); + assert(r==(ssize_t)compressed_length); + } + { + u_int32_t compressed_length_n_again; + ssize_t r = pread(br->fd, &compressed_length_n_again, i4, br->fileoff-compressed_length-3*i4); assert(r==i4); + assert(compressed_length_n_again == compressed_length_n); + } + uLongf destlen = uncompressed_length; + XREALLOC_N(uncompressed_length, br->buf); + uncompress((Bytef*)br->buf, &destlen, (Bytef*)zbuf, compressed_length); + assert(destlen==uncompressed_length); + toku_free(zbuf); + + br->bufoff = uncompressed_length; + br->fileoff -= (compressed_length + 3*i4); } } return result; } int bread_has_more(BREAD br) { - return br->current_offset>0; + return (br->fileoff>0) || (br->bufoff>0); } diff --git a/newbrt/bread.h b/newbrt/bread.h index d0a9977b4ce..686a3c6f89e 100644 --- a/newbrt/bread.h +++ b/newbrt/bread.h @@ -4,14 +4,16 @@ // A BREAD reads a file backwards using buffered I/O. BREAD stands for Buffered Read or Backwards Read. // Conceivably, we could read forward too. // The buffered I/O is buffered using a large buffer (e.g., something like a megabyte). -// If not for the large-buffer requirement, we could have used a FILE. +// Furthermore, data is compressed into blocks. Each block is a 4-byte compressed length (in network order), followed by compressed data, followed by a 4-byte uncompressed-length (in network order), followed by a 4-byte compressed length +// The compressed-length appears twice so that the file can be read backward or forward. +// If not for the large-buffer requirement, as well as compression, as well as reading backward, we could have used a FILE. #include typedef struct bread *BREAD; -BREAD create_bread_from_fd_initialize_at(int fd, toku_off_t filesize, size_t bufsize); -// Effect: Given a file descriptor, fd, that points at a file of size filesize, create a BREAD. -// Requires: The filesize must be correct. The fd must be an open fd. +BREAD create_bread_from_fd_initialize_at(int fd); +// Effect: Given a file descriptor, fd, create a BREAD. +// Requires: The fd must be an open fd. int close_bread_without_closing_fd(BREAD); // Effect: Close the BREAD, but don't close the underlying fd. diff --git a/newbrt/log-internal.h b/newbrt/log-internal.h index fa9512e4063..a7cb55f02ed 100644 --- a/newbrt/log-internal.h +++ b/newbrt/log-internal.h @@ -100,7 +100,7 @@ struct tokutxn { size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory. char *rollentry_filename; int rollentry_fd; // If we spill the roll_entries, we write them into this fd. - toku_off_t rollentry_filesize; // How many bytes are in the rollentry file (compressed) + toku_off_t rollentry_filesize; // How many bytes are in the rollentry file (this is the uncompressed bytes. If the file is compressed it may actually be smaller (or even larger with header information)) u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children. OMT open_brts; // a collection of the brts that we touched. Indexed by filenum. }; diff --git a/newbrt/log.c b/newbrt/log.c index 0235bc5cc07..a6a39c684a8 100644 --- a/newbrt/log.c +++ b/newbrt/log.c @@ -501,7 +501,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void // Read stuff out of the file and execute it. if (txn->rollentry_filename) { - r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn, yield, yieldv); + r = toku_commit_fileentries(txn->rollentry_fd, txn, yield, yieldv); } } } @@ -872,7 +872,7 @@ int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) { list_remove(&txn->live_txns_link); // Read stuff out of the file and roll it back. if (txn->rollentry_filename) { - int r = toku_rollback_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn, yield, yieldv); + int r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv); assert(r==0); } return 0; @@ -1027,9 +1027,32 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn) { txn->rollentry_fd = open(txn->rollentry_filename, O_CREAT+O_RDWR+O_EXCL+O_BINARY, 0600); if (txn->rollentry_fd==-1) return errno; } - ssize_t r = write_it(txn->rollentry_fd, buf, w.ndone); - if (r<0) return r; - assert(r==(ssize_t)w.ndone); + uLongf compressed_len = compressBound(w.ndone); + char *MALLOC_N(compressed_len, compressed_buf); + { + int r = compress2((Bytef*)compressed_buf, &compressed_len, + (Bytef*)buf, w.ndone, + 1); + assert(r==Z_OK); + } + { + u_int32_t v = htonl(compressed_len); + ssize_t r = write_it(txn->rollentry_fd, &v, sizeof(v)); assert(r==sizeof(v)); + } + { + ssize_t r = write_it(txn->rollentry_fd, compressed_buf, compressed_len); + if (r<0) return r; + assert(r==(ssize_t)compressed_len); + } + { + u_int32_t v = htonl(w.ndone); + ssize_t r = write_it(txn->rollentry_fd, &v, sizeof(v)); assert(r==sizeof(v)); + } + { + u_int32_t v = htonl(compressed_len); + ssize_t r = write_it(txn->rollentry_fd, &v, sizeof(v)); assert(r==sizeof(v)); + } + toku_free(compressed_buf); txn->rollentry_filesize+=w.ndone; toku_free(buf); diff --git a/newbrt/log.h b/newbrt/log.h index b0940599899..8247c866512 100644 --- a/newbrt/log.h +++ b/newbrt/log.h @@ -179,12 +179,10 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn); struct roll_entry; int toku_rollback_fileentries (int fd, - toku_off_t filesize, TOKUTXN txn, void (*yield)(void*yieldv), void * yieldv); int toku_commit_fileentries (int fd, - toku_off_t filesize, TOKUTXN txn, void (*yield)(void*yieldv), void * yieldv); diff --git a/newbrt/roll.c b/newbrt/roll.c index 0cc094ab479..1e666013fa9 100644 --- a/newbrt/roll.c +++ b/newbrt/roll.c @@ -203,12 +203,11 @@ toku_rollback_cmddelete (TXNID xid, int toku_commit_fileentries (int fd, - toku_off_t filesize, TOKUTXN txn, YIELDF yield, void * yieldv) { - BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20); + BREAD f = create_bread_from_fd_initialize_at(fd); int r=0; MEMARENA ma = memarena_create(); int count=0; @@ -230,12 +229,11 @@ toku_commit_fileentries (int fd, int toku_rollback_fileentries (int fd, - toku_off_t filesize, TOKUTXN txn, YIELDF yield, void * yieldv) { - BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20); + BREAD f = create_bread_from_fd_initialize_at(fd); assert(f); int r=0; MEMARENA ma = memarena_create(); @@ -265,11 +263,7 @@ toku_commit_rollinclude (BYTESTRING bs, char *fname = fixup_fname(&bs); int fd = open(fname, O_RDONLY+O_BINARY); assert(fd>=0); - - int64_t fsize = 0; - r = toku_os_get_file_size(fd, &fsize); - assert(r==0); - r = toku_commit_fileentries(fd, fsize, txn, yield, yieldv); + r = toku_commit_fileentries(fd, txn, yield, yieldv); assert(r==0); r = close(fd); assert(r==0); @@ -288,10 +282,7 @@ toku_rollback_rollinclude (BYTESTRING bs, char *fname = fixup_fname(&bs); int fd = open(fname, O_RDONLY+O_BINARY); assert(fd>=0); - int64_t fsize = 0; - r = toku_os_get_file_size(fd, &fsize); - assert(r==0); - r = toku_rollback_fileentries(fd, fsize, txn, yield, yieldv); + r = toku_rollback_fileentries(fd, txn, yield, yieldv); assert(r==0); r = close(fd); assert(r==0); diff --git a/newbrt/tests/bread-test.c b/newbrt/tests/bread-test.c index 331a14a32c5..3e098b8981a 100644 --- a/newbrt/tests/bread-test.c +++ b/newbrt/tests/bread-test.c @@ -10,61 +10,63 @@ #include #include #include +#include #include "../brttypes.h" #include "../bread.h" #define FNAME "bread-test.data" -#define RECORDS 2 +#define RECORDS 20 +#define RECORDLEN 100 + +char buf[RECORDS][RECORDLEN]; +int sizes[RECORDS]; +int sizesn[RECORDS]; +int nwrote=0; +char wrotedata[RECORDS*RECORDLEN]; static void test (int seed) { srandom(seed); unlink(FNAME); int i; - char buf[RECORDS][100]; - int sizes[RECORDS]; - int sizesn[RECORDS]; - toku_off_t off = 0; { int fd = open(FNAME, O_CREAT+O_RDWR+O_BINARY, 0777); assert(fd>=0); for (i=0; i=0); // Now read it all backward - BREAD br = create_bread_from_fd_initialize_at(fd, off, 50); + BREAD br = create_bread_from_fd_initialize_at(fd); while (bread_has_more(br)) { - assert(i>0); - i--; - int sizen; - { int r = bread_backwards(br, &sizen, 4); assert(r==4); } - int sizeh=toku_ntohl(sizen); - assert(sizeh==sizes[i]); - assert(0<=sizeh && sizeh<100); - { - char rbuf[100]; - int r = bread_backwards(br, rbuf,sizeh); - assert(r==sizeh); - assert(memcmp(rbuf, &buf[i][0], sizes[i])==0); - } + assert(nwrote>0); + int to_read = 1+(random()%RECORDLEN); // read from 1 to 100 (if RECORDLEN is 100) + if (to_read>nwrote) to_read=nwrote; + char rbuf[to_read]; + int r = bread_backwards(br, rbuf, to_read); + assert(r==to_read); + assert(memcmp(rbuf, &wrotedata[nwrote-to_read], to_read)==0); + nwrote-=to_read; } - assert(i==0); + assert(nwrote==0); + { int r=close_bread_without_closing_fd(br); assert(r==0); } { int r=close(fd); assert(r==0); } unlink(FNAME); diff --git a/newbrt/tests/test1305.c b/newbrt/tests/test1305.c index f792bb35a5b..ffb91a115a8 100644 --- a/newbrt/tests/test1305.c +++ b/newbrt/tests/test1305.c @@ -8,6 +8,7 @@ #include #include #include +#include #include "../brttypes.h" #include "../bread.h" @@ -32,6 +33,8 @@ test (u_int64_t fsize) { int fd = open(FNAME, O_CREAT+O_RDWR+O_BINARY, 0777); assert(fd>=0); static u_int64_t buf[N_BIGINTS]; //windows cannot handle this on the stack + static char compressed_buf[N_BIGINTS*2 + 1000]; // this is more than compressbound returns + uLongf compressed_len; while (i*BIGINT_SIZE < fsize) { if (verbose>0 && i % (1<<25) == 0) { printf(" %s:test (%"PRIu64") forwards [%"PRIu64"%%]\n", __FILE__, fsize, 100*BIGINT_SIZE*((u_int64_t)i) / fsize); @@ -42,8 +45,31 @@ test (u_int64_t fsize) { for (j=0; j=0); - BREAD br = create_bread_from_fd_initialize_at(fd, fsize, READBACK_BUFSIZE); + BREAD br = create_bread_from_fd_initialize_at(fd); while (bread_has_more(br)) { if (verbose>0 && (fsize/BIGINT_SIZE - i) % (1<<25) == 0) { printf(" %s:test (%"PRIu64") backwards [%"PRIu64"%%]\n", __FILE__, fsize, 100*BIGINT_SIZE*((u_int64_t)i) / fsize);