Group commit working in tokulogger. Need to release some locks in ydb. Addresses #484.

git-svn-id: file:///svn/tokudb@2763 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Bradley C. Kuszmaul 2008-03-12 17:55:11 +00:00
parent ee2351986d
commit 0f29f9abe7
13 changed files with 433 additions and 223 deletions

View file

@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg
OPTFLAGS = -O2
# OPTFLAGS = -O2
ifeq ($(VERBOSE),2)
VERBVERBOSE=-v
@ -59,6 +59,10 @@ REGRESSION_TESTS = \
brt-test-cursor \
log-test \
log-test2 \
log-test3 \
log-test4 \
log-test5 \
log-test6 \
test_oexcl \
test-assert \
test-primes \
@ -87,7 +91,9 @@ recover: recover.o log_code.o memory.o log.o brt-serialize.o fifo.o pma.o ybt.o
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h pma.h
log_code.o: log_header.h wbuf.h log-internal.h
log_code.c log_header.h: logformat
log_header.h: log_code.c
@echo generated log_code.c so log_header.c was also generated
log_code.c: logformat
./logformat
libs: log.o
@ -135,7 +141,7 @@ check-fanout:
let BRT_FANOUT=BRT_FANOUT+1; \
done
log-test log-test2 pma-test benchmark-test brt-test brt-test3 brt-test4 brt-test-cursor test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder: LDFLAGS+=-lz
log-test log-test2 log-test3 log-test4 log-test5 log-test6 pma-test benchmark-test brt-test brt-test3 brt-test4 brt-test-cursor test-brt-delete-both brt-serialize-test brtdump test-inc-split test-del-inorder: LDFLAGS+=-lz
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
BRT_INTERNAL_H_INCLUDES = brt-internal.h cachetable.h fifo.h pma.h brt.h brttypes.h yerror.h ybt.h log.h ../include/db.h kv-pair.h memory.h crc.h
@ -164,7 +170,7 @@ fifo-test: fifo.o memory.o toku_assert.o ybt.o
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h
log-test2 log-test: log.o memory.o toku_assert.o roll.o log_code.o brt-serialize.o brt.o cachetable.o pma.o ybt.o fifo.o key.o fingerprint.o brt-verify.o mempool.o primes.o
log-test6 log-test5 log-test4 log-test3 log-test2 log-test: log.o memory.o toku_assert.o roll.o log_code.o brt-serialize.o brt.o cachetable.o pma.o ybt.o fifo.o key.o fingerprint.o brt-verify.o mempool.o primes.o
brt-verify.o: $(BRT_INTERNAL_H_INCLUDES)
fingerprint.o: $(BRT_INTERNAL_H_INCLUDES)
toku_assert.o: toku_assert.h

View file

@ -95,7 +95,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_log_changechildfingerprint(logger, 0, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
@ -227,7 +227,7 @@ static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(logger, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
int r = toku_log_changeunusedmemory(logger, 0, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
@ -295,7 +295,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
r=toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
assert(r==0);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
toku_update_brtnode_loggerlsn(n, logger);
return 0;
@ -381,7 +381,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(logger, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
int r = toku_log_addchild(logger, 0, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
@ -400,9 +400,9 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
u_int32_t new_to_fingerprint = old_to_fingerprint + B->rand4fingerprint *delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
r = toku_log_brtdeq(logger, 0, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = toku_log_brtenq(logger, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_log_brtenq(logger, 0, fnum, B->thisnodename, targchild, xid, type, keybs, databs, old_to_fingerprint, new_to_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
@ -423,10 +423,10 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
r = toku_log_delchild(logger, 0, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, fnum, B->thisnodename, targchild-1, bs);
r = toku_log_setpivot(logger, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
@ -624,7 +624,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
r = toku_log_addchild(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename);
BNC_DISKOFF(node, childnum+1) = childb->thisnodename;
BNC_SUBTREE_FINGERPRINT(node, childnum)=0;
@ -645,7 +645,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
struct kv_pair *pivot = childsplitk->data;
BYTESTRING bs = { .len = childsplitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
r = toku_log_setpivot(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-1; cnum>childnum; cnum--) {
@ -1287,7 +1287,7 @@ static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger
}
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
@ -1617,12 +1617,12 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
assert(r==0);
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(logger, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
toku_log_changeunnamedroot(logger, 0, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(logger, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
toku_log_changenamedroot(logger, 0, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
@ -1645,18 +1645,18 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
toku_verify_counts(newroot);
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_addchild(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, logger);
fixup_child_fingerprint(newroot, 1, nodeb, brt, logger);
{
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
r=toku_log_setpivot(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_loggerlsn(newroot, logger);
}

View file

@ -5,24 +5,67 @@
#include "list.h"
#include "yerror.h"
#include <stdio.h>
#include <pthread.h>
#include <sys/types.h>
// Locking for the logger
// For most purposes we use the big ydb lock.
// To log: grab the buf lock
// If the buf would overflow, then grab the file lock, swap file&buf, release buf lock, write the file, write the entry, release the file lock
// else append to buf & release lock
#define LOGGER_BUF_SIZE (1<<24)
// A pthread mutex paired with a flag that records whether it is currently held.
// The flag lets ml_lock/ml_unlock/ml_destroy assert that the lock is used correctly.
struct mylock {
pthread_mutex_t lock;
int is_locked; // set to 1 by ml_lock, reset to 0 by ml_unlock (and by ml_init)
};
// Prepare a mylock for use: clear the held-flag and create the mutex with default attributes.
// Returns whatever pthread_mutex_init returns.
static inline int ml_init(struct mylock *l) {
    int rc;
    l->is_locked = 0;
    rc = pthread_mutex_init(&l->lock, 0);
    return rc;
}
// Acquire the mutex and mark the mylock as held.
// Returns 0 on success, or the pthread_mutex_lock error code on failure.
// On failure the held-flag is left untouched: we do not own the mutex, so we
// must neither read nor write is_locked.
static inline int ml_lock(struct mylock *l) {
    int r = pthread_mutex_lock(&l->lock);
    if (r != 0) return r;
    assert(l->is_locked==0);
    l->is_locked=1;
    return 0;
}
// Release a mylock that the caller holds.
// Asserts that the lock is marked as held, clears the mark while still owning
// the mutex, then returns whatever pthread_mutex_unlock returns.
static inline int ml_unlock(struct mylock *l) {
    int rc;
    assert(l->is_locked==1);
    l->is_locked = 0;
    rc = pthread_mutex_unlock(&l->lock);
    return rc;
}
// Tear down a mylock.  The lock must not be marked as held.
// Returns whatever pthread_mutex_destroy returns.
static inline int ml_destroy(struct mylock *l) {
    int rc;
    assert(l->is_locked==0);
    rc = pthread_mutex_destroy(&l->lock);
    return rc;
}
struct tokulogger {
enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order
int is_open;
int is_panicked;
int panic_errno;
enum typ_tag tag;
char *directory;
int fd;
int n_in_file;
long long next_log_file_number;
LSN lsn;
char buf[LOGGER_BUF_SIZE];
int n_in_buf;
CACHETABLE ct;
struct list live_txns; // just a linked list. Should be a hashtable.
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock
struct logbytes *head,*tail;
LSN lsn; // the next available lsn
struct list live_txns; // just a linked list. Should be a hashtable.
int n_in_buf;
// To access these, you must have the output lock
LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry
long long next_log_file_number;
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file;
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
@ -51,7 +94,7 @@ struct tokutxn {
struct list live_txns_link;
};
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) {
return 1;

View file

@ -25,12 +25,23 @@ int main (int argc __attribute__((__unused__)),
r = toku_logger_open(dname, logger);
assert(r == 0);
{
LSN lsn={0};
char data[]="a1234";
r = toku_logger_log_bytes(logger, strlen(data), data, lsn);
struct logbytes *b = MALLOC_LOGBYTES(5);
b->nbytes=5;
memcpy(b->bytes, "a1234", 5);
b->lsn=(LSN){0};
r = ml_lock(&logger->input_lock);
assert(r==0);
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
assert(logger->input_lock.is_locked==0);
}
r = toku_logger_close(&logger);
assert(r == 0);
{
struct stat statbuf;
r = stat(dname "/log000000000000.tokulog", &statbuf);
assert(r==0);
assert(statbuf.st_size==12+5);
}
return 0;
}

View file

@ -3,6 +3,7 @@
#include "log-internal.h"
#include "toku_assert.h"
#include <dirent.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
@ -33,25 +34,36 @@ int main (int argc __attribute__((__unused__)),
r = toku_logger_open(dname, logger);
assert(r == 0);
int i;
for (i=0; i<20; i++) {
r = ml_lock(&logger->LSN_lock);
for (i=0; i<1000; i++) {
r = ml_lock(&logger->input_lock);
assert(r==0);
LSN lsn={23};
char data[100];
snprintf(data, sizeof(data), "a%04d", i);
r = toku_logger_log_bytes(logger, strlen(data), data, lsn);
int ilen=3+random()%5;
struct logbytes *b = MALLOC_LOGBYTES(ilen+1);
b->nbytes=ilen+1;
snprintf(b->bytes, ilen+1, "a%0*d ", (int)ilen, i); // skip the trailing nul
b->lsn=(LSN){23+i};
r = toku_logger_log_bytes(logger, b, 0);
assert(r==0);
}
r = toku_logger_close(&logger);
assert(r == 0);
{
DIR *dir=opendir(dname);
assert(dir);
struct dirent *dirent;
while ((dirent=readdir(dir))) {
if (strncmp(dirent->d_name, "log", 3)!=0) continue;
char fname[sizeof(dname)+256+1];
snprintf(fname, sizeof(fname), "%s/%s", dname, dirent->d_name);
struct stat statbuf;
r = stat(dname "/log000000000000.tokulog", &statbuf);
r = stat(fname, &statbuf);
assert(r==0);
assert(statbuf.st_size<=LSIZE);
}
r = closedir(dir);
assert(r==0);
}
return 0;
}

71
newbrt/log-test6.c Normal file
View file

@ -0,0 +1,71 @@
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include "log-internal.h"
#include "toku_assert.h"
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define dname __FILE__ ".dir"
#define rmrf "rm -rf " dname "/"
// create and close, making sure that everything is deallocated properly.
#define LSIZE 100
// Log two records whose sizes, added to the 12-byte log file header, come to
// exactly LSIZE bytes (12 + (LSIZE-12-2) + 2), with lg_max set to LSIZE.
// After closing the logger, the first log file must not be larger than LSIZE.
int main (int argc __attribute__((__unused__)),
          char *argv[] __attribute__((__unused__))) {
    int r;
    system(rmrf);
    r = mkdir(dname, 0700); assert(r==0);
    TOKULOGGER logger;
    r = toku_logger_create(&logger);
    assert(r == 0);
    r = toku_logger_set_lg_max(logger, LSIZE);
    assert(r == 0); // previously stored but never checked
    {
        u_int32_t n;
        r = toku_logger_get_lg_max(logger, &n);
        assert(r == 0); // previously stored but never checked
        assert(n==LSIZE);
    }
    r = toku_logger_open(dname, logger);
    assert(r == 0);
    {
        // First record: fills the file up to two bytes short of LSIZE.
        r = ml_lock(&logger->input_lock);
        assert(r==0);
        int lsize=LSIZE-12-2;
        struct logbytes *b = MALLOC_LOGBYTES(lsize);
        assert(b!=0);
        b->nbytes=lsize;
        snprintf(b->bytes, lsize, "a%*d", LSIZE-12-2, 0);
        b->lsn=(LSN){23};
        // toku_logger_log_bytes is entered holding input_lock and releases it.
        r = toku_logger_log_bytes(logger, b, 0);
        assert(r==0);
    }
    {
        // Second record: the remaining two bytes.
        r = ml_lock(&logger->input_lock);
        assert(r==0);
        struct logbytes *b = MALLOC_LOGBYTES(2);
        assert(b!=0);
        b->lsn=(LSN){24};
        b->nbytes=2;
        memcpy(b->bytes, "b1", 2);
        r = toku_logger_log_bytes(logger, b, 0);
        assert(r==0);
    }
    r = toku_logger_close(&logger);
    assert(r == 0);
    {
        struct stat statbuf;
        r = stat(dname "/log000000000000.tokulog", &statbuf);
        assert(r==0);
        assert(statbuf.st_size<=LSIZE);
    }
    return 0;
}

View file

@ -65,20 +65,74 @@ int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***r
}
// Allocate a logger and put it in the closed, non-panicked state.
// On success stores the new logger in *resultp and returns 0.
// On failure returns an error number and leaves *resultp untouched, so the
// caller never sees a pointer to memory that has already been freed.
int toku_logger_create (TOKULOGGER *resultp) {
    int r;
    TAGMALLOC(TOKULOGGER, result);
    if (result==0) return errno;
    result->is_open=0;
    result->is_panicked=0;
    result->panic_errno=0;
    result->lg_max = 100<<20; // 100MB default
    result->head = result->tail = 0;
    result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0};
    list_init(&result->live_txns);
    result->n_in_buf=0;
    result->n_in_file=0;
    result->directory=0;
    result->fd=-1; // no log file is open yet
    r = ml_init(&result->input_lock);  if (r!=0) goto died0;
    r = ml_init(&result->output_lock); if (r!=0) goto died1;
    // Publish the logger only once it is fully constructed.
    *resultp=result;
    return 0;
 died1:
    ml_destroy(&result->input_lock);
 died0:
    toku_free(result);
    return r;
}
// Record the cachetable in the logger's ct field.  No locking is done here.
void toku_logger_set_cachetable (TOKULOGGER tl, CACHETABLE ct) {
tl->ct = ct;
}
// The logger calls fsync only through this pointer (defaults to fsync).
// NOTE(review): presumably this indirection exists so fsync can be substituted -- confirm.
static int (*toku_os_fsync_function)(int)=fsync;
// Version number written (in network byte order) into the header of every log file by open_logfile().
static const int log_format_version=0;
// Write nbytes from bufv to fd.  Keep trying even if partial writes occur.
// A write that fails with EAGAIN or EINTR is retried; any other failure ends the attempt.
// (On a non-blocking fd that stays unwritable the EAGAIN retry spins; the logger is
// assumed to use ordinary blocking files -- TODO confirm.)
// On error: Return negative with errno set by write().
// On success return nbytes.
static int write_it (int fd, const void *bufv, int nbytes) {
    const char *buf = bufv;
    int remaining = nbytes;
    while (remaining > 0) {
        ssize_t r = write(fd, buf, remaining);
        if (r < 0) {
            // Only inspect errno when write() actually failed; it is stale otherwise.
            if (errno == EAGAIN || errno == EINTR) continue;
            return -1;
        }
        // Partial (or complete) write: advance past what was accepted and go again.
        buf += r;
        remaining -= (int)r;
    }
    return nbytes;
}
// Create the next log file in logger->directory and write its 12-byte header
// ("tokulogg" followed by the format version in network byte order).
// On success: logger->fd refers to the new file, next_log_file_number has been
// advanced, n_in_file is 12, and 0 is returned.
// On failure: returns an error number; if the file had been created its
// descriptor is closed and logger->fd is set to -1.
static int open_logfile (TOKULOGGER logger) {
    int r;
    int fnamelen = strlen(logger->directory)+50;
    char fname[fnamelen];
    snprintf(fname, fnamelen, "%s/log%012llu.tokulog", logger->directory,
             (unsigned long long)logger->next_log_file_number);
    // O_EXCL is an open() flag, not a permission bit: passing it in creat()'s
    // mode argument did nothing.  Use open() so exclusive creation really applies.
    logger->fd = open(fname, O_CREAT|O_WRONLY|O_EXCL, 0700);
    if (logger->fd==-1) return errno;
    logger->next_log_file_number++;
    int version_l = htonl(log_format_version);
    r = write_it(logger->fd, "tokulogg", 8); if (r!=8) goto write_failed;
    r = write_it(logger->fd, &version_l, 4); if (r!=4) goto write_failed;
    logger->fsynced_lsn = logger->written_lsn;
    logger->n_in_file = 12;
    return 0;
 write_failed:
    {
        // Save errno before close() can overwrite it, and never report success
        // for a header that was not fully written.
        int e = errno;
        close(logger->fd);
        logger->fd = -1;
        return e ? e : EIO;
    }
}
// Finish the current log file (fsync, then close) and start the next one.
// Returns errno if the fsync or the close fails, otherwise the result of open_logfile().
static int close_and_open_logfile (TOKULOGGER logger) {
    if (toku_os_fsync_function(logger->fd) != 0) {
        return errno;
    }
    if (close(logger->fd) != 0) {
        return errno;
    }
    return open_logfile(logger);
}
int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_open) return EINVAL;
@ -86,22 +140,20 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
int r;
long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti);
if (r!=0) {
died0:
return r;
}
if (r!=0) return r;
logger->directory = toku_strdup(directory);
if (logger->directory==0) goto died0;
logger->fd = -1;
if (logger->directory==0) return errno;
logger->next_log_file_number = nexti;
logger->n_in_buf = 0;
logger->n_in_file = 0;
open_logfile(logger);
logger->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
logger->written_lsn.lsn = 0;
logger->fsynced_lsn.lsn = 0;
logger->is_open = 1;
return toku_logger_log_bytes(logger, 0, "");
return 0;
}
void toku_logger_panic (TOKULOGGER logger, int err) {
@ -133,123 +185,157 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) {
}
static int flush (TOKULOGGER logger, int close_p) {
if (logger->n_in_buf>0) {
int r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
logger->n_in_file += logger->n_in_buf;
// Enter holding both locks
// Exit holding only the output_lock
static int do_write (TOKULOGGER logger, int do_fsync) {
int r;
struct logbytes *list = logger->head;
logger->head=logger->tail=0;
logger->n_in_buf=0;
r=ml_unlock(&logger->input_lock); if (r!=0) goto panic;
logger->n_in_buf=0;
while (list) {
if (logger->n_in_file + list->nbytes <= logger->lg_max) {
if (logger->n_in_buf + list->nbytes <= LOGGER_BUF_SIZE) {
memcpy(logger->buf+logger->n_in_buf, list->bytes, list->nbytes);
logger->n_in_buf+=list->nbytes;
logger->n_in_file+=list->nbytes;
logger->written_lsn = list->lsn;
struct logbytes *next=list->next;
toku_free(list);
list=next;
} else {
// it doesn't fit in the buffer, but it does fit in the file. So flush the buffer
r=write_it(logger->fd, logger->buf, logger->n_in_buf);
if (r!=logger->n_in_buf) { r=errno; goto panic; }
logger->n_in_buf=0;
// Special case for a log entry that's too big to fit in the buffer.
if (list->nbytes > LOGGER_BUF_SIZE) {
r=write_it(logger->fd, list->bytes, list->nbytes);
if (r!=list->nbytes) { r=errno; goto panic; }
logger->n_in_file+=list->nbytes;
logger->written_lsn = list->lsn;
struct logbytes *next=list->next;
toku_free(list);
list=next;
}
if (close_p || logger->n_in_file >= logger->lg_max) {
int r = close(logger->fd);
if (r!=0) return errno;
logger->fd=-1;
logger->n_in_file = 0;
}
} else {
// The new item doesn't fit in the file, so write the buffer, reopen the file, and try again
r=write_it(logger->fd, logger->buf, logger->n_in_buf);
logger->n_in_buf=0;
r=close_and_open_logfile(logger); if (r!=0) goto panic;
}
}
r=write_it(logger->fd, logger->buf, logger->n_in_buf);
if (r!=logger->n_in_buf) { r=errno; goto panic; }
logger->n_in_buf=0;
if (do_fsync) {
r = toku_os_fsync_function(logger->fd);
logger->fsynced_lsn = logger->written_lsn;
}
return 0;
panic:
toku_logger_panic(logger, r);
return r;
}
static int log_format_version=0;
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
// Queue one log record, and write out the queue when the buffered bytes reach
// LOGGER_BUF_SIZE or the caller asks for an fsync.
// Entry: the caller holds input_lock.
// Exit:  no logger locks are held when 0 is returned, or when EINVAL is
//        returned because the logger was already panicked.
// Ownership of `bytes` passes to the logger: do_write() frees each record
// after copying it out, so the caller must not touch it after this call.
int toku_logger_log_bytes (TOKULOGGER logger, struct logbytes *bytes, int do_fsync) {
    int r;
    if (logger->is_panicked) {
        // Honor the exit contract even when refusing the record: release the
        // lock the caller handed us and drop the record we now own.
        ml_unlock(&logger->input_lock);
        toku_free(bytes);
        return EINVAL;
    }
    // Capture the LSN while we still hold input_lock.  Once it is released,
    // another thread may write the queue and free `bytes`.
    const LSN my_lsn = bytes->lsn;

    // Append to the tail of the pending list.
    logger->n_in_buf += bytes->nbytes;
    bytes->next = 0;
    if (logger->tail) {
        logger->tail->next = bytes;
    } else {
        logger->head = bytes;
    }
    logger->tail = bytes;

    if (logger->n_in_buf < LOGGER_BUF_SIZE && !do_fsync) {
        // Nothing forces a flush; just leave the record queued.
        r = ml_unlock(&logger->input_lock); if (r!=0) goto panic;
        return 0;
    }

    // We must flush it.  Take the output lock first, then (if needed) the input lock again.
    r = ml_unlock(&logger->input_lock); if (r!=0) goto panic;
    r = ml_lock(&logger->output_lock);  if (r!=0) goto panic;
    if (logger->written_lsn.lsn < my_lsn.lsn) {
        // Our record has not yet been written, so write it and everything else that is queued.
        // do_write is entered with both locks and returns holding only the output lock.
        r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
        r = do_write(logger, do_fsync);   if (r!=0) goto panic;
    } else if (do_fsync && logger->fsynced_lsn.lsn < my_lsn.lsn) {
        // Someone else wrote our record, but it has not been fsynced yet.
        r = toku_os_fsync_function(logger->fd);
        if (r!=0) { r = errno; goto panic; }
        logger->fsynced_lsn = logger->written_lsn;
    }
    r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
    return 0;
 panic:
    toku_logger_panic(logger, r);
    return r;
}
static int (*toku_os_fsync_function)(int)=fsync;
// No locks held on entry
// No locks held on exit.
// No locks are needed, since you cannot legally close the log concurrently with doing anything else.
// But grab the locks just to be careful.
int toku_logger_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp;
if (logger->is_panicked) return EINVAL;
int r = 0;
if (!logger->is_open) goto is_closed;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
r = do_write(logger, 1); if (r!=0) goto panic;
if (logger->fd!=-1) {
//printf("%s:%d closing log: n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
if (logger->n_in_buf>0) {
r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
r = close(logger->fd); if (r!=0) { r=errno; goto panic; }
}
r = toku_os_fsync_function(logger->fd);
if (r!=0) close(logger->fd);
else r = close(logger->fd);
}
toku_free(logger->directory);
logger->fd=-1;
r = ml_unlock(&logger->output_lock);
is_closed:
logger->is_panicked=1; // Just in case this might help.
if (logger->directory) toku_free(logger->directory);
toku_free(logger);
*loggerp=0;
return r;
panic:
toku_logger_panic(logger, r);
return r;
}
#if 0
int toku_logger_log_brt_remove (TOKULOGGER logger,
TXNID txnid,
diskoff diskoff,
unsigned char *key,
int keylen,
unsigned char *val,
int vallen) {
n
}
#endif
// Write out every queued log record and fsync the log file.
// Entry: Holds no locks
// Exit: Holds no locks (when 0 or the already-panicked EINVAL is returned)
// This is the exported fsync used by ydb.c
// Returns 0 on success.  The logger is panicked only when something actually
// fails; a successful call no longer falls through into the panic path.
int toku_logger_fsync (TOKULOGGER logger) {
    int r;
    if (logger->is_panicked) return EINVAL;
    r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
    r = ml_lock(&logger->input_lock);  if (r!=0) goto panic;
    // do_write is entered with both locks and returns holding only the output lock.
    r = do_write(logger, 1);           if (r!=0) goto panic;
    r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
    return 0;
 panic:
    toku_logger_panic(logger, r);
    return r;
}
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
// wbuf points into logbytes
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync) {
if (logger->is_panicked) return EINVAL;
wbuf_int(wbuf, toku_crc32(0, wbuf->buf, wbuf->ndone));
wbuf_int(wbuf, 4+wbuf->ndone);
return toku_logger_log_bytes(logger, wbuf->ndone, wbuf->buf);
logbytes->nbytes=wbuf->ndone;
return toku_logger_log_bytes(logger, logbytes, do_fsync);
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn->logger, txn->txnid64);
int r = toku_log_commit(txn->logger, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) goto free_and_return;
if (txn->parent==0) {
if (!nosync) {
r = toku_logger_fsync(txn->logger);
if (r!=0) toku_logger_panic(txn->logger, r);
}
goto free_and_return;
} else {
if (txn->parent!=0) {
// Append the list to the front.
if (txn->oldest_logentry) {
// There are some entries, so link them in.
@ -261,7 +347,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
}
txn->newest_logentry = txn->oldest_logentry = 0;
}
free_and_return: /*nothing*/;
free_and_return:
{
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
@ -270,20 +357,22 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
}
list_remove(&txn->live_txns_link);
toku_free(txn);
}
return r;
}
int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
if (logger->is_panicked) return EINVAL;
struct wbuf wbuf;
const int buflen =10;
unsigned char buf[buflen];
wbuf_init(&wbuf, buf, buflen);
const int buflen =18;
struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);
if (lbytes==0) return errno;
wbuf_init(&wbuf, &lbytes->bytes[0], buflen);
wbuf_char(&wbuf, LT_CHECKPOINT);
wbuf_LSN (&wbuf, logger->lsn);
*lsn = logger->lsn;
*lsn = lbytes->lsn = logger->lsn;
logger->lsn.lsn++;
return toku_logger_log_bytes(logger, wbuf.ndone, wbuf.buf);
return toku_logger_finish(logger, lbytes, &wbuf, 1);
}
@ -304,7 +393,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
int r = toku_log_fcreate (txn->logger, 0, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, bs);
return r;
@ -317,36 +406,12 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fopen (txn->logger, toku_txn_get_txnid(txn), bs, filenum);
return toku_log_fopen (txn->logger, 0, toku_txn_get_txnid(txn), bs, filenum);
}
int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
#if 0
LOGGEDBRTHEADER lh;
lh.size = toku_serialize_brt_header_size(h);
lh.flags = h->flags;
lh.nodesize = h->nodesize;
lh.freelist = h->freelist;
lh.unused_memory = h->unused_memory;
lh.n_named_roots = h->n_named_roots;
if (h->n_named_roots==-1) {
lh.u.one.root = h->unnamed_root;
} else {
int i;
MALLOC_N(h->n_named_roots, lh.u.many.names);
MALLOC_N(h->n_named_roots, lh.u.many.roots);
for (i=0; i<h->n_named_roots; i++) {
lh.u.many.names[i]=toku_strdup(h->names[i]);
lh.u.many.roots[i]=h->roots[i];
}
}
r = toku_log_fheader(txn, toku_txn_get_txnid(txn), filenum, lh);
toku_free(all_that_stuff);
return r;
#else
int subsize=toku_serialize_brt_header_size(h);
int buflen = (4 // firstlen
+ 1 //cmd
@ -356,34 +421,26 @@ int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h)
+ subsize
+ 8 // crc & len
);
unsigned char *buf=toku_malloc(buflen); // alloc on heap because it might be big
struct logbytes *lbytes=MALLOC_LOGBYTES(buflen); // alloc on heap because it might be big
int r;
if (buf==0) return errno;
if (lbytes==0) return errno;
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen);
r = ml_lock(&txn->logger->input_lock); if (r!=0) { txn->logger->is_panicked=1; txn->logger->panic_errno=r; return r; }
LSN lsn = txn->logger->lsn;
wbuf_init(&wbuf, &lbytes->bytes[0], buflen);
wbuf_int (&wbuf, buflen);
wbuf_char(&wbuf, LT_FHEADER);
wbuf_LSN (&wbuf, txn->logger->lsn);
wbuf_LSN (&wbuf, lsn);
lbytes->lsn = lsn;
txn->logger->lsn.lsn++;
wbuf_TXNID(&wbuf, txn->txnid64);
wbuf_FILENUM(&wbuf, filenum);
r=toku_serialize_brt_header_to_wbuf(&wbuf, h);
if (r!=0) return r;
r=toku_logger_finish(txn->logger, &wbuf);
toku_free(buf);
r=toku_logger_finish(txn->logger, lbytes, &wbuf, 0);
return r;
#endif
}
/*
int brtenv_checkpoint (BRTENV env) {
init the checkpointing lock
acquire_spinlock(&env->checkpointing);
release_spinlock(&env->checkpointing);
return -1;
}
*/
int toku_fread_u_int8_t_nocrclen (FILE *f, u_int8_t *v) {
int vi=fgetc(f);
if (vi==EOF) return -1;

View file

@ -9,10 +9,20 @@
#include "brttypes.h"
#include "kv-pair.h"
struct logbytes;
// One log record queued inside the logger.  The record's payload is stored
// inline after the fixed fields; allocate with MALLOC_LOGBYTES(n) to get room
// for n payload bytes.
struct logbytes {
    struct logbytes *next; // next record in the logger's pending list, or 0
    int nbytes;            // number of valid bytes in bytes[]
    LSN lsn;               // LSN assigned to this record
    char bytes[];          // payload (C99 flexible array member)
};
// Allocate a struct logbytes with room for n payload bytes.
// The argument is parenthesized so that any expression may be passed safely.
#define MALLOC_LOGBYTES(n) toku_malloc(sizeof(struct logbytes)+(n))
int toku_logger_create(TOKULOGGER */*resultp*/);
void toku_logger_set_cachetable (TOKULOGGER, CACHETABLE);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int toku_logger_log_bytes(TOKULOGGER logger, struct logbytes *bytes, int do_fsync);
int toku_logger_close(TOKULOGGER *logger);
int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
void toku_logger_panic(TOKULOGGER, int/*err*/);

View file

@ -311,7 +311,7 @@ void generate_log_free(void) {
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger", lt->name);
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, int do_fsync", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
@ -324,19 +324,20 @@ void generate_log_writer (void) {
fprintf(cf, " +8 // crc + len\n");
fprintf(cf, " );\n");
fprintf(cf, " struct wbuf wbuf;\n");
fprintf(cf, " char *buf = toku_malloc(buflen);\n");
fprintf(cf, " if (buf==0) return errno;\n");
fprintf(cf, " wbuf_init(&wbuf, buf, buflen);\n");
fprintf(cf, " struct logbytes *lbytes = MALLOC_LOGBYTES(buflen);\n");
fprintf(cf, " if (lbytes==0) return errno;\n");
fprintf(cf, " wbuf_init(&wbuf, &lbytes->bytes[0], buflen);\n");
fprintf(cf, " wbuf_int(&wbuf, buflen);\n");
fprintf(cf, " wbuf_char(&wbuf, '%c');\n", 0xff&lt->command_and_flags);
fprintf(cf, " wbuf_LSN(&wbuf, logger->lsn);\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n");
fprintf(cf, " LSN lsn = logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n");
fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
fprintf(cf, " int r= toku_logger_finish(logger, &wbuf);\n");
fprintf(cf, " int r= toku_logger_finish(logger, lbytes, &wbuf, do_fsync);\n");
fprintf(cf, " assert(wbuf.ndone==buflen);\n");
fprintf(cf, " toku_free(buf);\n");
fprintf(cf, " return r;\n");
fprintf(cf, "}\n\n");
}));

View file

@ -369,7 +369,7 @@ static int pma_log_distribute (TOKULOGGER logger, FILENUM filenum, DISKOFF old_d
}
}
ipa.size=j;
int r=toku_log_pmadistribute(logger, filenum, old_diskoff, new_diskoff, ipa);
int r=toku_log_pmadistribute(logger, 0, filenum, old_diskoff, new_diskoff, ipa);
if (logger && oldnode_lsn) *oldnode_lsn = toku_logger_last_lsn(logger);
if (logger && newnode_lsn) *newnode_lsn = toku_logger_last_lsn(logger);
// if (0 && pma) {
@ -546,7 +546,7 @@ static int pma_resize_array(TOKULOGGER logger, FILENUM filenum, DISKOFF offset,
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (logger, filenum, offset, oldN, n);
toku_log_resizepma (logger, 0, filenum, offset, oldN, n);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return 0;
}
@ -734,7 +734,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
{
const BYTESTRING key = { pair->keylen, kv_pair_key(pair) };
const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
int r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
if (r!=0) return r;
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
@ -772,7 +772,7 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val
{
const BYTESTRING deletedkey = { keylen, (char*)key };
const BYTESTRING deleteddata = { vallen, (char*)val };
int r=toku_log_deleteinleaf(logger, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
int r=toku_log_deleteinleaf(logger, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (logger) {
@ -945,7 +945,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{
const BYTESTRING key = { k->size, k->data };
const BYTESTRING data = { v->size, v->data };
r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) return r;
/* We don't record the insert here for rollback. The insert should have been logged at the top-level. */
@ -1105,7 +1105,7 @@ int toku_pma_split(TOKULOGGER logger, FILENUM filenum,
{
int r = pma_log_distribute(logger, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(logger, filenum, diskoff, oldn_for_logging, newn_for_logging);
r = toku_log_resizepma(logger, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (logger && lsn) *lsn = toku_logger_last_lsn(logger);

View file

@ -27,14 +27,11 @@ SHARED=-shared $(EXPORTMAP)
RPATHNAME=
endif
.PHONY: install logformat
install: logformat locktree $(LIBRARY) $(LIBNAME).a
.PHONY: install
install: locktree $(LIBRARY) $(LIBNAME).a
cp $(LIBRARY) ../lib/
cp $(LIBNAME).a ../lib
logformat:
(cd ../newbrt && make)
locktree:
cd lock_tree && make

View file

@ -29,7 +29,7 @@ else
BDB_LDFLAGS += -Wl,-rpath,$(BDBDIR)/lib
endif
BDB_LDFLAGS += -lpthread
TDB_LOADLIBES = -L../ -ltokudb -Wl,-rpath,.. -lpthread
TDB_LOADLIBES = -L.. -ltokudb -Wl,-rpath,.. -lpthread
VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=yes
HGRIND=valgrind --quiet --tool=helgrind
endif

View file

@ -711,25 +711,27 @@ static int toku_txn_release_locks(DB_TXN* txn) {
}
// Commit a transaction and dispose of the DB_TXN handle.
//   txn   - the transaction to commit; NULL yields EINVAL.
//   flags - only DB_TXN_NOSYNC is accepted. It is stripped from flags and forwarded
//           to toku_logger_commit() as nosync; any other bit set yields EINVAL.
// The transaction's locks are released via toku_txn_release_locks() and txn (and txn->i)
// are freed on both the EINVAL-for-bad-flags path and the commit path.
//
// NOTE(review): this span appears to interleave the pre-change and post-change bodies of
// the function: the NULL check appears twice, r is declared twice, two return statements
// follow each other at the end, and the `if (0) {` below is never closed within the span.
// Resolve to a single version before building.
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp);
//toku_ydb_notef("flags=%d\n", flags);
int r;
// Remember whether the caller asked for no sync, then clear that bit so that any
// remaining bit in flags can be detected as unsupported.
int nosync = (flags & DB_TXN_NOSYNC)!=0;
flags &= ~DB_TXN_NOSYNC;
if (!txn) return EINVAL;
if (flags!=0) goto return_invalid;
r = toku_logger_commit(txn->i->tokutxn, nosync);
// The `if (0)` body is entered only through the return_invalid label.
if (0) {
return_invalid:
r = EINVAL;
// Unsupported flags: no commit is attempted, so the tokutxn is freed here directly.
if (flags!=0) {
if (txn->i) {
if (txn->i->tokutxn)
toku_free(txn->i->tokutxn);
toku_free(txn->i);
}
// Cleanup
int r2 = toku_txn_release_locks(txn);
toku_free(txn);
return EINVAL;
}
int r = toku_logger_commit(txn->i->tokutxn, nosync); // frees the tokutxn
int r2 = toku_txn_release_locks(txn); // release the locks after the commit (otherwise, what if we abort)
// The tokutxn is freed, and we must free the rest.
if (txn->i)
toku_free(txn->i);
toku_free(txn);
return r ? r : r2; // The txn is no good after the commit.
return r2 ? r2 : r; // The txn is no good after the commit even if the commit fails.
}
static u_int32_t toku_txn_id(DB_TXN * txn) {