git-svn-id: file:///svn/toku/tokudb@51238 c7de825b-a66e-492c-adef-691d508d4ae1
parent c4a8d4434c
commit 12ccf6ae0a
21 changed files with 660 additions and 143 deletions
ft/dbufio.cc (189 changed lines)
@@ -11,6 +11,9 @@
 #include <unistd.h>
 #include "memory.h"
 #include <string.h>
+#include "ftloader-internal.h"
+#include "ft-internal.h"
+#include "ft.h"
 
 struct dbufio_file {
     // i/o thread owns these
@@ -18,7 +21,7 @@ struct dbufio_file {
 
     // consumers own these
     size_t offset_in_buf;
-    toku_off_t offset_in_file;
+    toku_off_t offset_in_uncompressed_file;
 
     // need the mutex to modify these
     struct dbufio_file *next;
@@ -49,6 +52,7 @@ struct dbufio_fileset {
     size_t bufsize; // the bufsize is the constant (the same for all buffers).
 
     bool panic;
+    bool compressed;
     int panic_errno;
     toku_pthread_t iothread;
 };
@@ -75,6 +79,162 @@ static bool paniced (DBUFIO_FILESET bfs) {
     return bfs->panic;
 }
 
+static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
+    ssize_t ret;
+    invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
+    unsigned char *raw_block = NULL;
+
+    // deserialize the sub block header
+
+    // total_size
+    // num_sub_blocks
+    // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times)
+    ssize_t readcode;
+    const uint32_t header_size = sizeof(uint32_t);
+    char header[header_size];
+
+    readcode = toku_os_read(dbf->fd, &header, header_size);
+    if (readcode < 0) {
+        ret = -1;
+        goto exit;
+    }
+    if (readcode == 0) {
+        ret = 0;
+        goto exit;
+    }
+    if (readcode < header_size) {
+        errno = TOKUDB_NO_DATA;
+        ret = -1;
+        goto exit;
+    }
+    uint32_t total_size;
+    {
+        uint32_t *p = (uint32_t *) &header[0];
+        total_size = toku_dtoh32(p[0]);
+    }
+    if (total_size == 0 || total_size > (1<<30)) {
+        errno = toku_db_badformat();
+        ret = -1;
+        goto exit;
+    }
+
+    //Cannot use XMALLOC
+    MALLOC_N(total_size, raw_block);
+    if (raw_block == nullptr) {
+        errno = ENOMEM;
+        ret = -1;
+        goto exit;
+    }
+    readcode = toku_os_read(dbf->fd, raw_block, total_size);
+    if (readcode < 0) {
+        ret = -1;
+        goto exit;
+    }
+    if (readcode < total_size) {
+        errno = TOKUDB_NO_DATA;
+        ret = -1;
+        goto exit;
+    }
+
+    struct sub_block sub_block[max_sub_blocks];
+    uint32_t *sub_block_header;
+    sub_block_header = (uint32_t *) &raw_block[0];
+    int32_t n_sub_blocks;
+    n_sub_blocks = toku_dtoh32(sub_block_header[0]);
+    sub_block_header++;
+    size_t size_subblock_header;
+    size_subblock_header = sub_block_header_size(n_sub_blocks);
+    if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) {
+        errno = toku_db_badformat();
+        ret = -1;
+        goto exit;
+    }
+    for (int i = 0; i < n_sub_blocks; i++) {
+        sub_block_init(&sub_block[i]);
+        sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
+        sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
+        sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
+        sub_block_header += 3;
+    }
+
+    // verify sub block sizes
+    size_t total_compressed_size;
+    total_compressed_size = 0;
+    for (int i = 0; i < n_sub_blocks; i++) {
+        uint32_t compressed_size = sub_block[i].compressed_size;
+        if (compressed_size<=0 || compressed_size>(1<<30)) {
+            errno = toku_db_badformat();
+            ret = -1;
+            goto exit;
+        }
+
+        uint32_t uncompressed_size = sub_block[i].uncompressed_size;
+        if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
+            errno = toku_db_badformat();
+            ret = -1;
+            goto exit;
+        }
+        total_compressed_size += compressed_size;
+    }
+    if (total_size != total_compressed_size + size_subblock_header) {
+        errno = toku_db_badformat();
+        ret = -1;
+        goto exit;
+    }
+
+    // sum up the uncompressed size of the sub blocks
+    size_t uncompressed_size;
+    uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
+    if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) {
+        errno = toku_db_badformat();
+        ret = -1;
+        goto exit;
+    }
+
+    unsigned char *uncompressed_data;
+    uncompressed_data = (unsigned char *)buf;
+
+    // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
+    unsigned char *compressed_data;
+    compressed_data = raw_block + size_subblock_header;
+
+    // decompress all the compressed sub blocks into the uncompressed buffer
+    {
+        int r;
+        r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool());
+        if (r != 0) {
+            fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size);
+            dump_bad_block(raw_block, total_size);
+            errno = r;
+            ret = -1;
+            goto exit;
+        }
+    }
+    ret = uncompressed_size;
+exit:
+    if (raw_block) {
+        toku_free(raw_block);
+    }
+    return ret;
+}
+
+static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
+    invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
+    size_t count = 0;
+
+    while (count + MAX_UNCOMPRESSED_BUF <= bufsize) {
+        ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count);
+        if (readcode < 0) {
+            return readcode;
+        }
+        count += readcode;
+        if (readcode == 0) {
+            break;
+        }
+    }
+    return count;
+}
+
 static void* io_thread (void *v)
 // The dbuf_thread does all the asynchronous I/O.
 {
@@ -118,7 +278,13 @@ static void* io_thread (void *v)
     toku_mutex_unlock(&bfs->mutex);
     //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd);
     {
-        ssize_t readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
+        ssize_t readcode;
+        if (bfs->compressed) {
+            readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize);
+        }
+        else {
+            readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
+        }
         //printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode);
         if (readcode==-1) {
             // a real error.  Save the real error.
@@ -159,11 +325,14 @@ static void* io_thread (void *v)
     }
 }
 
-int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize) {
+int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
     //printf("%s:%d here\n", __FILE__, __LINE__);
     int result = 0;
     DBUFIO_FILESET CALLOC(bfs);
     if (bfs==0) { result = get_error_errno(); }
 
+    bfs->compressed = compressed;
+
     bool mutex_inited = false, cond_inited = false;
     if (result==0) {
         CALLOC_N(N, bfs->files);
@@ -190,7 +359,7 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
     for (int i=0; i<N; i++) {
         bfs->files[i].fd = fds[i];
         bfs->files[i].offset_in_buf = 0;
-        bfs->files[i].offset_in_file = 0;
+        bfs->files[i].offset_in_uncompressed_file = 0;
         bfs->files[i].next = NULL;
         bfs->files[i].second_buf_ready = false;
         for (int j=0; j<2; j++) {
@@ -202,8 +371,14 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
             bfs->files[i].error_code[j] = 0;
         }
         bfs->files[i].io_done = false;
+        ssize_t r;
+        if (bfs->compressed) {
+            r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize);
+        } else {
+            r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
+        }
         {
-            //TODO: 5663 If compressed need to read differently
-            ssize_t r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
             if (r<0) {
                 result=get_error_errno();
                 break;
@@ -297,7 +472,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) {
         // Enough data is present to do it all now
         memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count);
         dbf->offset_in_buf += count;
-        dbf->offset_in_file += count;
+        dbf->offset_in_uncompressed_file += count;
         *n_read = count;
         return 0;
     } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) {
@@ -306,7 +481,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) {
         assert(dbf->offset_in_buf + this_count <= bfs->bufsize);
         memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count);
         dbf->offset_in_buf += this_count;
-        dbf->offset_in_file += this_count;
+        dbf->offset_in_uncompressed_file += this_count;
         size_t sub_n_read;
         int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read);
         if (r==0) {
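
Note: dbf_read_some_compressed() fixes the on-disk framing of a compressed intermediate file: a leading uint32 total_size, then a sub-block header (num_sub_blocks followed by one compressed_size/uncompressed_size/xsum triple per sub block), then the concatenated compressed sub blocks, with total_size covering everything after itself. A minimal standalone sketch of parsing just that header follows; it assumes a little-endian host (where toku_dtoh32 is a no-op), and parse_frame_header and sub_block_desc are illustrative names, not part of the real code.

// Standalone sketch of the frame-header layout read by dbf_read_some_compressed().
#include <stdint.h>
#include <string.h>
#include <vector>

struct sub_block_desc {
    uint32_t compressed_size;
    uint32_t uncompressed_size;
    uint32_t xsum;              // checksum of the compressed bytes
};

// raw points at the bytes that follow the leading total_size word.
// Returns false on a malformed frame, mirroring the toku_db_badformat() checks.
static bool parse_frame_header(const uint8_t *raw, uint32_t total_size,
                               std::vector<sub_block_desc> *out) {
    if (total_size < sizeof(uint32_t)) return false;
    uint32_t n_sub_blocks;
    memcpy(&n_sub_blocks, raw, sizeof n_sub_blocks);
    const size_t header_bytes = sizeof(uint32_t) + (size_t)n_sub_blocks * 3 * sizeof(uint32_t);
    if (n_sub_blocks == 0 || header_bytes > total_size) return false;

    uint64_t sum_compressed = 0;
    const uint8_t *p = raw + sizeof(uint32_t);
    for (uint32_t i = 0; i < n_sub_blocks; i++) {
        sub_block_desc d;
        memcpy(&d.compressed_size,   p + 0, 4);
        memcpy(&d.uncompressed_size, p + 4, 4);
        memcpy(&d.xsum,              p + 8, 4);
        p += 12;
        sum_compressed += d.compressed_size;
        out->push_back(d);
    }
    // total_size must cover the sub-block header plus all compressed payloads.
    return sum_compressed + header_bytes == total_size;
}
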

@@ -14,7 +14,7 @@
 /* An implementation would typically use a separate thread or asynchronous I/O to fetch ahead data for each file.  The system will typically fill two buffers of size M for each file.  One buffer is being read out of using dbuf_read(), and the other buffer is either empty (waiting on the asynchronous I/O to start), being filled in by the asynchronous I/O mechanism, or is waiting for the caller to read data from it. */
 typedef struct dbufio_fileset *DBUFIO_FILESET;
 
-int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize);
+int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed);
 
 int destroy_dbufio_fileset(DBUFIO_FILESET);
 

ft/ft.h (3 changed lines)
@@ -110,4 +110,7 @@ void toku_ft_set_blackhole(FT_HANDLE ft_handle);
 // The difference between the two is MVCC garbage.
 void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space);
 
+int get_num_cores(void);
+struct toku_thread_pool *get_ft_pool(void);
+void dump_bad_block(unsigned char *vp, uint64_t size);
 #endif

@@ -10,6 +10,7 @@
 #include <portability/toku_atomic.h>
 #include <util/sort.h>
 #include <util/threadpool.h>
+#include "ft.h"
 
 static FT_UPGRADE_STATUS_S ft_upgrade_status;
 
@@ -57,6 +58,14 @@ static inline void do_toku_trace(const char *cp, int len) {
 static int num_cores = 0; // cache the number of cores for the parallelization
 static struct toku_thread_pool *ft_pool = NULL;
 
+int get_num_cores(void) {
+    return num_cores;
+}
+
+struct toku_thread_pool *get_ft_pool(void) {
+    return ft_pool;
+}
+
 void
 toku_ft_serialize_layer_init(void) {
     num_cores = toku_os_get_number_active_processors();
@@ -980,7 +989,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
 
 // dump a buffer to stderr
 // no locking around this for now
-static void
+void
 dump_bad_block(unsigned char *vp, uint64_t size) {
     const uint64_t linesize = 64;
     uint64_t n = size / linesize;
@@ -1181,14 +1190,7 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
         goto exit;
     }
 
-    sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
-
-    toku_decompress(
-        (Bytef *) sb->uncompressed_ptr,
-        sb->uncompressed_size,
-        (Bytef *) sb->compressed_ptr,
-        sb->compressed_size
-        );
+    just_decompress_sub_block(sb);
 exit:
     return r;
 }

@@ -12,6 +12,19 @@
 #include <toku_pthread.h>
 #include "dbufio.h"
 
+enum { EXTRACTOR_QUEUE_DEPTH = 2,
+       FILE_BUFFER_SIZE = 1<<24,
+       MIN_ROWSET_MEMORY = 1<<23,
+       MIN_MERGE_FANIN = 2,
+       FRACTAL_WRITER_QUEUE_DEPTH = 3,
+       FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
+       DBUFIO_DEPTH = 2,
+       TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
+       MIN_MERGE_BUF_SIZE = 1<<20, // always use at least this much
+       MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE
+};
+
 
 /* These functions are exported to allow the tests to compile. */
 
 /* These structures maintain a collection of all the open temporary files used by the loader. */
@@ -56,7 +69,7 @@ int init_rowset (struct rowset *rows, uint64_t memory_budget);
 void destroy_rowset (struct rowset *rows);
 int add_row (struct rowset *rows, DBT *key, DBT *val);
 
-int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, FTLOADER bl);
+int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl);
 int loader_read_row (FILE *f, DBT *key, DBT *val);
 
 struct merge_fileset {
@@ -146,6 +159,7 @@ struct ft_loader_s {
 
     CACHETABLE cachetable;
     bool did_reserve_memory;
+    bool compress_intermediates;
    uint64_t reserved_memory; // how much memory are we allowed to use?
 
     /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
@@ -243,7 +257,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                   const char *temp_file_template,
                                   LSN load_lsn,
                                   TOKUTXN txn,
-                                  bool reserve_memory);
+                                  bool reserve_memory,
+                                  bool compress_intermediates);
 
 void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
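
Note: moving these tuning constants out of ftloader.cc into this header lets dbufio.cc see MAX_UNCOMPRESSED_BUF, and pinning MAX_UNCOMPRESSED_BUF to MIN_MERGE_BUF_SIZE guarantees that even the smallest merge buffer holds one fully decompressed frame, which is the invariant dbf_read_compressed() asserts. An illustrative restatement (constants copied from the enum above; the static_asserts themselves are not in the commit):

// Illustrative only: restates the sizing invariants the new read path relies on.
enum { TARGET_MERGE_BUF_SIZE_X = 1 << 24,   // 16 MiB target merge buffer
       MIN_MERGE_BUF_SIZE_X    = 1 << 20,   // 1 MiB minimum merge buffer
       MAX_UNCOMPRESSED_BUF_X  = MIN_MERGE_BUF_SIZE_X };

// dbf_read_compressed() needs room for at least one decompressed frame ...
static_assert(MIN_MERGE_BUF_SIZE_X >= MAX_UNCOMPRESSED_BUF_X, "buffer must hold one frame");
// ... and a target-sized merge buffer is refilled 16 whole frames at a time.
static_assert(TARGET_MERGE_BUF_SIZE_X / MAX_UNCOMPRESSED_BUF_X == 16, "16 frames per target buffer");
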
ft/ftloader.cc (263 changed lines)
@@ -47,18 +47,6 @@ static uint32_t size_factor = 1024;
 static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE;
 static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
 
-enum { EXTRACTOR_QUEUE_DEPTH = 2,
-       FILE_BUFFER_SIZE = 1<<24,
-       MIN_ROWSET_MEMORY = 1<<23,
-       MIN_MERGE_FANIN = 2,
-       FRACTAL_WRITER_QUEUE_DEPTH = 3,
-       FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
-       DBUFIO_DEPTH = 2,
-       TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
-       MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much
-};
-
-
 void
 toku_ft_loader_set_size_factor(uint32_t factor) {
     // For test purposes only
@@ -468,7 +456,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                   const char *temp_file_template,
                                   LSN load_lsn,
                                   TOKUTXN txn,
-                                  bool reserve_memory)
+                                  bool reserve_memory,
+                                  bool compress_intermediates)
 // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
 {
     FTLOADER CALLOC(bl);  // initialized to all zeros (hence CALLOC)
@@ -484,6 +473,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
         bl->did_reserve_memory = false;
         bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
     }
+    bl->compress_intermediates = compress_intermediates;
     //printf("Reserved memory=%ld\n", bl->reserved_memory);
 
     bl->src_db = src_db;
@@ -570,7 +560,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
                          const char *temp_file_template,
                          LSN load_lsn,
                          TOKUTXN txn,
-                         bool reserve_memory)
+                         bool reserve_memory,
+                         bool compress_intermediates)
 /* Effect: called by DB_ENV->create_loader to create a brt loader.
  * Arguments:
  *   blp                  Return the brt loader here.
@@ -592,7 +583,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
                                           temp_file_template,
                                           load_lsn,
                                           txn,
-                                          reserve_memory);
+                                          reserve_memory,
+                                          compress_intermediates);
         if (r!=0) result = r;
     }
     if (result==0) {
@@ -608,8 +600,12 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
     return result;
 }
 
-static void ft_loader_set_panic(FTLOADER bl, int error, bool callback) {
-    int r = ft_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL);
+static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which_db, DBT *key, DBT *val) {
+    DB *db = nullptr;
+    if (bl && bl->dbs && which_db >= 0 && which_db < bl->N) {
+        db = bl->dbs[which_db];
+    }
+    int r = ft_loader_set_error(&bl->error_callback, error, db, which_db, key, val);
     if (r == 0 && callback)
         ft_loader_call_error_function(&bl->error_callback);
 }
@@ -624,17 +620,118 @@ FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i) {
     return result;
 }
 
-static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADER UU(bl))
+static int bl_finish_compressed_write(FILE *stream, struct wbuf *wb) {
+    int r;
+    char *compressed_buf = NULL;
+    const size_t data_size = wb->ndone;
+    invariant(data_size > 0);
+    invariant(data_size <= MAX_UNCOMPRESSED_BUF);
+
+    int n_sub_blocks = 0;
+    int sub_block_size = 0;
+
+    r = choose_sub_block_size(wb->ndone, max_sub_blocks, &sub_block_size, &n_sub_blocks);
+    invariant(r==0);
+    invariant(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
+    invariant(sub_block_size > 0);
+
+    struct sub_block sub_block[max_sub_blocks];
+    // set the initial sub block size for all of the sub blocks
+    for (int i = 0; i < n_sub_blocks; i++) {
+        sub_block_init(&sub_block[i]);
+    }
+    set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);
+
+    size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, TOKU_DEFAULT_COMPRESSION_METHOD);
+    const size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
+    const size_t other_overhead = sizeof(uint32_t); //total_size
+    const size_t header_len = sub_block_header_len + other_overhead;
+    MALLOC_N(header_len + compressed_len, compressed_buf);
+    if (compressed_buf == nullptr) {
+        return ENOMEM;
+    }
+
+    // compress all of the sub blocks
+    char *uncompressed_ptr = (char*)wb->buf;
+    char *compressed_ptr = compressed_buf + header_len;
+    compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr,
+                                             get_num_cores(), get_ft_pool(), TOKU_DEFAULT_COMPRESSION_METHOD);
+
+    //total_size does NOT include itself
+    uint32_t total_size = compressed_len + sub_block_header_len;
+    // serialize the sub block header
+    uint32_t *ptr = (uint32_t *)(compressed_buf);
+    *ptr++ = toku_htod32(total_size);
+    *ptr++ = toku_htod32(n_sub_blocks);
+    for (int i=0; i<n_sub_blocks; i++) {
+        ptr[0] = toku_htod32(sub_block[i].compressed_size);
+        ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
+        ptr[2] = toku_htod32(sub_block[i].xsum);
+        ptr += 3;
+    }
+    // Mark as written
+    wb->ndone = 0;
+
+    size_t size_to_write = total_size + 4; // Includes writing total_size
+
+    {
+        size_t written = do_fwrite(compressed_buf, 1, size_to_write, stream);
+        if (written!=size_to_write) {
+            if (os_fwrite_fun)    // if using hook to induce artificial errors (for testing) ...
+                r = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno
+            else
+                r = ferror(stream);
+            invariant(r!=0);
+            goto exit;
+        }
+    }
+    r = 0;
exit:
+    if (compressed_buf) {
+        toku_free(compressed_buf);
+    }
+    return r;
+}
+
+static int bl_compressed_write(void *ptr, size_t nbytes, FILE *stream, struct wbuf *wb) {
+    invariant(wb->size <= MAX_UNCOMPRESSED_BUF);
+    size_t bytes_left = nbytes;
+    char *buf = (char*)ptr;
+
+    while (bytes_left > 0) {
+        size_t bytes_to_copy = bytes_left;
+        if (wb->ndone + bytes_to_copy > wb->size) {
+            bytes_to_copy = wb->size - wb->ndone;
+        }
+        wbuf_nocrc_literal_bytes(wb, buf, bytes_to_copy);
+        if (wb->ndone == wb->size) {
+            //Compress, write to disk, and empty out wb
+            int r = bl_finish_compressed_write(stream, wb);
+            if (r != 0) {
+                errno = r;
+                return -1;
+            }
+            wb->ndone = 0;
+        }
+        bytes_left -= bytes_to_copy;
+        buf += bytes_to_copy;
+    }
+    return 0;
+}
+
+static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct wbuf *wb, FTLOADER bl)
 /* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number.
  * Arguments:
  *   ptr    the data to be writen.
 *   size   the amount of data to be written.
 *   nmemb  the number of units of size to be written.
 *   stream write the data here.
+ *   wb     where to write uncompressed data (if we're compressing) or ignore if NULL
 *   bl     passed so we can panic the ft_loader if something goes wrong (recording the error number).
 * Return value: 0 on success, an error number otherwise.
 */
 {
+    if (!bl->compress_intermediates || !wb) {
         size_t r = do_fwrite(ptr, size, nmemb, stream);
         if (r!=nmemb) {
             int e;
@@ -645,6 +742,13 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct wbuf *wb, FTLOADER bl)
             invariant(e!=0);
             return e;
         }
+    } else {
+        size_t num_bytes = size * nmemb;
+        int r = bl_compressed_write(ptr, num_bytes, stream, wb);
+        if (r != 0) {
+            return r;
+        }
+    }
     return 0;
 }
 
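
Note: bl_compressed_write() is a classic accumulate-and-flush wrapper. Bytes are appended to the wbuf until it holds exactly MAX_UNCOMPRESSED_BUF, at which point bl_finish_compressed_write() emits one compressed frame and resets the buffer, so a record may straddle two frames; that is why the read side refills in whole-frame units. A minimal self-contained sketch of the same discipline, with the compression step stubbed out (frame_writer and flush_frame are hypothetical names, not the commit's API):

// Generic accumulate-and-flush writer in the style of bl_compressed_write().
#include <stdio.h>
#include <string.h>

struct frame_writer {
    FILE  *stream;
    char   buf[1 << 20];   // one frame of uncompressed data (MAX_UNCOMPRESSED_BUF)
    size_t ndone;          // bytes accumulated so far
};

static int flush_frame(struct frame_writer *fw) {
    // The real code compresses buf[0..ndone) into sub blocks and prepends the
    // total_size word and sub-block header before writing; here we just write.
    size_t written = fwrite(fw->buf, 1, fw->ndone, fw->stream);
    if (written != fw->ndone) return -1;
    fw->ndone = 0;         // frame emitted; buffer is empty again
    return 0;
}

static int frame_write(struct frame_writer *fw, const void *ptr, size_t nbytes) {
    const char *src = (const char *) ptr;
    while (nbytes > 0) {
        size_t room = sizeof(fw->buf) - fw->ndone;
        size_t chunk = nbytes < room ? nbytes : room;
        memcpy(fw->buf + fw->ndone, src, chunk);   // records may straddle frames
        fw->ndone += chunk;
        if (fw->ndone == sizeof(fw->buf) && flush_frame(fw) != 0) return -1;
        src += chunk;
        nbytes -= chunk;
    }
    return 0;   // caller flushes the final partial frame, as the loader does
}
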

@@ -674,12 +778,12 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream)
     }
 }
 
-static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER bl)
+static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl)
 {
     int r;
     int dlen = dbt->size;
-    if ((r=bl_fwrite(&dlen,     sizeof(dlen), 1,    datafile, bl))) return r;
-    if ((r=bl_fwrite(dbt->data, 1,            dlen, datafile, bl))) return r;
+    if ((r=bl_fwrite(&dlen,     sizeof(dlen), 1,    datafile, wb, bl))) return r;
+    if ((r=bl_fwrite(dbt->data, 1,            dlen, datafile, wb, bl))) return r;
     if (dataoff)
         *dataoff += dlen + sizeof(dlen);
     return 0;
@@ -741,12 +845,13 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int filenum)
 }
 
 
-int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, FTLOADER bl)
+int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl)
 /* Effect: Given a key and a val (both DBTs), write them to a file.  Increment *dataoff so that it's up to date.
  * Arguments:
  *   key, val   write these.
 *   data       the file to write them to
 *   dataoff    a pointer to a counter that keeps track of the amount of data written so far.
+ *   wb         a pointer (possibly NULL) to buffer uncompressed output
 *   bl         the ft_loader (passed so we can panic if needed).
 * Return value: 0 on success, an error number otherwise.
 */
@@ -755,8 +860,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl)
     //int vlen = val->size;
     int r;
     // we have a chance to handle the errors because when we close we can delete all the files.
-    if ((r=bl_write_dbt(key, dataf, dataoff, bl))) return r;
-    if ((r=bl_write_dbt(val, dataf, dataoff, bl))) return r;
+    if ((r=bl_write_dbt(key, dataf, dataoff, wb, bl))) return r;
+    if ((r=bl_write_dbt(val, dataf, dataoff, wb, bl))) return r;
     toku_mutex_lock(&bl->file_infos.lock);
     bl->file_infos.file_infos[data.idx].n_rows++;
     toku_mutex_unlock(&bl->file_infos.lock);
@@ -948,7 +1053,7 @@ static void* extractor_thread (void *blv) {
         {
             r = process_primary_rows(bl, primary_rowset);
             if (r)
-                ft_loader_set_panic(bl, r, false);
+                ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
         }
     }
 
@@ -956,7 +1061,7 @@ static void* extractor_thread (void *blv) {
     if (r == 0) {
         r = finish_primary_rows(bl);
         if (r)
-            ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
 
     }
     return NULL;
@@ -1087,30 +1192,40 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
         pval.data = primary_rowset->data + prow->off + prow->klen;
         pval.size = prow->vlen;
 
+        DBT *dest_key = &skey;
+        DBT *dest_val = &sval;
+
         {
-            int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval);
+            int r;
+
+            if (bl->dbs[i] != bl->src_db) {
+                r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, dest_key, dest_val, &pkey, &pval);
                 if (r != 0) {
                     error_codes[i] = r;
                     inc_error_count();
                     break;
                 }
-            if (skey.size > klimit) {
+            } else {
+                dest_key = &pkey;
+                dest_val = &pval;
+            }
+            if (dest_key->size > klimit) {
                 error_codes[i] = EINVAL;
-                fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit);
+                fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", dest_key->size, klimit);
                 inc_error_count();
                 break;
             }
-            if (sval.size > vlimit) {
+            if (dest_val->size > vlimit) {
                 error_codes[i] = EINVAL;
-                fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit);
+                fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", dest_val->size, vlimit);
                 inc_error_count();
                 break;
            }
        }
 
-        bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i));
+        bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));
 
-        if (row_wont_fit(rows, skey.size + sval.size)) {
+        if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
             //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
             int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
             // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
@@ -1121,7 +1236,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
                 break;
             }
         }
-        int r = add_row(rows, &skey, &sval);
+        int r = add_row(rows, dest_key, dest_val);
         if (r != 0) {
             error_codes[i] = r;
             inc_error_count();
@@ -1142,7 +1257,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
            }
        }
 
-        {
+        if (bl->dbs[i] != bl->src_db) {
             if (skey.flags) {
                 toku_free(skey.data); skey.data = NULL;
             }
@@ -1157,7 +1272,10 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
     int r = 0;
     if (error_count > 0) {
         for (int i=0; i<bl->N; i++) {
-            if (error_codes[i]) r = error_codes[i];
+            if (error_codes[i]) {
+                r = error_codes[i];
+                ft_loader_set_panic(bl, r, false, i, nullptr, nullptr);
+            }
         }
         invariant(r); // found the error
     }
@@ -1457,16 +1575,42 @@ static int update_progress (int N,
 
 
 static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
+    int r = 0;
+    // Allocate a buffer if we're compressing intermediates.
+    char *uncompressed_buffer = nullptr;
+    if (bl->compress_intermediates) {
+        MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
+        if (uncompressed_buffer == nullptr) {
+            return ENOMEM;
+        }
+    }
+    struct wbuf wb;
+    wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
+
     FILE *sstream = toku_bl_fidx2file(bl, sfile);
     for (size_t i=0; i<rows.n_rows; i++) {
         DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
         DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
 
         uint64_t soffset=0; // don't really need this.
-        int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
-        if (r != 0) return r;
+        r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, &wb, bl);
+        if (r != 0) {
+            goto exit;
+        }
     }
-    return 0;
+
+    if (bl->compress_intermediates && wb.ndone > 0) {
+        r = bl_finish_compressed_write(sstream, &wb);
+        if (r != 0) {
+            goto exit;
+        }
+    }
+    r = 0;
exit:
+    if (uncompressed_buffer) {
+        toku_free(uncompressed_buffer);
+    }
+    return r;
 }
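
Note: independent of compression, bl_write_dbt() serializes each DBT as a 4-byte length followed by the payload, once for the key and once for the value, advancing *dataoff by dlen + sizeof(dlen) per DBT; with -z the identical byte stream simply passes through the wbuf before reaching disk. A small sketch of that record layout, assuming native-endian int lengths as in the original (write_record is a hypothetical helper):

// Sketch of the temp-file record format produced via bl_write_dbt():
// [int klen][klen key bytes][int vlen][vlen value bytes], native endianness.
#include <stdio.h>

static int write_record(FILE *f, const void *key, int klen,
                        const void *val, int vlen) {
    if (fwrite(&klen, sizeof klen, 1, f) != 1) return -1;
    if (fwrite(key, 1, klen, f) != (size_t) klen) return -1;
    if (fwrite(&vlen, sizeof vlen, 1, f) != 1) return -1;
    if (fwrite(val, 1, vlen, f) != (size_t) vlen) return -1;
    return 0;   // loader_write_row() additionally bumps the per-file row count
}
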

@@ -1622,6 +1766,17 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
         if (r!=0) result = r;
     }
 
+    // Allocate a buffer if we're compressing intermediates.
+    char *uncompressed_buffer = nullptr;
+    struct wbuf wb;
+    if (bl->compress_intermediates && !to_q) {
+        MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
+        if (uncompressed_buffer == nullptr) {
+            result = ENOMEM;
+        }
+    }
+    wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
+
     //printf(" n_rows=%ld\n", n_rows);
     while (result==0 && pqueue_size(pq)>0) {
         int mini;
@@ -1663,7 +1818,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
         }
     } else {
         // write it to the dest file
-        int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
+        int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], &wb, bl);
         if (r!=0) {
             result = r;
             break;
@@ -1710,6 +1865,10 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
             if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
         }
     }
+    if (result == 0 && uncompressed_buffer != nullptr && wb.ndone > 0) {
+        result = bl_finish_compressed_write(dest_stream, &wb);
+    }
 
     if (result==0 && to_q) {
         int r = queue_enq(q, (void*)output_rowset, 1, NULL);
         if (r!=0)
@@ -1719,6 +1878,9 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
     }
 
     // cleanup
+    if (uncompressed_buffer) {
+        toku_free(uncompressed_buffer);
+    }
     for (int i=0; i<n_sources; i++) {
         toku_free(keys[i].data);  keys[i].data = NULL;
         toku_free(vals[i].data);  vals[i].data = NULL;
@@ -1754,7 +1916,8 @@ static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
         }
     }
     if (result==0) {
-        int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q));
+        int r = create_dbufio_fileset(&bfs, n_sources, fds,
+                                      memory_per_rowset_during_merge(bl, n_sources, to_q), bl->compress_intermediates);
         if (r!=0) { result = r; }
     }
 
@@ -1895,7 +2058,7 @@ int merge_files (struct merge_fileset *fs,
 
             if (result!=0) break;
         }
-        if (result) ft_loader_set_panic(bl, result, true);
+        if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr);
 
         {
             int r = queue_eof(output_q);
@@ -2249,7 +2412,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
             int rr = queue_deq(q, &item, NULL, NULL);
             if (rr == EOF) break;
             if (rr != 0) {
-                ft_loader_set_panic(bl, rr, true);
+                ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr);
                 break;
             }
         }
@@ -2283,8 +2446,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
             n_pivots++;
 
             invariant(maxkey.data != NULL);
-            if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
-                ft_loader_set_panic(bl, r, true);
+            if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, nullptr, bl))) {
+                ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
                 if (result == 0) result = r;
                 break;
             }
@@ -2294,7 +2457,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
 
             r = allocate_block(&out, &lblock);
             if (r != 0) {
-                ft_loader_set_panic(bl, r, true);
+                ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
                 if (result == 0) result = r;
                 break;
             }
@@ -2346,7 +2509,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
 
     {
         DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
-        r = bl_write_dbt(&key, pivots_stream, NULL, bl);
+        r = bl_write_dbt(&key, pivots_stream, NULL, nullptr, bl);
         if (r) {
             result = r; goto error;
         }
@@ -2723,7 +2886,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl)
     result = update_progress(progress_allocation, bl, "wrote node");
 
     if (result)
-        ft_loader_set_panic(bl, result, true);
+        ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
 }
 
 static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
@@ -2833,7 +2996,7 @@ static int setup_nonleaf_block (int n_children,
 
     if (result == 0) {
         FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
-        int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl);
+        int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, nullptr, bl);
         if (r)
             result = r;
     }
@@ -2933,7 +3096,7 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
     toku_free(subtree_info);
 
     if (result != 0)
-        ft_loader_set_panic(bl, result, true);
+        ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
 }
 
 static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {

@@ -26,7 +26,8 @@ int toku_ft_loader_open (FTLOADER *bl,
                          const char *temp_file_template,
                          LSN load_lsn,
                          TOKUTXN txn,
-                         bool reserve_memory);
+                         bool reserve_memory,
+                         bool compress_intermediates);
 
 int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val);

@@ -44,7 +44,7 @@ sub_block_header_size(int n_sub_blocks);
 void
 set_compressed_size_bound(struct sub_block *se, enum toku_compression_method method);
 
-// get the sum of the sub block compressed sizes
+// get the sum of the sub block compressed bound sizes
 size_t
 get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[], enum toku_compression_method method);

@@ -42,7 +42,7 @@ static void test1 (size_t chars_per_file, size_t UU(bytes_per_read)) {
     }
     DBUFIO_FILESET bfs;
     {
-        int r = create_dbufio_fileset(&bfs, N, fds, M);
+        int r = create_dbufio_fileset(&bfs, N, fds, M, false);
         assert(r==0);
     }

@@ -41,7 +41,7 @@ static void test1 (size_t chars_per_file, size_t bytes_per_read) {
     }
     DBUFIO_FILESET bfs;
     {
-        int r = create_dbufio_fileset(&bfs, N, fds, M);
+        int r = create_dbufio_fileset(&bfs, N, fds, M, false);
         assert(r==0);
     }
     while (n_live>0) {

@@ -87,7 +87,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) {
     }
 
     FTLOADER loader;
-    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true);
+    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false);
     assert(r == 0);
 
     struct rowset *rowset[nrowsets];

@@ -99,7 +99,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char *testdir) {
     sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
 
     FTLOADER loader;
-    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true);
+    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false);
     assert(r == 0);
 
     struct rowset *rowset[nrowsets];

@@ -319,7 +319,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) {
     sprintf(temp, "%s/%s", testdir, "tempXXXXXX");
 
     FTLOADER loader;
-    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true);
+    r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true, false);
     assert(r == 0);
 
     struct rowset *rowset[nrowsets];

@@ -326,7 +326,7 @@ static void test (const char *directory, bool is_error) {
                                  bt_compare_functions,
                                  "tempxxxxxx",
                                  *lsnp,
-                                 TXNID_NONE, true);
+                                 TXNID_NONE, true, false);
         assert(r==0);
     }
@@ -340,7 +340,7 @@ static void test (const char *directory, bool is_error) {
     { int r = queue_create(&q, 1000); assert(r==0); }
     DBUFIO_FILESET bfs;
     const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues.
-    { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE); assert(r==0); }
+    { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); }
     FIDX *XMALLOC_N(N_SOURCES, src_fidxs);
     assert(bl->file_infos.n_files==0);
     bl->file_infos.n_files = N_SOURCES;

@@ -57,7 +57,7 @@ static void test_loader_open(int ndbs) {
     for (i = 0; ; i++) {
         set_my_malloc_trigger(i+1);
 
-        r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true);
+        r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true, false);
         if (r == 0)
             break;
     }

@@ -187,7 +187,7 @@ static void test_read_write_rows (char *tf_template) {
         toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i]));
         DBT val;
         toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i]));
-        r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, &bl);
+        r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl);
         CKERR(r);
         actual_size+=key.size + val.size + 8;
     }

@@ -430,3 +430,8 @@ int toku_fsync_directory(const char *fname) {
     toku_free(dirname);
     return result;
 }
+
+FILE *toku_os_fmemopen(void *buf, size_t size, const char *mode) {
+    return fmemopen(buf, size, mode);
+}
+
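
Note: toku_os_fmemopen() wraps POSIX fmemopen(), which hands FILE*-based code a memory buffer instead of a file; plausibly this was added so tests can drive the stream-oriented write path without temporary files, though the commit does not say so. A minimal usage sketch, assuming a POSIX platform with fmemopen available:

// Round-trip through a memory-backed FILE*, as toku_os_fmemopen() enables.
#include <stdio.h>
#include <string.h>
#include <assert.h>

int main(void) {
    char backing[64];
    FILE *f = fmemopen(backing, sizeof backing, "w+");
    assert(f != NULL);
    fputs("frame", f);
    fflush(f);                 // fmemopen buffers like any stdio stream
    rewind(f);
    char got[8] = {0};
    fread(got, 1, 5, f);
    assert(strcmp(got, "frame") == 0);
    fclose(f);
    return 0;
}
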

@@ -174,6 +174,7 @@ toku_loader_create_loader(DB_ENV *env,
 
     DB_LOADER *loader = NULL;
     bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS);
+    bool compress_intermediates = (loader_flags & LOADER_COMPRESS_INTERMEDIATES) != 0;
     XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
     XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
@@ -252,7 +253,8 @@ toku_loader_create_loader(DB_ENV *env,
                                  loader->i->temp_file_template,
                                  load_lsn,
                                  ttxn,
-                                 puts_allowed);
+                                 puts_allowed,
+                                 compress_intermediates);
         if ( rval!=0 ) {
             toku_free(new_inames_in_env);
             toku_free(brts);

@@ -546,14 +546,22 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
     add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb)
     add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb)
     add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb)
-    add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb)
+    add_test(ydb/loader-stress-test0z.tdb loader-stress-test.tdb -c -e dir.loader-stress-test0z.tdb -z)
+    add_test(ydb/loader-stress-test1z.tdb loader-stress-test.tdb -c -p -e dir.loader-stress-test1z.tdb -z)
+    add_test(ydb/loader-stress-test2z.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2z.tdb -z)
+    add_test(ydb/loader-stress-test3z.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3z.tdb -z)
+    add_test(ydb/loader-stress-test4z.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4z.tdb -z)
     set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
       dir.loader-stress-test0.tdb
       dir.loader-stress-test1.tdb
       dir.loader-stress-test2.tdb
       dir.loader-stress-test3.tdb
       dir.loader-stress-test4.tdb
-      dir.loader-stress-test5.tdb
+      dir.loader-stress-test0z.tdb
+      dir.loader-stress-test1z.tdb
+      dir.loader-stress-test2z.tdb
+      dir.loader-stress-test3z.tdb
+      dir.loader-stress-test4z.tdb
       )
 
     list(REMOVE_ITEM loader_tests loader-dup-test.loader)
@@ -563,6 +571,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
     add_test(ydb/loader-dup-test3.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3.tdb)
     add_test(ydb/loader-dup-test4.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4.tdb)
     add_test(ydb/loader-dup-test5.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5.tdb)
+    add_test(ydb/loader-dup-test0z.tdb loader-dup-test.tdb -e dir.loader-dup-test0z.tdb -z)
+    add_test(ydb/loader-dup-test1z.tdb loader-dup-test.tdb -d 1 -r 500000 -e dir.loader-dup-test1z.tdb -z)
+    add_test(ydb/loader-dup-test2z.tdb loader-dup-test.tdb -d 1 -r 1000000 -e dir.loader-dup-test2z.tdb -z)
+    add_test(ydb/loader-dup-test3z.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3z.tdb -z)
+    add_test(ydb/loader-dup-test4z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4z.tdb -z)
+    add_test(ydb/loader-dup-test5z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5z.tdb -z)
     set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
       dir.loader-dup-test0.tdb
       dir.loader-dup-test1.tdb
@@ -570,6 +584,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
      dir.loader-dup-test3.tdb
      dir.loader-dup-test4.tdb
      dir.loader-dup-test5.tdb
+      dir.loader-dup-test0z.tdb
+      dir.loader-dup-test1z.tdb
+      dir.loader-dup-test2z.tdb
+      dir.loader-dup-test3z.tdb
+      dir.loader-dup-test4z.tdb
+      dir.loader-dup-test5z.tdb
       )
 
     ## as part of #4503, we took out test 1 and 3
@@ -578,6 +598,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
     #add_test(ydb/loader-cleanup-test1.tdb loader-cleanup-test.tdb -s -r 800 -p -e dir.loader-cleanup-test1.tdb)
     add_test(ydb/loader-cleanup-test2.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2.tdb)
     #add_test(ydb/loader-cleanup-test3.tdb loader-cleanup-test.tdb -s -r 8000 -p -e dir.loader-cleanup-test3.tdb)
+    add_test(ydb/loader-cleanup-test0z.tdb loader-cleanup-test.tdb -s -r 800 -e dir.loader-cleanup-test0z.tdb -z)
+    add_test(ydb/loader-cleanup-test2z.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2z.tdb -z)
     set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
       dir.loader-cleanup-test0.tdb
       dir.loader-cleanup-test1.tdb
@@ -606,11 +628,15 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
       )
 
     declare_custom_tests(maxsize-for-loader.tdb)
-    add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f)
-    add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb)
+    add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f -c)
+    add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb -c)
+    add_test(ydb/maxsize-for-loader-Az.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Az.tdb -f -z -c)
+    add_test(ydb/maxsize-for-loader-Bz.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Bz.tdb -z -c)
     set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
       dir.maxsize-for-loader-A.tdb
       dir.maxsize-for-loader-B.tdb
+      dir.maxsize-for-loader-Az.tdb
+      dir.maxsize-for-loader-Bz.tdb
       )
 
     declare_custom_tests(hotindexer-undo-do-test.tdb)
@@ -667,6 +693,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
      ydb/env-put-multiple.tdb
      ydb/filesize.tdb
      ydb/loader-cleanup-test0.tdb
+      ydb/loader-cleanup-test0z.tdb
+      ydb/loader-cleanup-test2z.tdb
      ydb/manyfiles.tdb
      ydb/recover-loader-test.abortrecover
      ydb/recovery_fileops_stress.tdb
@@ -712,6 +740,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
      ydb/loader-stress-del.nop.loader
      ydb/loader-stress-del.p.loader
      ydb/loader-stress-del.comp.loader
+      ydb/loader-stress-test4z.tdb
      ydb/test3039.tdb
      ydb/test3529.tdb
      ydb/test_update_stress.tdb

@@ -7,18 +7,24 @@
 #include "toku_pthread.h"
 #include <db.h>
 #include <sys/stat.h>
+#include "toku_random.h"
 
 using namespace toku;
 bool fast = false;
 
 DB_ENV *env;
 enum {NUM_DBS=2};
-int USE_PUTS=0;
+uint32_t USE_COMPRESS=0;
+
+bool do_check = false;
 uint32_t num_rows = 1;
 uint32_t which_db_to_fail = (uint32_t) -1;
 uint32_t which_row_to_fail = (uint32_t) -1;
 enum how_to_fail { FAIL_NONE, FAIL_KSIZE, FAIL_VSIZE } how_to_fail = FAIL_NONE;
 
+static struct random_data random_data[NUM_DBS];
+char random_buf[NUM_DBS][8];
+
 static int put_multiple_generate(DB *dest_db,
                                  DB *src_db __attribute__((__unused__)),
                                  DBT *dest_key, DBT *dest_val,
@@ -52,8 +58,8 @@ static int put_multiple_generate(DB *dest_db,
             dest_val->ulen = vsize;
         }
         assert(ksize>=sizeof(uint32_t));
-        for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = random();
-        for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = random();
+        for (uint32_t i=0; i<ksize; i++) ((char*)dest_key->data)[i] = myrandom_r(&random_data[which]);
+        for (uint32_t i=0; i<vsize; i++) ((char*)dest_val->data)[i] = myrandom_r(&random_data[which]);
         *(uint32_t*)dest_key->data = rownum;
         dest_key->size = ksize;
         dest_val->size = vsize;
@@ -74,7 +80,18 @@ static void error_callback (DB *db __attribute__((__unused__)), int which_db, int err, DBT *key, DBT *val, void *extra)
     e->error_count++;
 }
 
-static void test_loader_maxsize(DB **dbs)
+static void reset_random(void) {
+    int r;
+
+    for (int i = 0; i < NUM_DBS; i++) {
+        ZERO_STRUCT(random_data[i]);
+        ZERO_ARRAY(random_buf[i]);
+        r = myinitstate_r(i, random_buf[i], 8, &random_data[i]);
+        assert(r==0);
+    }
+}
+
+static void test_loader_maxsize(DB **dbs, DB **check_dbs)
 {
     int r;
     DB_TXN *txn;
@@ -85,7 +102,7 @@ static void test_loader_maxsize(DB **dbs, DB **check_dbs)
         db_flags[i] = DB_NOOVERWRITE;
         dbt_flags[i] = 0;
     }
-    uint32_t loader_flags = USE_PUTS; // set with -p option
+    uint32_t loader_flags = USE_COMPRESS; // set with -p option
 
     // create and initialize loader
     r = env->txn_begin(env, NULL, &txn, 0);
@@ -98,6 +115,7 @@ static void test_loader_maxsize(DB **dbs, DB **check_dbs)
     r = loader->set_poll_function(loader, NULL, NULL);
     CKERR(r);
 
+    reset_random();
     // using loader->put, put values into DB
     DBT key, val;
     unsigned int k, v;
@@ -107,14 +125,8 @@ static void test_loader_maxsize(DB **dbs, DB **check_dbs)
         dbt_init(&key, &k, sizeof(unsigned int));
         dbt_init(&val, &v, sizeof(unsigned int));
         r = loader->put(loader, &key, &val);
-        if (USE_PUTS) {
-            //PUT loader can return -1 if it finds an error during the puts.
-            CKERR2s(r, 0,-1);
-        }
-        else {
-            CKERR(r);
-        }
+        CKERR(r);
     }
 
     // close the loader
     if (verbose) { printf("closing"); fflush(stdout); }
@@ -127,16 +139,124 @@ static void test_loader_maxsize(DB **dbs, DB **check_dbs)
     }
     assert(0);
checked:
 
     r = txn->commit(txn, 0);
     CKERR(r);
 
+    if (do_check && how_to_fail==FAIL_NONE) {
+        r = env->txn_begin(env, NULL, &txn, 0);
+        CKERR(r);
+        reset_random();
+        DBT keys[NUM_DBS];
+        DBT vals[NUM_DBS];
+        uint32_t flags[NUM_DBS];
+        for (int i = 0; i < NUM_DBS; i++) {
+            dbt_init_realloc(&keys[i]);
+            dbt_init_realloc(&vals[i]);
+            flags[i] = 0;
+        }
+
+        for(uint32_t i=0;i<num_rows;i++) {
+            k = i;
+            v = i;
+            dbt_init(&key, &k, sizeof(unsigned int));
+            dbt_init(&val, &v, sizeof(unsigned int));
+            DB* src_db = check_dbs[0];
+            r = env->put_multiple(env, src_db, txn, &key, &val, NUM_DBS, check_dbs, keys, vals, flags);
+            CKERR(r);
+        }
+        r = txn->commit(txn, 0);
+        CKERR(r);
+        r = env->txn_begin(env, NULL, &txn, 0);
+        CKERR(r);
+
+        for (int i = 0; i < NUM_DBS; i++) {
+            DBC *loader_cursor;
+            DBC *check_cursor;
+            r = dbs[i]->cursor(dbs[i], txn, &loader_cursor, 0);
+            CKERR(r);
+            r = dbs[i]->cursor(check_dbs[i], txn, &check_cursor, 0);
+            CKERR(r);
+            DBT loader_key;
+            DBT loader_val;
+            DBT check_key;
+            DBT check_val;
+            dbt_init_realloc(&loader_key);
+            dbt_init_realloc(&loader_val);
+            dbt_init_realloc(&check_key);
+            dbt_init_realloc(&check_val);
+            for (uint32_t x = 0; x <= num_rows; x++) {
+                int r_loader = loader_cursor->c_get(loader_cursor, &loader_key, &loader_val, DB_NEXT);
+                int r_check = check_cursor->c_get(check_cursor, &check_key, &check_val, DB_NEXT);
+                assert(r_loader == r_check);
+                if (x == num_rows) {
+                    CKERR2(r_loader, DB_NOTFOUND);
+                    CKERR2(r_check, DB_NOTFOUND);
+                } else {
+                    CKERR(r_loader);
+                    CKERR(r_check);
+                }
+                assert(loader_key.size == check_key.size);
+                assert(loader_val.size == check_val.size);
+                assert(memcmp(loader_key.data, check_key.data, loader_key.size) == 0);
+                assert(memcmp(loader_val.data, check_val.data, loader_val.size) == 0);
+            }
+            toku_free(loader_key.data);
+            toku_free(loader_val.data);
+            toku_free(check_key.data);
+            toku_free(check_val.data);
+            loader_cursor->c_close(loader_cursor);
+            check_cursor->c_close(check_cursor);
+        }
+
+        for (int i = 0; i < NUM_DBS; i++) {
+            toku_free(keys[i].data);
+            toku_free(vals[i].data);
+            dbt_init_realloc(&keys[i]);
+            dbt_init_realloc(&vals[i]);
+        }
+        r = txn->commit(txn, 0);
+        CKERR(r);
+    }
+
+
 }
 
 char *free_me = NULL;
 const char *env_dir = ENVDIR; // the default env_dir
 
+static void create_and_open_dbs(DB **dbs, const char *suffix, int *idx) {
+    int r;
+    DBT desc;
+    dbt_init(&desc, "foo", sizeof("foo"));
+    enum {MAX_NAME=128};
+    char name[MAX_NAME*2];
+
+    for(int i=0;i<NUM_DBS;i++) {
+        idx[i] = i;
+        r = db_create(&dbs[i], env, 0); CKERR(r);
+        dbs[i]->app_private = &idx[i];
+        snprintf(name, sizeof(name), "db_%04x_%s", i, suffix);
+        r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
+        IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
+                { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
+        });
+    }
+}
+
+static int
+uint_or_size_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
+    assert(db && a && b);
+    if (a->size == sizeof(unsigned int) && b->size == sizeof(unsigned int)) {
+        return uint_dbt_cmp(db, a, b);
+    }
+    return a->size - b->size;
+}
+
 static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
     num_rows = nr; which_db_to_fail = wdb; which_row_to_fail = wrow; how_to_fail = htf;
 
+    //since src_key/val can't be modified, and we're using generate to make the failures, we can't fail on src_db (0)
+    assert(which_db_to_fail != 0);
     int r;
     {
         int len = strlen(env_dir) + 20;
@@ -148,7 +268,7 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
     r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
 
     r = db_env_create(&env, 0);                                         CKERR(r);
-    r = env->set_default_bt_compare(env, uint_dbt_cmp);                 CKERR(r);
+    r = env->set_default_bt_compare(env, uint_or_size_dbt_cmp);         CKERR(r);
     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
     CKERR(r);
     int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE;
@@ -157,37 +277,35 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) {
     //Disable auto-checkpointing
     r = env->checkpointing_set_period(env, 0);                          CKERR(r);
 
-    DBT desc;
-    dbt_init(&desc, "foo", sizeof("foo"));
-    enum {MAX_NAME=128};
-    char name[MAX_NAME*2];
-
-    DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
-    assert(dbs != NULL);
+    DB **XMALLOC_N(NUM_DBS, dbs);
+    DB **check_dbs;
     int idx[NUM_DBS];
-    for(int i=0;i<NUM_DBS;i++) {
-        idx[i] = i;
-        r = db_create(&dbs[i], env, 0); CKERR(r);
-        dbs[i]->app_private = &idx[i];
-        snprintf(name, sizeof(name), "db_%04x", i);
-        r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
-        IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
-                { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
-        });
-    }
+
+    create_and_open_dbs(dbs, "loader", &idx[0]);
+    if (do_check && how_to_fail==FAIL_NONE) {
+        XMALLOC_N(NUM_DBS, check_dbs);
+        create_and_open_dbs(check_dbs, "check", &idx[0]);
+    }
 
     if (verbose) printf("running test_loader()\n");
     // -------------------------- //
-    test_loader_maxsize(dbs);
+    test_loader_maxsize(dbs, check_dbs);
     // -------------------------- //
     if (verbose) printf("done test_loader()\n");
 
     for(int i=0;i<NUM_DBS;i++) {
         dbs[i]->close(dbs[i], 0); CKERR(r);
         dbs[i] = NULL;
+        if (do_check && how_to_fail==FAIL_NONE) {
+            check_dbs[i]->close(check_dbs[i], 0); CKERR(r);
+            check_dbs[i] = NULL;
+        }
     }
     r = env->close(env, 0); CKERR(r);
     toku_free(dbs);
+    if (do_check && how_to_fail==FAIL_NONE) {
+        toku_free(check_dbs);
+    }
 }
 
 // ------------ infrastructure ----------
@@ -199,12 +317,12 @@ int test_main(int argc, char * const *argv) {
     do_args(argc, argv);
 
     run_test(1, (uint32_t) -1, (uint32_t) -1, FAIL_NONE);
-    run_test(1, 0, 0, FAIL_NONE);
-    run_test(1, 0, 0, FAIL_KSIZE);
-    run_test(1, 0, 0, FAIL_VSIZE);
+    run_test(1, 1, 0, FAIL_NONE);
+    run_test(1, 1, 0, FAIL_KSIZE);
+    run_test(1, 1, 0, FAIL_VSIZE);
     if (!fast) {
-        run_test(1000000, 0, 500000, FAIL_KSIZE);
-        run_test(1000000, 0, 500000, FAIL_VSIZE);
+        run_test(1000000, 1, 500000, FAIL_KSIZE);
+        run_test(1000000, 1, 500000, FAIL_VSIZE);
     }
     toku_free(free_me);
     return 0;
@@ -223,7 +341,8 @@ static void do_args(int argc, char * const argv[]) {
        fprintf(stderr, "   -h help\n");
        fprintf(stderr, "   -v verbose\n");
        fprintf(stderr, "   -q quiet\n");
-       fprintf(stderr, "   -p use DB->put\n");
+       fprintf(stderr, "   -z compress intermediates\n");
+       fprintf(stderr, "   -c compare with regular dbs\n");
        fprintf(stderr, "   -f fast (suitable for vgrind)\n");
        exit(resultcode);
     } else if (strcmp(argv[0], "-e")==0) {
@@ -235,13 +354,15 @@ static void do_args(int argc, char * const argv[]) {
        assert(r<len);
        env_dir = toku_strdup(full_env_dir);
        free_me = (char *) env_dir;
+    } else if (strcmp(argv[0], "-c")==0) {
+        do_check = true;
     } else if (strcmp(argv[0], "-v")==0) {
        verbose++;
     } else if (strcmp(argv[0],"-q")==0) {
        verbose--;
        if (verbose<0) verbose=0;
-    } else if (strcmp(argv[0], "-p")==0) {
-        USE_PUTS = 1;
+    } else if (strcmp(argv[0], "-z")==0) {
+        USE_COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
     } else if (strcmp(argv[0], "-f")==0) {
        fast = true;
     } else {
|
|||
int toku_os_fclose(FILE * stream);
|
||||
ssize_t toku_os_read(int fd, void *buf, size_t count);
|
||||
ssize_t toku_os_pread(int fd, void *buf, size_t count, off_t offset);
|
||||
FILE *toku_os_fmemopen(void *buf, size_t size, const char *mode);
|
||||
|
||||
// wrapper around fsync
|
||||
void toku_file_fsync_without_accounting(int fd);
|
||||
|
|