From 12ccf6ae0ad2d893747d02196efb463fd606f0fa Mon Sep 17 00:00:00 2001 From: Yoni Fogel Date: Wed, 17 Apr 2013 00:01:24 -0400 Subject: [PATCH] refs #5663 Merge #5663 onto main git-svn-id: file:///svn/toku/tokudb@51238 c7de825b-a66e-492c-adef-691d508d4ae1 --- ft/dbufio.cc | 193 +++++++++++- ft/dbufio.h | 2 +- ft/ft.h | 3 + ft/ft_node-serialize.cc | 20 +- ft/ftloader-internal.h | 19 +- ft/ftloader.cc | 295 ++++++++++++++----- ft/ftloader.h | 3 +- ft/sub_block.h | 2 +- ft/tests/dbufio-test-destroy.cc | 2 +- ft/tests/dbufio-test.cc | 2 +- ft/tests/ftloader-test-bad-generate.cc | 2 +- ft/tests/ftloader-test-extractor-errors.cc | 2 +- ft/tests/ftloader-test-extractor.cc | 2 +- ft/tests/ftloader-test-merge-files-dbufio.cc | 4 +- ft/tests/ftloader-test-open.cc | 2 +- ft/tests/ftloader-test.cc | 2 +- portability/file.cc | 5 + src/loader.cc | 4 +- src/tests/CMakeLists.txt | 37 ++- src/tests/maxsize-for-loader.cc | 201 ++++++++++--- toku_include/toku_portability.h | 1 + 21 files changed, 660 insertions(+), 143 deletions(-) diff --git a/ft/dbufio.cc b/ft/dbufio.cc index 5130c2f1864..abf74cff28e 100644 --- a/ft/dbufio.cc +++ b/ft/dbufio.cc @@ -11,6 +11,9 @@ #include #include "memory.h" #include +#include "ftloader-internal.h" +#include "ft-internal.h" +#include "ft.h" struct dbufio_file { // i/o thread owns these @@ -18,7 +21,7 @@ struct dbufio_file { // consumers own these size_t offset_in_buf; - toku_off_t offset_in_file; + toku_off_t offset_in_uncompressed_file; // need the mutex to modify these struct dbufio_file *next; @@ -49,6 +52,7 @@ struct dbufio_fileset { size_t bufsize; // the bufsize is the constant (the same for all buffers). bool panic; + bool compressed; int panic_errno; toku_pthread_t iothread; }; @@ -75,6 +79,162 @@ static bool paniced (DBUFIO_FILESET bfs) { return bfs->panic; } +static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { + ssize_t ret; + invariant(bufsize >= MAX_UNCOMPRESSED_BUF); + unsigned char *raw_block = NULL; + + // deserialize the sub block header + + // total_size + // num_sub_blocks + // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times) + ssize_t readcode; + const uint32_t header_size = sizeof(uint32_t); + char header[header_size]; + + readcode = toku_os_read(dbf->fd, &header, header_size); + if (readcode < 0) { + ret = -1; + goto exit; + } + if (readcode == 0) { + ret = 0; + goto exit; + } + if (readcode < header_size) { + errno = TOKUDB_NO_DATA; + ret = -1; + goto exit; + } + uint32_t total_size; + { + uint32_t *p = (uint32_t *) &header[0]; + total_size = toku_dtoh32(p[0]); + } + if (total_size == 0 || total_size > (1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + //Cannot use XMALLOC + MALLOC_N(total_size, raw_block); + if (raw_block == nullptr) { + errno = ENOMEM; + ret = -1; + goto exit; + } + readcode = toku_os_read(dbf->fd, raw_block, total_size); + if (readcode < 0) { + ret = -1; + goto exit; + } + if (readcode < total_size) { + errno = TOKUDB_NO_DATA; + ret = -1; + goto exit; + } + + struct sub_block sub_block[max_sub_blocks]; + uint32_t *sub_block_header; + sub_block_header = (uint32_t *) &raw_block[0]; + int32_t n_sub_blocks; + n_sub_blocks = toku_dtoh32(sub_block_header[0]); + sub_block_header++; + size_t size_subblock_header; + size_subblock_header = sub_block_header_size(n_sub_blocks); + if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + for (int i = 0; i < n_sub_blocks; i++) { + sub_block_init(&sub_block[i]); + sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]); + sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]); + sub_block[i].xsum = toku_dtoh32(sub_block_header[2]); + sub_block_header += 3; + } + + // verify sub block sizes + size_t total_compressed_size; + total_compressed_size = 0; + for (int i = 0; i < n_sub_blocks; i++) { + uint32_t compressed_size = sub_block[i].compressed_size; + if (compressed_size<=0 || compressed_size>(1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + uint32_t uncompressed_size = sub_block[i].uncompressed_size; + if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + total_compressed_size += compressed_size; + } + if (total_size != total_compressed_size + size_subblock_header) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + // sum up the uncompressed size of the sub blocks + size_t uncompressed_size; + uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block); + if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + unsigned char *uncompressed_data; + uncompressed_data = (unsigned char *)buf; + + // point at the start of the compressed data (past the node header, the sub block header, and the header checksum) + unsigned char *compressed_data; + compressed_data = raw_block + size_subblock_header; + + // decompress all the compressed sub blocks into the uncompressed buffer + { + int r; + r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool()); + if (r != 0) { + fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size); + dump_bad_block(raw_block, total_size); + errno = r; + ret = -1; + goto exit; + } + } + ret = uncompressed_size; +exit: + if (raw_block) { + toku_free(raw_block); + } + return ret; +} + +static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { + invariant(bufsize >= MAX_UNCOMPRESSED_BUF); + size_t count = 0; + + while (count + MAX_UNCOMPRESSED_BUF <= bufsize) { + ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count); + if (readcode < 0) { + return readcode; + } + count += readcode; + if (readcode == 0) { + break; + } + } + return count; +} + static void* io_thread (void *v) // The dbuf_thread does all the asynchronous I/O. { @@ -118,7 +278,13 @@ static void* io_thread (void *v) toku_mutex_unlock(&bfs->mutex); //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd); { - ssize_t readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize); + ssize_t readcode; + if (bfs->compressed) { + readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize); + } + else { + readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize); + } //printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode); if (readcode==-1) { // a real error. Save the real error. @@ -159,11 +325,14 @@ static void* io_thread (void *v) } } -int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize) { +int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) { //printf("%s:%d here\n", __FILE__, __LINE__); int result = 0; DBUFIO_FILESET CALLOC(bfs); if (bfs==0) { result = get_error_errno(); } + + bfs->compressed = compressed; + bool mutex_inited = false, cond_inited = false; if (result==0) { CALLOC_N(N, bfs->files); @@ -190,7 +359,7 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b for (int i=0; ifiles[i].fd = fds[i]; bfs->files[i].offset_in_buf = 0; - bfs->files[i].offset_in_file = 0; + bfs->files[i].offset_in_uncompressed_file = 0; bfs->files[i].next = NULL; bfs->files[i].second_buf_ready = false; for (int j=0; j<2; j++) { @@ -202,12 +371,18 @@ int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t b bfs->files[i].error_code[j] = 0; } bfs->files[i].io_done = false; - { - ssize_t r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize); + ssize_t r; + if (bfs->compressed) { + r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize); + } else { + //TODO: 5663 If compressed need to read differently + r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize); + } + { if (r<0) { result=get_error_errno(); break; - } else if (r==0) { + } else if (r==0) { // it's EOF bfs->files[i].io_done = true; bfs->n_not_done--; @@ -297,7 +472,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t co // Enough data is present to do it all now memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count); dbf->offset_in_buf += count; - dbf->offset_in_file += count; + dbf->offset_in_uncompressed_file += count; *n_read = count; return 0; } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) { @@ -306,7 +481,7 @@ int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t co assert(dbf->offset_in_buf + this_count <= bfs->bufsize); memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count); dbf->offset_in_buf += this_count; - dbf->offset_in_file += this_count; + dbf->offset_in_uncompressed_file += this_count; size_t sub_n_read; int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read); if (r==0) { diff --git a/ft/dbufio.h b/ft/dbufio.h index 9aa2ae93fa8..ad1c5d92280 100644 --- a/ft/dbufio.h +++ b/ft/dbufio.h @@ -14,7 +14,7 @@ /* An implementation would typically use a separate thread or asynchronous I/O to fetch ahead data for each file. The system will typically fill two buffers of size M for each file. One buffer is being read out of using dbuf_read(), and the other buffer is either empty (waiting on the asynchronous I/O to start), being filled in by the asynchronous I/O mechanism, or is waiting for the caller to read data from it. */ typedef struct dbufio_fileset *DBUFIO_FILESET; -int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize); +int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed); int destroy_dbufio_fileset(DBUFIO_FILESET); diff --git a/ft/ft.h b/ft/ft.h index acccb1840e7..c5d92deeac2 100644 --- a/ft/ft.h +++ b/ft/ft.h @@ -110,4 +110,7 @@ void toku_ft_set_blackhole(FT_HANDLE ft_handle); // The difference between the two is MVCC garbage. void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space); +int get_num_cores(void); +struct toku_thread_pool *get_ft_pool(void); +void dump_bad_block(unsigned char *vp, uint64_t size); #endif diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc index e65546fe9ce..5538eb46269 100644 --- a/ft/ft_node-serialize.cc +++ b/ft/ft_node-serialize.cc @@ -10,6 +10,7 @@ #include #include #include +#include "ft.h" static FT_UPGRADE_STATUS_S ft_upgrade_status; @@ -57,6 +58,14 @@ static inline void do_toku_trace(const char *cp, int len) { static int num_cores = 0; // cache the number of cores for the parallelization static struct toku_thread_pool *ft_pool = NULL; +int get_num_cores(void) { + return num_cores; +} + +struct toku_thread_pool *get_ft_pool(void) { + return ft_pool; +} + void toku_ft_serialize_layer_init(void) { num_cores = toku_os_get_number_active_processors(); @@ -980,7 +989,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, // dump a buffer to stderr // no locking around this for now -static void +void dump_bad_block(unsigned char *vp, uint64_t size) { const uint64_t linesize = 64; uint64_t n = size / linesize; @@ -1181,14 +1190,7 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) goto exit; } - sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); - - toku_decompress( - (Bytef *) sb->uncompressed_ptr, - sb->uncompressed_size, - (Bytef *) sb->compressed_ptr, - sb->compressed_size - ); + just_decompress_sub_block(sb); exit: return r; } diff --git a/ft/ftloader-internal.h b/ft/ftloader-internal.h index 44a285099f4..0772a0309fd 100644 --- a/ft/ftloader-internal.h +++ b/ft/ftloader-internal.h @@ -12,6 +12,19 @@ #include #include "dbufio.h" +enum { EXTRACTOR_QUEUE_DEPTH = 2, + FILE_BUFFER_SIZE = 1<<24, + MIN_ROWSET_MEMORY = 1<<23, + MIN_MERGE_FANIN = 2, + FRACTAL_WRITER_QUEUE_DEPTH = 3, + FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2, + DBUFIO_DEPTH = 2, + TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big. + MIN_MERGE_BUF_SIZE = 1<<20, // always use at least this much + MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE +}; + + /* These functions are exported to allow the tests to compile. */ /* These structures maintain a collection of all the open temporary files used by the loader. */ @@ -56,7 +69,7 @@ int init_rowset (struct rowset *rows, uint64_t memory_budget); void destroy_rowset (struct rowset *rows); int add_row (struct rowset *rows, DBT *key, DBT *val); -int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, FTLOADER bl); +int loader_write_row(DBT *key, DBT *val, FIDX data, FILE*, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl); int loader_read_row (FILE *f, DBT *key, DBT *val); struct merge_fileset { @@ -146,6 +159,7 @@ struct ft_loader_s { CACHETABLE cachetable; bool did_reserve_memory; + bool compress_intermediates; uint64_t reserved_memory; // how much memory are we allowed to use? /* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */ @@ -243,7 +257,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, const char *temp_file_template, LSN load_lsn, TOKUTXN txn, - bool reserve_memory); + bool reserve_memory, + bool compress_intermediates); void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error); diff --git a/ft/ftloader.cc b/ft/ftloader.cc index 37e2c22aec7..b70d7baeb8b 100644 --- a/ft/ftloader.cc +++ b/ft/ftloader.cc @@ -47,18 +47,6 @@ static uint32_t size_factor = 1024; static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE; static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE; -enum { EXTRACTOR_QUEUE_DEPTH = 2, - FILE_BUFFER_SIZE = 1<<24, - MIN_ROWSET_MEMORY = 1<<23, - MIN_MERGE_FANIN = 2, - FRACTAL_WRITER_QUEUE_DEPTH = 3, - FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2, - DBUFIO_DEPTH = 2, - TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big. - MIN_MERGE_BUF_SIZE = 1<<20 // always use at least this much -}; - - void toku_ft_loader_set_size_factor(uint32_t factor) { // For test purposes only @@ -468,7 +456,8 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, const char *temp_file_template, LSN load_lsn, TOKUTXN txn, - bool reserve_memory) + bool reserve_memory, + bool compress_intermediates) // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread. { FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC) @@ -484,6 +473,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, bl->did_reserve_memory = false; bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB. } + bl->compress_intermediates = compress_intermediates; //printf("Reserved memory=%ld\n", bl->reserved_memory); bl->src_db = src_db; @@ -570,7 +560,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, const char *temp_file_template, LSN load_lsn, TOKUTXN txn, - bool reserve_memory) + bool reserve_memory, + bool compress_intermediates) /* Effect: called by DB_ENV->create_loader to create a brt loader. * Arguments: * blp Return the brt loader here. @@ -592,7 +583,8 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, temp_file_template, load_lsn, txn, - reserve_memory); + reserve_memory, + compress_intermediates); if (r!=0) result = r; } if (result==0) { @@ -608,8 +600,12 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, return result; } -static void ft_loader_set_panic(FTLOADER bl, int error, bool callback) { - int r = ft_loader_set_error(&bl->error_callback, error, NULL, 0, NULL, NULL); +static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which_db, DBT *key, DBT *val) { + DB *db = nullptr; + if (bl && bl->dbs && which_db >= 0 && which_db < bl->N) { + db = bl->dbs[which_db]; + } + int r = ft_loader_set_error(&bl->error_callback, error, db, which_db, key, val); if (r == 0 && callback) ft_loader_call_error_function(&bl->error_callback); } @@ -624,26 +620,134 @@ FILE *toku_bl_fidx2file (FTLOADER bl, FIDX i) { return result; } -static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADER UU(bl)) +static int bl_finish_compressed_write(FILE *stream, struct wbuf *wb) { + int r; + char *compressed_buf = NULL; + const size_t data_size = wb->ndone; + invariant(data_size > 0); + invariant(data_size <= MAX_UNCOMPRESSED_BUF); + + int n_sub_blocks = 0; + int sub_block_size = 0; + + r = choose_sub_block_size(wb->ndone, max_sub_blocks, &sub_block_size, &n_sub_blocks); + invariant(r==0); + invariant(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks); + invariant(sub_block_size > 0); + + struct sub_block sub_block[max_sub_blocks]; + // set the initial sub block size for all of the sub blocks + for (int i = 0; i < n_sub_blocks; i++) { + sub_block_init(&sub_block[i]); + } + set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block); + + size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, TOKU_DEFAULT_COMPRESSION_METHOD); + const size_t sub_block_header_len = sub_block_header_size(n_sub_blocks); + const size_t other_overhead = sizeof(uint32_t); //total_size + const size_t header_len = sub_block_header_len + other_overhead; + MALLOC_N(header_len + compressed_len, compressed_buf); + if (compressed_buf == nullptr) { + return ENOMEM; + } + + // compress all of the sub blocks + char *uncompressed_ptr = (char*)wb->buf; + char *compressed_ptr = compressed_buf + header_len; + compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, + get_num_cores(), get_ft_pool(), TOKU_DEFAULT_COMPRESSION_METHOD); + + //total_size does NOT include itself + uint32_t total_size = compressed_len + sub_block_header_len; + // serialize the sub block header + uint32_t *ptr = (uint32_t *)(compressed_buf); + *ptr++ = toku_htod32(total_size); + *ptr++ = toku_htod32(n_sub_blocks); + for (int i=0; indone = 0; + + size_t size_to_write = total_size + 4; // Includes writing total_size + + { + size_t written = do_fwrite(compressed_buf, 1, size_to_write, stream); + if (written!=size_to_write) { + if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... + r = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno + else + r = ferror(stream); + invariant(r!=0); + goto exit; + } + } + r = 0; +exit: + if (compressed_buf) { + toku_free(compressed_buf); + } + return r; +} + +static int bl_compressed_write(void *ptr, size_t nbytes, FILE *stream, struct wbuf *wb) { + invariant(wb->size <= MAX_UNCOMPRESSED_BUF); + size_t bytes_left = nbytes; + char *buf = (char*)ptr; + + while (bytes_left > 0) { + size_t bytes_to_copy = bytes_left; + if (wb->ndone + bytes_to_copy > wb->size) { + bytes_to_copy = wb->size - wb->ndone; + } + wbuf_nocrc_literal_bytes(wb, buf, bytes_to_copy); + if (wb->ndone == wb->size) { + //Compress, write to disk, and empty out wb + int r = bl_finish_compressed_write(stream, wb); + if (r != 0) { + errno = r; + return -1; + } + wb->ndone = 0; + } + bytes_left -= bytes_to_copy; + buf += bytes_to_copy; + } + return 0; +} + +static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, struct wbuf *wb, FTLOADER bl) /* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise returns an error number. * Arguments: * ptr the data to be writen. * size the amount of data to be written. * nmemb the number of units of size to be written. * stream write the data here. + * wb where to write uncompressed data (if we're compressing) or ignore if NULL * bl passed so we can panic the ft_loader if something goes wrong (recording the error number). * Return value: 0 on success, an error number otherwise. */ { - size_t r = do_fwrite(ptr, size, nmemb, stream); - if (r!=nmemb) { - int e; - if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... - e = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno - else - e = ferror(stream); - invariant(e!=0); - return e; + if (!bl->compress_intermediates || !wb) { + size_t r = do_fwrite(ptr, size, nmemb, stream); + if (r!=nmemb) { + int e; + if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... + e = get_maybe_error_errno(); // ... then there is no error in the stream, but there is one in errno + else + e = ferror(stream); + invariant(e!=0); + return e; + } + } else { + size_t num_bytes = size * nmemb; + int r = bl_compressed_write(ptr, num_bytes, stream, wb); + if (r != 0) { + return r; + } } return 0; } @@ -674,12 +778,12 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream) } } -static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER bl) +static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl) { int r; int dlen = dbt->size; - if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, bl))) return r; - if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, bl))) return r; + if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, wb, bl))) return r; + if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, wb, bl))) return r; if (dataoff) *dataoff += dlen + sizeof(dlen); return 0; @@ -741,12 +845,13 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int file } -int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, FTLOADER bl) +int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *dataoff, struct wbuf *wb, FTLOADER bl) /* Effect: Given a key and a val (both DBTs), write them to a file. Increment *dataoff so that it's up to date. * Arguments: * key, val write these. * data the file to write them to * dataoff a pointer to a counter that keeps track of the amount of data written so far. + * wb a pointer (possibly NULL) to buffer uncompressed output * bl the ft_loader (passed so we can panic if needed). * Return value: 0 on success, an error number otherwise. */ @@ -755,8 +860,8 @@ int loader_write_row(DBT *key, DBT *val, FIDX data, FILE *dataf, uint64_t *datao //int vlen = val->size; int r; // we have a chance to handle the errors because when we close we can delete all the files. - if ((r=bl_write_dbt(key, dataf, dataoff, bl))) return r; - if ((r=bl_write_dbt(val, dataf, dataoff, bl))) return r; + if ((r=bl_write_dbt(key, dataf, dataoff, wb, bl))) return r; + if ((r=bl_write_dbt(val, dataf, dataoff, wb, bl))) return r; toku_mutex_lock(&bl->file_infos.lock); bl->file_infos.file_infos[data.idx].n_rows++; toku_mutex_unlock(&bl->file_infos.lock); @@ -948,7 +1053,7 @@ static void* extractor_thread (void *blv) { { r = process_primary_rows(bl, primary_rowset); if (r) - ft_loader_set_panic(bl, r, false); + ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr); } } @@ -956,7 +1061,7 @@ static void* extractor_thread (void *blv) { if (r == 0) { r = finish_primary_rows(bl); if (r) - ft_loader_set_panic(bl, r, false); + ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr); } return NULL; @@ -1086,31 +1191,41 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro pkey.size = prow->klen; pval.data = primary_rowset->data + prow->off + prow->klen; pval.size = prow->vlen; - + + DBT *dest_key = &skey; + DBT *dest_val = &sval; + { - int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval); - if (r != 0) { - error_codes[i] = r; + int r; + + if (bl->dbs[i] != bl->src_db) { + r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, dest_key, dest_val, &pkey, &pval); + if (r != 0) { + error_codes[i] = r; + inc_error_count(); + break; + } + } else { + dest_key = &pkey; + dest_val = &pval; + } + if (dest_key->size > klimit) { + error_codes[i] = EINVAL; + fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", dest_key->size, klimit); inc_error_count(); break; } - if (skey.size > klimit) { + if (dest_val->size > vlimit) { error_codes[i] = EINVAL; - fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit); - inc_error_count(); - break; - } - if (sval.size > vlimit) { - error_codes[i] = EINVAL; - fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit); + fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", dest_val->size, vlimit); inc_error_count(); break; } } - bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i)); + bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i)); - if (row_wont_fit(rows, skey.size + sval.size)) { + if (row_wont_fit(rows, dest_key->size + dest_val->size)) { //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes); int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however. // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function. @@ -1121,7 +1236,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro break; } } - int r = add_row(rows, &skey, &sval); + int r = add_row(rows, dest_key, dest_val); if (r != 0) { error_codes[i] = r; inc_error_count(); @@ -1142,7 +1257,7 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro } } - { + if (bl->dbs[i] != bl->src_db) { if (skey.flags) { toku_free(skey.data); skey.data = NULL; } @@ -1157,7 +1272,10 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro int r = 0; if (error_count > 0) { for (int i=0; iN; i++) { - if (error_codes[i]) r = error_codes[i]; + if (error_codes[i]) { + r = error_codes[i]; + ft_loader_set_panic(bl, r, false, i, nullptr, nullptr); + } } invariant(r); // found the error } @@ -1457,16 +1575,42 @@ static int update_progress (int N, static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) { + int r = 0; + // Allocate a buffer if we're compressing intermediates. + char *uncompressed_buffer = nullptr; + if (bl->compress_intermediates) { + MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer); + if (uncompressed_buffer == nullptr) { + return ENOMEM; + } + } + struct wbuf wb; + wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF); + FILE *sstream = toku_bl_fidx2file(bl, sfile); for (size_t i=0; icompress_intermediates && wb.ndone > 0) { + r = bl_finish_compressed_write(sstream, &wb); + if (r != 0) { + goto exit; + } + } + r = 0; +exit: + if (uncompressed_buffer) { + toku_free(uncompressed_buffer); + } + return r; } @@ -1621,6 +1765,17 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q)); if (r!=0) result = r; } + + // Allocate a buffer if we're compressing intermediates. + char *uncompressed_buffer = nullptr; + struct wbuf wb; + if (bl->compress_intermediates && !to_q) { + MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer); + if (uncompressed_buffer == nullptr) { + result = ENOMEM; + } + } + wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF); //printf(" n_rows=%ld\n", n_rows); while (result==0 && pqueue_size(pq)>0) { @@ -1663,7 +1818,7 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q } } else { // write it to the dest file - int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl); + int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], &wb, bl); if (r!=0) { result = r; break; @@ -1710,6 +1865,10 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r); } } + if (result == 0 && uncompressed_buffer != nullptr && wb.ndone > 0) { + result = bl_finish_compressed_write(dest_stream, &wb); + } + if (result==0 && to_q) { int r = queue_enq(q, (void*)output_rowset, 1, NULL); if (r!=0) @@ -1719,6 +1878,9 @@ int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q } // cleanup + if (uncompressed_buffer) { + toku_free(uncompressed_buffer); + } for (int i=0; icompress_intermediates); if (r!=0) { result = r; } } @@ -1895,7 +2058,7 @@ int merge_files (struct merge_fileset *fs, if (result!=0) break; } - if (result) ft_loader_set_panic(bl, result, true); + if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr); { int r = queue_eof(output_q); @@ -2249,7 +2412,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, int rr = queue_deq(q, &item, NULL, NULL); if (rr == EOF) break; if (rr != 0) { - ft_loader_set_panic(bl, rr, true); + ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr); break; } } @@ -2283,8 +2446,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, n_pivots++; invariant(maxkey.data != NULL); - if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) { - ft_loader_set_panic(bl, r, true); + if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, nullptr, bl))) { + ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr); if (result == 0) result = r; break; } @@ -2294,7 +2457,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, r = allocate_block(&out, &lblock); if (r != 0) { - ft_loader_set_panic(bl, r, true); + ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr); if (result == 0) result = r; break; } @@ -2346,7 +2509,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, { DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file. - r = bl_write_dbt(&key, pivots_stream, NULL, bl); + r = bl_write_dbt(&key, pivots_stream, NULL, nullptr, bl); if (r) { result = r; goto error; } @@ -2723,7 +2886,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr result = update_progress(progress_allocation, bl, "wrote node"); if (result) - ft_loader_set_panic(bl, result, true); + ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr); } static int write_translation_table (struct dbout *out, long long *off_of_translation_p) { @@ -2833,7 +2996,7 @@ static int setup_nonleaf_block (int n_children, if (result == 0) { FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file); - int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, bl); + int r = bl_write_dbt(&pivots[n_children-1], next_pivots_stream, NULL, nullptr, bl); if (r) result = r; } @@ -2933,7 +3096,7 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum toku_free(subtree_info); if (result != 0) - ft_loader_set_panic(bl, result, true); + ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr); } static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) { diff --git a/ft/ftloader.h b/ft/ftloader.h index c7c9d2fea32..626a8b205f7 100644 --- a/ft/ftloader.h +++ b/ft/ftloader.h @@ -26,7 +26,8 @@ int toku_ft_loader_open (FTLOADER *bl, const char *temp_file_template, LSN load_lsn, TOKUTXN txn, - bool reserve_memory); + bool reserve_memory, + bool compress_intermediates); int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val); diff --git a/ft/sub_block.h b/ft/sub_block.h index 0f0a4840dd5..9abe1f8d430 100644 --- a/ft/sub_block.h +++ b/ft/sub_block.h @@ -44,7 +44,7 @@ sub_block_header_size(int n_sub_blocks); void set_compressed_size_bound(struct sub_block *se, enum toku_compression_method method); -// get the sum of the sub block compressed sizes +// get the sum of the sub block compressed bound sizes size_t get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[], enum toku_compression_method method); diff --git a/ft/tests/dbufio-test-destroy.cc b/ft/tests/dbufio-test-destroy.cc index ecef5017083..c2605c2e43a 100644 --- a/ft/tests/dbufio-test-destroy.cc +++ b/ft/tests/dbufio-test-destroy.cc @@ -42,7 +42,7 @@ static void test1 (size_t chars_per_file, size_t UU(bytes_per_read)) { } DBUFIO_FILESET bfs; { - int r = create_dbufio_fileset(&bfs, N, fds, M); + int r = create_dbufio_fileset(&bfs, N, fds, M, false); assert(r==0); } diff --git a/ft/tests/dbufio-test.cc b/ft/tests/dbufio-test.cc index 3661ed02399..9ebfe9baaae 100644 --- a/ft/tests/dbufio-test.cc +++ b/ft/tests/dbufio-test.cc @@ -41,7 +41,7 @@ static void test1 (size_t chars_per_file, size_t bytes_per_read) { } DBUFIO_FILESET bfs; { - int r = create_dbufio_fileset(&bfs, N, fds, M); + int r = create_dbufio_fileset(&bfs, N, fds, M, false); assert(r==0); } while (n_live>0) { diff --git a/ft/tests/ftloader-test-bad-generate.cc b/ft/tests/ftloader-test-bad-generate.cc index 0d3720207e5..4f3ad7af069 100644 --- a/ft/tests/ftloader-test-bad-generate.cc +++ b/ft/tests/ftloader-test-bad-generate.cc @@ -87,7 +87,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail) { } FTLOADER loader; - r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true); + r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false); assert(r == 0); struct rowset *rowset[nrowsets]; diff --git a/ft/tests/ftloader-test-extractor-errors.cc b/ft/tests/ftloader-test-extractor-errors.cc index 0d189c85092..b653a4d8406 100644 --- a/ft/tests/ftloader-test-extractor-errors.cc +++ b/ft/tests/ftloader-test-extractor-errors.cc @@ -99,7 +99,7 @@ static void test_extractor(int nrows, int nrowsets, bool expect_fail, const char sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); FTLOADER loader; - r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true); + r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, "tempXXXXXX", ZERO_LSN, TXNID_NONE, true, false); assert(r == 0); struct rowset *rowset[nrowsets]; diff --git a/ft/tests/ftloader-test-extractor.cc b/ft/tests/ftloader-test-extractor.cc index 38ea83fd152..1e7987c68f5 100644 --- a/ft/tests/ftloader-test-extractor.cc +++ b/ft/tests/ftloader-test-extractor.cc @@ -319,7 +319,7 @@ static void test_extractor(int nrows, int nrowsets, const char *testdir) { sprintf(temp, "%s/%s", testdir, "tempXXXXXX"); FTLOADER loader; - r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true); + r = toku_ft_loader_open(&loader, NULL, generate, NULL, N, brts, dbs, fnames, compares, temp, ZERO_LSN, TXNID_NONE, true, false); assert(r == 0); struct rowset *rowset[nrowsets]; diff --git a/ft/tests/ftloader-test-merge-files-dbufio.cc b/ft/tests/ftloader-test-merge-files-dbufio.cc index 4d10c1c73b4..3161fd805d1 100644 --- a/ft/tests/ftloader-test-merge-files-dbufio.cc +++ b/ft/tests/ftloader-test-merge-files-dbufio.cc @@ -326,7 +326,7 @@ static void test (const char *directory, bool is_error) { bt_compare_functions, "tempxxxxxx", *lsnp, - TXNID_NONE, true); + TXNID_NONE, true, false); assert(r==0); } @@ -340,7 +340,7 @@ static void test (const char *directory, bool is_error) { { int r = queue_create(&q, 1000); assert(r==0); } DBUFIO_FILESET bfs; const int MERGE_BUF_SIZE = 100000; // bigger than 64K so that we will trigger malloc issues. - { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE); assert(r==0); } + { int r = create_dbufio_fileset(&bfs, N_SOURCES, fds, MERGE_BUF_SIZE, false); assert(r==0); } FIDX *XMALLOC_N(N_SOURCES, src_fidxs); assert(bl->file_infos.n_files==0); bl->file_infos.n_files = N_SOURCES; diff --git a/ft/tests/ftloader-test-open.cc b/ft/tests/ftloader-test-open.cc index 9d439500ce2..c1bb6a2d061 100644 --- a/ft/tests/ftloader-test-open.cc +++ b/ft/tests/ftloader-test-open.cc @@ -57,7 +57,7 @@ static void test_loader_open(int ndbs) { for (i = 0; ; i++) { set_my_malloc_trigger(i+1); - r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true); + r = toku_ft_loader_open(&loader, NULL, NULL, NULL, ndbs, brts, dbs, fnames, compares, "", ZERO_LSN, TXNID_NONE, true, false); if (r == 0) break; } diff --git a/ft/tests/ftloader-test.cc b/ft/tests/ftloader-test.cc index 852e82624c0..5cbdc05f765 100644 --- a/ft/tests/ftloader-test.cc +++ b/ft/tests/ftloader-test.cc @@ -187,7 +187,7 @@ static void test_read_write_rows (char *tf_template) { toku_fill_dbt(&key, keystrings[i], strlen(keystrings[i])); DBT val; toku_fill_dbt(&val, valstrings[i], strlen(valstrings[i])); - r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, &bl); + r = loader_write_row(&key, &val, file, toku_bl_fidx2file(&bl, file), &dataoff, nullptr, &bl); CKERR(r); actual_size+=key.size + val.size + 8; } diff --git a/portability/file.cc b/portability/file.cc index 4fd8892b71f..9d54ccafa07 100644 --- a/portability/file.cc +++ b/portability/file.cc @@ -430,3 +430,8 @@ int toku_fsync_directory(const char *fname) { toku_free(dirname); return result; } + +FILE *toku_os_fmemopen(void *buf, size_t size, const char *mode) { + return fmemopen(buf, size, mode); +} + diff --git a/src/loader.cc b/src/loader.cc index d66e67ecd41..dddcde4ac9a 100644 --- a/src/loader.cc +++ b/src/loader.cc @@ -174,6 +174,7 @@ toku_loader_create_loader(DB_ENV *env, DB_LOADER *loader = NULL; bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS); + bool compress_intermediates = (loader_flags & LOADER_COMPRESS_INTERMEDIATES) != 0; XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) @@ -252,7 +253,8 @@ toku_loader_create_loader(DB_ENV *env, loader->i->temp_file_template, load_lsn, ttxn, - puts_allowed); + puts_allowed, + compress_intermediates); if ( rval!=0 ) { toku_free(new_inames_in_env); toku_free(brts); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 82118e6979b..8965ce7a017 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -546,14 +546,22 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb) add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb) add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb) - add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb) + add_test(ydb/loader-stress-test0z.tdb loader-stress-test.tdb -c -e dir.loader-stress-test0z.tdb -z) + add_test(ydb/loader-stress-test1z.tdb loader-stress-test.tdb -c -p -e dir.loader-stress-test1z.tdb -z) + add_test(ydb/loader-stress-test2z.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2z.tdb -z) + add_test(ydb/loader-stress-test3z.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3z.tdb -z) + add_test(ydb/loader-stress-test4z.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4z.tdb -z) set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.loader-stress-test0.tdb dir.loader-stress-test1.tdb dir.loader-stress-test2.tdb dir.loader-stress-test3.tdb dir.loader-stress-test4.tdb - dir.loader-stress-test5.tdb + dir.loader-stress-test0z.tdb + dir.loader-stress-test1z.tdb + dir.loader-stress-test2z.tdb + dir.loader-stress-test3z.tdb + dir.loader-stress-test4z.tdb ) list(REMOVE_ITEM loader_tests loader-dup-test.loader) @@ -563,6 +571,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) add_test(ydb/loader-dup-test3.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3.tdb) add_test(ydb/loader-dup-test4.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4.tdb) add_test(ydb/loader-dup-test5.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5.tdb) + add_test(ydb/loader-dup-test0z.tdb loader-dup-test.tdb -e dir.loader-dup-test0z.tdb -z) + add_test(ydb/loader-dup-test1z.tdb loader-dup-test.tdb -d 1 -r 500000 -e dir.loader-dup-test1z.tdb -z) + add_test(ydb/loader-dup-test2z.tdb loader-dup-test.tdb -d 1 -r 1000000 -e dir.loader-dup-test2z.tdb -z) + add_test(ydb/loader-dup-test3z.tdb loader-dup-test.tdb -d 1 -s -r 100 -e dir.loader-dup-test3z.tdb -z) + add_test(ydb/loader-dup-test4z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -e dir.loader-dup-test4z.tdb -z) + add_test(ydb/loader-dup-test5z.tdb loader-dup-test.tdb -d 1 -s -r 1000 -E -e dir.loader-dup-test5z.tdb -z) set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.loader-dup-test0.tdb dir.loader-dup-test1.tdb @@ -570,6 +584,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) dir.loader-dup-test3.tdb dir.loader-dup-test4.tdb dir.loader-dup-test5.tdb + dir.loader-dup-test0z.tdb + dir.loader-dup-test1z.tdb + dir.loader-dup-test2z.tdb + dir.loader-dup-test3z.tdb + dir.loader-dup-test4z.tdb + dir.loader-dup-test5z.tdb ) ## as part of #4503, we took out test 1 and 3 @@ -578,6 +598,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) #add_test(ydb/loader-cleanup-test1.tdb loader-cleanup-test.tdb -s -r 800 -p -e dir.loader-cleanup-test1.tdb) add_test(ydb/loader-cleanup-test2.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2.tdb) #add_test(ydb/loader-cleanup-test3.tdb loader-cleanup-test.tdb -s -r 8000 -p -e dir.loader-cleanup-test3.tdb) + add_test(ydb/loader-cleanup-test0z.tdb loader-cleanup-test.tdb -s -r 800 -e dir.loader-cleanup-test0z.tdb -z) + add_test(ydb/loader-cleanup-test2z.tdb loader-cleanup-test.tdb -s -r 8000 -e dir.loader-cleanup-test2z.tdb -z) set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.loader-cleanup-test0.tdb dir.loader-cleanup-test1.tdb @@ -606,11 +628,15 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ) declare_custom_tests(maxsize-for-loader.tdb) - add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f) - add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb) + add_test(ydb/maxsize-for-loader-A.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-A.tdb -f -c) + add_test(ydb/maxsize-for-loader-B.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-B.tdb -c) + add_test(ydb/maxsize-for-loader-Az.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Az.tdb -f -z -c) + add_test(ydb/maxsize-for-loader-Bz.tdb maxsize-for-loader.tdb -e dir.maxsize-for-loader-Bz.tdb -z -c) set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES dir.maxsize-for-loader-A.tdb dir.maxsize-for-loader-B.tdb + dir.maxsize-for-loader-Az.tdb + dir.maxsize-for-loader-Bz.tdb ) declare_custom_tests(hotindexer-undo-do-test.tdb) @@ -667,6 +693,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ydb/env-put-multiple.tdb ydb/filesize.tdb ydb/loader-cleanup-test0.tdb + ydb/loader-cleanup-test0z.tdb + ydb/loader-cleanup-test2z.tdb ydb/manyfiles.tdb ydb/recover-loader-test.abortrecover ydb/recovery_fileops_stress.tdb @@ -712,6 +740,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ydb/loader-stress-del.nop.loader ydb/loader-stress-del.p.loader ydb/loader-stress-del.comp.loader + ydb/loader-stress-test4z.tdb ydb/test3039.tdb ydb/test3529.tdb ydb/test_update_stress.tdb diff --git a/src/tests/maxsize-for-loader.cc b/src/tests/maxsize-for-loader.cc index bb542ea1466..c398589afe8 100644 --- a/src/tests/maxsize-for-loader.cc +++ b/src/tests/maxsize-for-loader.cc @@ -7,18 +7,24 @@ #include "toku_pthread.h" #include #include +#include "toku_random.h" +using namespace toku; bool fast = false; DB_ENV *env; enum {NUM_DBS=2}; -int USE_PUTS=0; +uint32_t USE_COMPRESS=0; +bool do_check = false; uint32_t num_rows = 1; uint32_t which_db_to_fail = (uint32_t) -1; uint32_t which_row_to_fail = (uint32_t) -1; enum how_to_fail { FAIL_NONE, FAIL_KSIZE, FAIL_VSIZE } how_to_fail = FAIL_NONE; +static struct random_data random_data[NUM_DBS]; +char random_buf[NUM_DBS][8]; + static int put_multiple_generate(DB *dest_db, DB *src_db __attribute__((__unused__)), DBT *dest_key, DBT *dest_val, @@ -52,8 +58,8 @@ static int put_multiple_generate(DB *dest_db, dest_val->ulen = vsize; } assert(ksize>=sizeof(uint32_t)); - for (uint32_t i=0; idata)[i] = random(); - for (uint32_t i=0; idata)[i] = random(); + for (uint32_t i=0; idata)[i] = myrandom_r(&random_data[which]); + for (uint32_t i=0; idata)[i] = myrandom_r(&random_data[which]); *(uint32_t*)dest_key->data = rownum; dest_key->size = ksize; dest_val->size = vsize; @@ -74,7 +80,18 @@ static void error_callback (DB *db __attribute__((__unused__)), int which_db, in e->error_count++; } -static void test_loader_maxsize(DB **dbs) +static void reset_random(void) { + int r; + + for (int i = 0; i < NUM_DBS; i++) { + ZERO_STRUCT(random_data[i]); + ZERO_ARRAY(random_buf[i]); + r = myinitstate_r(i, random_buf[i], 8, &random_data[i]); + assert(r==0); + } +} + +static void test_loader_maxsize(DB **dbs, DB **check_dbs) { int r; DB_TXN *txn; @@ -85,10 +102,10 @@ static void test_loader_maxsize(DB **dbs) db_flags[i] = DB_NOOVERWRITE; dbt_flags[i] = 0; } - uint32_t loader_flags = USE_PUTS; // set with -p option + uint32_t loader_flags = USE_COMPRESS; // set with -p option // create and initialize loader - r = env->txn_begin(env, NULL, &txn, 0); + r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); @@ -98,6 +115,7 @@ static void test_loader_maxsize(DB **dbs) r = loader->set_poll_function(loader, NULL, NULL); CKERR(r); + reset_random(); // using loader->put, put values into DB DBT key, val; unsigned int k, v; @@ -107,13 +125,7 @@ static void test_loader_maxsize(DB **dbs) dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int)); r = loader->put(loader, &key, &val); - if (USE_PUTS) { - //PUT loader can return -1 if it finds an error during the puts. - CKERR2s(r, 0,-1); - } - else { - CKERR(r); - } + CKERR(r); } // close the loader @@ -127,16 +139,124 @@ static void test_loader_maxsize(DB **dbs) } assert(0); checked: - r = txn->commit(txn, 0); CKERR(r); + + if (do_check && how_to_fail==FAIL_NONE) { + r = env->txn_begin(env, NULL, &txn, 0); + CKERR(r); + reset_random(); + DBT keys[NUM_DBS]; + DBT vals[NUM_DBS]; + uint32_t flags[NUM_DBS]; + for (int i = 0; i < NUM_DBS; i++) { + dbt_init_realloc(&keys[i]); + dbt_init_realloc(&vals[i]); + flags[i] = 0; + } + + for(uint32_t i=0;iput_multiple(env, src_db, txn, &key, &val, NUM_DBS, check_dbs, keys, vals, flags); + CKERR(r); + } + r = txn->commit(txn, 0); + CKERR(r); + r = env->txn_begin(env, NULL, &txn, 0); + CKERR(r); + + for (int i = 0; i < NUM_DBS; i++) { + DBC *loader_cursor; + DBC *check_cursor; + r = dbs[i]->cursor(dbs[i], txn, &loader_cursor, 0); + CKERR(r); + r = dbs[i]->cursor(check_dbs[i], txn, &check_cursor, 0); + CKERR(r); + DBT loader_key; + DBT loader_val; + DBT check_key; + DBT check_val; + dbt_init_realloc(&loader_key); + dbt_init_realloc(&loader_val); + dbt_init_realloc(&check_key); + dbt_init_realloc(&check_val); + for (uint32_t x = 0; x <= num_rows; x++) { + int r_loader = loader_cursor->c_get(loader_cursor, &loader_key, &loader_val, DB_NEXT); + int r_check = check_cursor->c_get(check_cursor, &check_key, &check_val, DB_NEXT); + assert(r_loader == r_check); + if (x == num_rows) { + CKERR2(r_loader, DB_NOTFOUND); + CKERR2(r_check, DB_NOTFOUND); + } else { + CKERR(r_loader); + CKERR(r_check); + } + assert(loader_key.size == check_key.size); + assert(loader_val.size == check_val.size); + assert(memcmp(loader_key.data, check_key.data, loader_key.size) == 0); + assert(memcmp(loader_val.data, check_val.data, loader_val.size) == 0); + } + toku_free(loader_key.data); + toku_free(loader_val.data); + toku_free(check_key.data); + toku_free(check_val.data); + loader_cursor->c_close(loader_cursor); + check_cursor->c_close(check_cursor); + } + + for (int i = 0; i < NUM_DBS; i++) { + toku_free(keys[i].data); + toku_free(vals[i].data); + dbt_init_realloc(&keys[i]); + dbt_init_realloc(&vals[i]); + } + r = txn->commit(txn, 0); + CKERR(r); + } + + } char *free_me = NULL; const char *env_dir = ENVDIR; // the default env_dir +static void create_and_open_dbs(DB **dbs, const char *suffix, int *idx) { + int r; + DBT desc; + dbt_init(&desc, "foo", sizeof("foo")); + enum {MAX_NAME=128}; + char name[MAX_NAME*2]; + + for(int i=0;iapp_private = &idx[i]; + snprintf(name, sizeof(name), "db_%04x_%s", i, suffix); + r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); + IN_TXN_COMMIT(env, NULL, txn_desc, 0, { + { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); } + }); + } +} + +static int +uint_or_size_dbt_cmp (DB *db, const DBT *a, const DBT *b) { + assert(db && a && b); + if (a->size == sizeof(unsigned int) && b->size == sizeof(unsigned int)) { + return uint_dbt_cmp(db, a, b); + } + return a->size - b->size; +} + static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail htf) { num_rows = nr; which_db_to_fail = wdb; which_row_to_fail = wrow; how_to_fail = htf; + + //since src_key/val can't be modified, and we're using generate to make the failures, we can't fail on src_db (0) + assert(which_db_to_fail != 0); int r; { int len = strlen(env_dir) + 20; @@ -148,7 +268,7 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail r = toku_os_mkdir(env_dir, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = db_env_create(&env, 0); CKERR(r); - r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r); + r = env->set_default_bt_compare(env, uint_or_size_dbt_cmp); CKERR(r); r = env->set_generate_row_callback_for_put(env, put_multiple_generate); CKERR(r); int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG | DB_CREATE | DB_PRIVATE; @@ -157,37 +277,35 @@ static void run_test(uint32_t nr, uint32_t wdb, uint32_t wrow, enum how_to_fail //Disable auto-checkpointing r = env->checkpointing_set_period(env, 0); CKERR(r); - DBT desc; - dbt_init(&desc, "foo", sizeof("foo")); - enum {MAX_NAME=128}; - char name[MAX_NAME*2]; - - DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS); - assert(dbs != NULL); + DB **XMALLOC_N(NUM_DBS, dbs); + DB **check_dbs; int idx[NUM_DBS]; - for(int i=0;iapp_private = &idx[i]; - snprintf(name, sizeof(name), "db_%04x", i); - r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); - IN_TXN_COMMIT(env, NULL, txn_desc, 0, { - { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); } - }); + + create_and_open_dbs(dbs, "loader", &idx[0]); + if (do_check && how_to_fail==FAIL_NONE) { + XMALLOC_N(NUM_DBS, check_dbs); + create_and_open_dbs(check_dbs, "check", &idx[0]); } if (verbose) printf("running test_loader()\n"); // -------------------------- // - test_loader_maxsize(dbs); + test_loader_maxsize(dbs, check_dbs); // -------------------------- // if (verbose) printf("done test_loader()\n"); for(int i=0;iclose(dbs[i], 0); CKERR(r); dbs[i] = NULL; + if (do_check && how_to_fail==FAIL_NONE) { + check_dbs[i]->close(check_dbs[i], 0); CKERR(r); + check_dbs[i] = NULL; + } } r = env->close(env, 0); CKERR(r); toku_free(dbs); + if (do_check && how_to_fail==FAIL_NONE) { + toku_free(check_dbs); + } } // ------------ infrastructure ---------- @@ -199,12 +317,12 @@ int test_main(int argc, char * const *argv) { do_args(argc, argv); run_test(1, (uint32_t) -1, (uint32_t) -1, FAIL_NONE); - run_test(1, 0, 0, FAIL_NONE); - run_test(1, 0, 0, FAIL_KSIZE); - run_test(1, 0, 0, FAIL_VSIZE); + run_test(1, 1, 0, FAIL_NONE); + run_test(1, 1, 0, FAIL_KSIZE); + run_test(1, 1, 0, FAIL_VSIZE); if (!fast) { - run_test(1000000, 0, 500000, FAIL_KSIZE); - run_test(1000000, 0, 500000, FAIL_VSIZE); + run_test(1000000, 1, 500000, FAIL_KSIZE); + run_test(1000000, 1, 500000, FAIL_VSIZE); } toku_free(free_me); return 0; @@ -223,7 +341,8 @@ static void do_args(int argc, char * const argv[]) { fprintf(stderr, " -h help\n"); fprintf(stderr, " -v verbose\n"); fprintf(stderr, " -q quiet\n"); - fprintf(stderr, " -p use DB->put\n"); + fprintf(stderr, " -z compress intermediates\n"); + fprintf(stderr, " -c compare with regular dbs\n"); fprintf(stderr, " -f fast (suitable for vgrind)\n"); exit(resultcode); } else if (strcmp(argv[0], "-e")==0) { @@ -235,13 +354,15 @@ static void do_args(int argc, char * const argv[]) { assert(r