mirror of
https://github.com/MariaDB/server.git
synced 2025-02-02 03:51:50 +01:00
Buffer the reading of the rollback file. Gains 5% on large transactions. Fixes #1002.
git-svn-id: file:///svn/tokudb@5005 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
6a0efbf923
commit
674f22ce5e
9 changed files with 186 additions and 21 deletions
|
@ -50,9 +50,10 @@ BINS= brtdump \
|
|||
build default: bins libs tdb-recover tdb_logprint $(TEST_OFILES)
|
||||
cd tests;$(MAKE) build
|
||||
|
||||
# Put crc first to make it work right
|
||||
# Put crc first to make -combine work right
|
||||
BRT_SOURCES = \
|
||||
crc \
|
||||
bread \
|
||||
brt-serialize \
|
||||
brt-verify \
|
||||
brt \
|
||||
|
@ -88,7 +89,7 @@ tdb_logprint: $(OFILES)
|
|||
recover.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h
|
||||
tdb-recover: $(OFILES)
|
||||
|
||||
roll.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h omt.h
|
||||
roll.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h omt.h bread.h
|
||||
|
||||
log_code.o: log_header.h wbuf.h log-internal.h rbuf.h
|
||||
log_header.h: log_code.c
|
||||
|
|
62
newbrt/bread.c
Normal file
62
newbrt/bread.c
Normal file
|
@ -0,0 +1,62 @@
|
|||
/* Buffered read. */
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include "bread.h"
|
||||
#include "memory.h"
|
||||
|
||||
struct bread {
|
||||
off_t current_offset; // The current offset to be read (in the file). That offset includes anything that is unread in the buffer.
|
||||
int fd;
|
||||
size_t bufsize;
|
||||
char *buf; // A buffer of size bufsize;
|
||||
int bufoff; // The current offset buf.
|
||||
};
|
||||
|
||||
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize) {
|
||||
BREAD MALLOC(result);
|
||||
result->current_offset=filesize;
|
||||
result->fd=fd;
|
||||
result->bufoff=0;
|
||||
result->bufsize=bufsize;
|
||||
MALLOC_N(bufsize, result->buf);
|
||||
return result;
|
||||
}
|
||||
|
||||
int close_bread_without_closing_fd(BREAD br) {
|
||||
toku_free(br->buf);
|
||||
toku_free(br);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
ssize_t bread_backwards(BREAD br, void *vbuf, size_t nbytes) {
|
||||
char *buf=vbuf;
|
||||
ssize_t result=0;
|
||||
while (nbytes > 0) {
|
||||
assert(br->current_offset >= (off_t)nbytes);
|
||||
// read whatever we can out of the buffer.
|
||||
{
|
||||
size_t to_copy = ((size_t)br->bufoff >= nbytes) ? nbytes : (size_t)br->bufoff;
|
||||
memcpy(buf+nbytes-to_copy, &br->buf[br->bufoff-to_copy], to_copy);
|
||||
nbytes -= to_copy;
|
||||
br->current_offset -= to_copy;
|
||||
result += to_copy;
|
||||
br->bufoff -= to_copy;
|
||||
}
|
||||
if (nbytes>0) {
|
||||
assert(br->bufoff==0);
|
||||
size_t to_read = ((size_t)br->current_offset >= br->bufsize) ? br->bufsize : (size_t)br->current_offset;
|
||||
ssize_t r = pread(br->fd, br->buf, to_read, br->current_offset-to_read);
|
||||
assert(r==(ssize_t)to_read);
|
||||
br->bufoff = to_read;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int bread_has_more(BREAD br) {
|
||||
return br->current_offset>0;
|
||||
}
|
25
newbrt/bread.h
Normal file
25
newbrt/bread.h
Normal file
|
@ -0,0 +1,25 @@
|
|||
#ifndef BREAD_H
|
||||
#define BREAD_H
|
||||
|
||||
// A BREAD reads a file backwards using buffered I/O. BREAD stands for Buffered Read or Backwards Read.
|
||||
// Conceivably, we could read forward too.
|
||||
// The buffered I/O is buffered using a large buffer (e.g., something like a megabyte).
|
||||
// If not for the large-buffer requirement, we could have used a FILE.
|
||||
|
||||
#include <sys/types.h>
|
||||
typedef struct bread *BREAD;
|
||||
|
||||
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize);
|
||||
// Effect: Given a file descriptor, fd, that points at a file of size filesize, create a BREAD.
|
||||
// Requires: The filesize must be correct. The fd must be an open fd.
|
||||
|
||||
int close_bread_without_closing_fd(BREAD);
|
||||
// Effect: Close the BREAD, but don't close the underlying fd.
|
||||
|
||||
ssize_t bread_backwards(BREAD, void *buf, size_t nbytes);
|
||||
// Read nbytes into buf, reading backwards.
|
||||
|
||||
int bread_has_more(BREAD);
|
||||
// Is there more to read?
|
||||
|
||||
#endif
|
|
@ -148,4 +148,4 @@ static inline char *fixup_fname(BYTESTRING *f) {
|
|||
return fname;
|
||||
}
|
||||
|
||||
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at);
|
||||
int toku_read_rollback_backwards(BREAD, struct roll_entry **item);
|
||||
|
|
|
@ -985,18 +985,15 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at) {
|
||||
if (at==0) return -1;
|
||||
int toku_read_rollback_backwards(BREAD br, struct roll_entry **item) {
|
||||
u_int32_t nbytes_n; ssize_t sr;
|
||||
if ((sr=pread(fd, &nbytes_n, 4, at-4))!=4) { assert(sr<0); return errno; }
|
||||
if ((sr=bread_backwards(br, &nbytes_n, 4))!=4) { assert(sr<0); return errno; }
|
||||
u_int32_t n_bytes=ntohl(nbytes_n);
|
||||
assert(at>=n_bytes);
|
||||
unsigned char *buf = toku_malloc(n_bytes);
|
||||
if (buf==0) return errno;
|
||||
if ((sr=pread(fd, buf, n_bytes, at-n_bytes))!=(ssize_t)n_bytes) { assert(sr<0); return errno; }
|
||||
if ((sr=bread_backwards(br, buf, n_bytes-4))!=(ssize_t)n_bytes-4) { assert(sr<0); return errno; }
|
||||
int r = toku_parse_rollback(buf, n_bytes, item);
|
||||
if (r!=0) return r;
|
||||
(*new_at) -= n_bytes;
|
||||
toku_free(buf);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "../include/db.h"
|
||||
#include "brttypes.h"
|
||||
#include "memory.h"
|
||||
#include "bread.h"
|
||||
|
||||
struct logbytes;
|
||||
struct logbytes {
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include "log-internal.h"
|
||||
#include "cachetable.h"
|
||||
#include "key.h"
|
||||
#include "bread.h"
|
||||
|
||||
// these flags control whether or not we send commit messages for
|
||||
// various operations
|
||||
|
@ -124,27 +125,34 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
|
|||
}
|
||||
|
||||
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
|
||||
while (filesize>0) {
|
||||
int r;
|
||||
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
|
||||
int r=0;
|
||||
while (bread_has_more(f)) {
|
||||
struct roll_entry *item;
|
||||
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
|
||||
if (r!=0) { return r; }
|
||||
r = toku_read_rollback_backwards(f, &item);
|
||||
if (r!=0) goto finish;
|
||||
r = toku_commit_rollback_item(txn, item);
|
||||
if (r!=0) { return r; }
|
||||
if (r!=0) goto finish;
|
||||
}
|
||||
return 0;
|
||||
finish:
|
||||
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) {
|
||||
while (filesize>0) {
|
||||
int r;
|
||||
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
|
||||
assert(f);
|
||||
int r=0;
|
||||
while (bread_has_more(f)) {
|
||||
struct roll_entry *item;
|
||||
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
|
||||
if (r!=0) { return r; }
|
||||
r = toku_read_rollback_backwards(f, &item);
|
||||
if (r!=0) goto finish;
|
||||
r = toku_abort_rollback_item(txn, item);
|
||||
if (r!=0) { return r; }
|
||||
if (r!=0) goto finish;
|
||||
}
|
||||
return 0;
|
||||
finish:
|
||||
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
|
||||
return r;
|
||||
}
|
||||
|
||||
int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
|
||||
|
|
|
@ -42,6 +42,7 @@ CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE -D_XOPEN_SOURCE=500 -I.
|
|||
# Put these one-per-line so that if we insert a new one the svn diff can understand it better.
|
||||
# Also keep them sorted.
|
||||
REGRESSION_TESTS = \
|
||||
bread-test \
|
||||
brt-serialize-test \
|
||||
brt-test \
|
||||
brt-test-cursor \
|
||||
|
|
70
newbrt/tests/bread-test.c
Normal file
70
newbrt/tests/bread-test.c
Normal file
|
@ -0,0 +1,70 @@
|
|||
#include <arpa/inet.h>
|
||||
#include <assert.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "../bread.h"
|
||||
|
||||
#define FNAME "bread-test.data"
|
||||
|
||||
#define RECORDS 2
|
||||
|
||||
void test (int seed) {
|
||||
srandom(seed);
|
||||
unlink(FNAME);
|
||||
int i;
|
||||
char buf[RECORDS][100];
|
||||
int sizes[RECORDS];
|
||||
int sizesn[RECORDS];
|
||||
size_t off = 0;
|
||||
{
|
||||
int fd = creat(FNAME, 0777);
|
||||
assert(fd>=0);
|
||||
for (i=0; i<RECORDS; i++) {
|
||||
sizes[i] = random()%100;
|
||||
sizesn[i] = htonl(sizes[i]);
|
||||
int j;
|
||||
for (j=0; j<sizes[i]; j++) {
|
||||
buf[i][j]=random();
|
||||
}
|
||||
int r = write(fd, buf[i], sizes[i]);
|
||||
assert(r==sizes[i]);
|
||||
off+=r;
|
||||
r = write(fd, &sizesn[i], 4);
|
||||
assert(r==4);
|
||||
off+=4;
|
||||
}
|
||||
{ int r=close(fd); assert(r==0); }
|
||||
}
|
||||
int fd = open(FNAME, O_RDONLY); assert(fd>=0);
|
||||
// Now read it all backward
|
||||
BREAD br = create_bread_from_fd_initialize_at(fd, off, 50);
|
||||
while (bread_has_more(br)) {
|
||||
assert(i>0);
|
||||
i--;
|
||||
int sizen;
|
||||
{ int r = bread_backwards(br, &sizen, 4); assert(r==4); }
|
||||
int sizeh=ntohl(sizen);
|
||||
assert(sizeh==sizes[i]);
|
||||
assert(0<=sizeh && sizeh<100);
|
||||
{
|
||||
char rbuf[100];
|
||||
int r = bread_backwards(br, rbuf,sizeh);
|
||||
assert(r==sizeh);
|
||||
assert(memcmp(rbuf, &buf[i][0], sizes[i])==0);
|
||||
}
|
||||
}
|
||||
assert(i==0);
|
||||
{ int r=close_bread_without_closing_fd(br); assert(r==0); }
|
||||
{ int r=close(fd); assert(r==0); }
|
||||
unlink(FNAME);
|
||||
}
|
||||
|
||||
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
|
||||
int i;
|
||||
for (i=0; i<10; i++) test(i);
|
||||
return 0;
|
||||
}
|
Loading…
Add table
Reference in a new issue