From 48414ab76b1821707b38c445c29105e6b0de3e8c Mon Sep 17 00:00:00 2001 From: Rich Prohaska Date: Tue, 16 Apr 2013 23:59:05 -0400 Subject: [PATCH] create multiple sub blocks per leaf in the loader. merge -r 18961:head ptq branch to main refs[t:2351] git-svn-id: file:///svn/toku/tokudb@18992 c7de825b-a66e-492c-adef-691d508d4ae1 --- newbrt/Makefile | 1 + newbrt/brt-serialize.c | 299 +---------------------------------------- newbrt/brtdump.c | 34 +++++ newbrt/brtloader.c | 63 +++++---- newbrt/sub_block.c | 263 ++++++++++++++++++++++++++++++++++++ newbrt/sub_block.h | 95 +++++++++++++ 6 files changed, 431 insertions(+), 324 deletions(-) create mode 100644 newbrt/sub_block.c create mode 100644 newbrt/sub_block.h diff --git a/newbrt/Makefile b/newbrt/Makefile index fef307db62f..c8f7043b29d 100644 --- a/newbrt/Makefile +++ b/newbrt/Makefile @@ -67,6 +67,7 @@ BRT_SOURCES = \ recover \ roll \ rollback \ + sub_block \ ule \ threadpool \ toku_worker \ diff --git a/newbrt/brt-serialize.c b/newbrt/brt-serialize.c index ac011e3121b..dfd8d554ef3 100644 --- a/newbrt/brt-serialize.c +++ b/newbrt/brt-serialize.c @@ -262,203 +262,7 @@ enum { uncompressed_version_offset = 8, }; -struct sub_block { - void *uncompressed_ptr; - u_int32_t uncompressed_size; - - void *compressed_ptr; - u_int32_t compressed_size; // real compressed size - u_int32_t compressed_size_bound; // estimated compressed size - - u_int32_t xsum; // sub block checksum -}; - -struct stored_sub_block { - u_int32_t uncompressed_size; - u_int32_t compressed_size; - u_int32_t xsum; -}; - -static void -sub_block_init(struct sub_block *sub_block) { - sub_block->uncompressed_ptr = 0; - sub_block->uncompressed_size = 0; - - sub_block->compressed_ptr = 0; - sub_block->compressed_size_bound = 0; - sub_block->compressed_size = 0; - - sub_block->xsum = 0; -} - -// get the size of the compression header -static size_t -sub_block_header_size(int n_sub_blocks) { - return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct 
stored_sub_block); -} - -// get the sum of the sub block compressed sizes -static size_t -get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) { - size_t compressed_size_bound = 0; - for (int i = 0; i < n_sub_blocks; i++) { - sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size); - compressed_size_bound += sub_block[i].compressed_size_bound; - } - return compressed_size_bound; -} - -// get the sum of the sub block uncompressed sizes -static size_t -get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) { - size_t uncompressed_size = 0; - for (int i = 0; i < n_sub_blocks; i++) - uncompressed_size += sub_block[i].uncompressed_size; - return uncompressed_size; -} - -// round up n -static inline int -alignup32(int a, int b) { - return ((a+b-1) / b) * b; -} - -static const int max_sub_blocks = 8; - -// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at -// least >= the target_sub_block_size. 
-static void -choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) { - - const int target_sub_block_size = 512*1024; - const int alignment = 32; - - int n_sub_blocks, sub_block_size; - n_sub_blocks = total_size / target_sub_block_size; - if (n_sub_blocks <= 1) { - n_sub_blocks = 1; - sub_block_size = total_size; - } else { - if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks - n_sub_blocks = n_sub_blocks_limit; - sub_block_size = alignup32(total_size / n_sub_blocks, alignment); - while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough - sub_block_size += alignment; - } - - *sub_block_size_ret = sub_block_size; - *n_sub_blocks_ret = n_sub_blocks; -} - -static void -set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) { - int size_left = total_size; - int i; - for (i = 0; i < n_sub_blocks-1; i++) { - sub_block[i].uncompressed_size = sub_block_size; - size_left -= sub_block_size; - } - if (i == 0 || size_left > 0) - sub_block[i].uncompressed_size = size_left; -} - -#include "workset.h" - -struct compress_work { - struct work base; - struct sub_block *sub_block; -}; - -static void -compress_work_init(struct compress_work *w, struct sub_block *sub_block) { - w->sub_block = sub_block; -} - -static void -compress_sub_block(struct sub_block *sub_block) { - // compress it - Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr; - Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr; - uLongf uncompressed_len = sub_block->uncompressed_size; - uLongf real_compressed_len = sub_block->compressed_size_bound; - int compression_level = 5; - - int r = compress2((Bytef*)compressed_ptr, &real_compressed_len, - (Bytef*)uncompressed_ptr, uncompressed_len, - compression_level); - assert(r == Z_OK); - sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with 
the real size - - // checksum it - sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size); -} - -static void * -compress_worker(void *arg) { - struct workset *ws = (struct workset *) arg; - while (1) { - struct compress_work *w = (struct compress_work *) workset_get(ws); - if (w == NULL) - break; - compress_sub_block(w->sub_block); - } - return arg; -} - -static size_t -compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr) { - char *compressed_base_ptr = compressed_ptr; - size_t compressed_len; - - if (n_sub_blocks == 1) { - // single sub-block - sub_block[0].uncompressed_ptr = uncompressed_ptr; - sub_block[0].compressed_ptr = compressed_ptr; - compress_sub_block(&sub_block[0]); - compressed_len = sub_block[0].compressed_size; - } else { - // multiple sub-blocks - int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1 - if (T > n_sub_blocks) - T = n_sub_blocks; - if (T > 0) - T = T - 1; // threads in addition to the running thread - - struct workset ws; - workset_init(&ws); - - struct compress_work work[n_sub_blocks]; - workset_lock(&ws); - for (int i = 0; i < n_sub_blocks; i++) { - sub_block[i].uncompressed_ptr = uncompressed_ptr; - sub_block[i].compressed_ptr = compressed_ptr; - compress_work_init(&work[i], &sub_block[i]); - workset_put_locked(&ws, &work[i].base); - uncompressed_ptr += sub_block[i].uncompressed_size; - compressed_ptr += sub_block[i].compressed_size_bound; - } - workset_unlock(&ws); - - // compress the sub-blocks - if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks); - toku_pthread_t tids[T]; - threadset_create(tids, &T, compress_worker, &ws); - compress_worker(&ws); - - // wait for all of the work to complete - threadset_join(tids, T); - - // squeeze out the holes not used by the compress bound - compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size; - for (int i = 1; i < n_sub_blocks; i++) { - 
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size); - compressed_ptr += sub_block[i].compressed_size; - } - - compressed_len = compressed_ptr - compressed_base_ptr; - } - return compressed_len; -} +#include "sub_block.h" static void serialize_node_header(BRTNODE node, struct wbuf *wbuf) { @@ -672,7 +476,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th // compress all of the sub blocks char *uncompressed_ptr = buf + node_header_overhead; char *compressed_ptr = compressed_buf + header_len; - compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr); + compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores); //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len); @@ -687,7 +491,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th } // compute the header checksum and serialize it - uint32_t header_length = (void *)ptr - (void *) compressed_buf; + uint32_t header_length = (char *)ptr - (char *)compressed_buf; uint32_t xsum = x1764_memory(compressed_buf, header_length); *ptr = toku_htod32(xsum); @@ -969,101 +773,6 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b return 0; } -#include "workset.h" - -struct decompress_work { - struct work base; - void *compress_ptr; - void *uncompress_ptr; - u_int32_t compress_size; - u_int32_t uncompress_size; - u_int32_t xsum; - int error; -}; - -// initialize the decompression work -static void -decompress_work_init(struct decompress_work *dw, - void *compress_ptr, u_int32_t compress_size, - void *uncompress_ptr, u_int32_t uncompress_size, - u_int32_t xsum) { - dw->compress_ptr = compress_ptr; - dw->compress_size = compress_size; - dw->uncompress_ptr = uncompress_ptr; - dw->uncompress_size = 
uncompress_size; - dw->xsum = xsum; - dw->error = 0; -} - -// decompress one block -static int -decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) { - // verify checksum - u_int32_t xsum = x1764_memory(compress_ptr, compress_size); - assert(xsum == expected_xsum); - - // decompress - uLongf destlen = uncompress_size; - int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size); - assert(destlen == uncompress_size); - assert(r==Z_OK); - - return 0; -} - -// decompress blocks until there is no more work to do -static void * -decompress_worker(void *arg) { - struct workset *ws = (struct workset *) arg; - while (1) { - struct decompress_work *dw = (struct decompress_work *) workset_get(ws); - if (dw == NULL) - break; - dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum); - } - return arg; -} - -static void -decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data) { - if (n_sub_blocks == 1) { - decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum); - } else { - // compute the number of additional threads needed for decompressing this node - int T = num_cores; // T = min(#cores, #blocks) - 1 - if (T > n_sub_blocks) - T = n_sub_blocks; - if (T > 0) - T = T - 1; // threads in addition to the running thread - - // init the decompression work set - struct workset ws; - workset_init(&ws); - - // initialize the decompression work and add to the work set - struct decompress_work decompress_work[n_sub_blocks]; - workset_lock(&ws); - for (int i = 0; i < n_sub_blocks; i++) { - decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum); - 
workset_put_locked(&ws, &decompress_work[i].base); - - uncompressed_data += sub_block[i].uncompressed_size; - compressed_data += sub_block[i].compressed_size; - } - workset_unlock(&ws); - - // decompress the sub-blocks - if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T); - toku_pthread_t tids[T]; - threadset_create(tids, &T, decompress_worker, &ws); - decompress_worker(&ws); - - // cleanup - threadset_join(tids, T); - workset_destroy(&ws); - } -} - static int decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) { toku_trace("decompress"); @@ -1116,7 +825,7 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb unsigned char *uncompressed_data = rb->buf + node_header_overhead; // decompress all the compressed sub blocks into the uncompressed buffer - decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data); + decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores); toku_trace("decompress done"); diff --git a/newbrt/brtdump.c b/newbrt/brtdump.c index cc8580441ef..d91462e9618 100644 --- a/newbrt/brtdump.c +++ b/newbrt/brtdump.c @@ -284,6 +284,7 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) { toku_free(vp); } +#if 0 static void hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { u_int64_t i; @@ -298,6 +299,39 @@ hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { } printf("\n"); } +#endif + +static void +hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { + u_int64_t n = size / 32; + for (u_int64_t i = 0; i < n; i++) { + printf("%"PRIu64": ", offset); + for (u_int64_t j = 0; j < 32; j++) { + unsigned char c = vp[j]; + printf("%2.2X", c); + if (((j+1) % 4) == 0) + printf(" "); + } + for (u_int64_t j = 0; j < 32; j++) { + unsigned char c = vp[j]; + printf("%c", isprint(c) ? 
c : ' '); + } + printf("\n"); + vp += 32; + offset += 32; + } + size = size % 32; + for (u_int64_t i=0; idbuf.off - n_uncompressed_bytes_at_beginning; - int bound = compressBound(uncompressed_len); - unsigned char *MALLOC_N(header_len + bound, compressed_buf); - uLongf real_compressed_len = bound; - { - int r = compress2((Bytef*)(compressed_buf + header_len), &real_compressed_len, - (Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len, - compression_level); - assert(r==Z_OK); - } - - // checksum the sub block - u_int32_t xsum0 = x1764_memory(compressed_buf + header_len, real_compressed_len); + // choose sub block size and number + int sub_block_size, n_sub_blocks; + choose_sub_block_size(uncompressed_len, max_sub_blocks, &sub_block_size, &n_sub_blocks); + + int header_len = n_uncompressed_bytes_at_beginning + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t); + + // initialize the sub blocks + struct sub_block sub_block[n_sub_blocks]; + for (int i = 0; i < n_sub_blocks; i++) + sub_block_init(&sub_block[i]); + set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block); + + // allocate space for the compressed buffer + int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block); + unsigned char *MALLOC_N(header_len + bound, compressed_buf); + + // compress and checksum the sub blocks + int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, + (char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), + (char *) (compressed_buf + header_len), 2); + + // copy the uncompressed header to the compressed buffer memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning); - int compressed_len = real_compressed_len; - int n_compressed_blocks = 1; - memcpy(compressed_buf+16, &n_compressed_blocks, 4); - memcpy(compressed_buf+20, &compressed_len, 4); - memcpy(compressed_buf+24, &uncompressed_len, 4); - memcpy(compressed_buf+28, &xsum0, 4); + + // serialize the sub block header +
memcpy(compressed_buf+16, &n_sub_blocks, 4); + for (int i = 0; i < n_sub_blocks; i++) { + memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4); + memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4); + memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4); + } // compute the header checksum and serialize it u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t)); - memcpy(compressed_buf+32, &header_xsum, 4); + memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4); //#ifndef CILK_STUB // ttable_and_write_lock->lock(); //#endif long long off_of_leaf = out->current_off; - int size = real_compressed_len + header_len; + int size = header_len + compressed_len; if (0) { fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len); - fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf); + fprintf(stderr, "compressed buf size=%u, off=%lld\n", compressed_len, off_of_leaf); fprintf(stderr, "compressed bytes are:"); //for (int i=0; i +#include +#include + +#include "toku_portability.h" +#include "toku_assert.h" +#include "x1764.h" +#include "sub_block.h" + +void +sub_block_init(struct sub_block *sub_block) { + sub_block->uncompressed_ptr = 0; + sub_block->uncompressed_size = 0; + + sub_block->compressed_ptr = 0; + sub_block->compressed_size_bound = 0; + sub_block->compressed_size = 0; + + sub_block->xsum = 0; +} + +// get the size of the compression header +size_t +sub_block_header_size(int n_sub_blocks) { + return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block); +} + +// get the sum of the sub block compressed sizes +size_t +get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) { + size_t compressed_size_bound = 0; + for (int i = 0; i < n_sub_blocks; i++) { + sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size); + compressed_size_bound += 
sub_block[i].compressed_size_bound; + } + return compressed_size_bound; +} + +// get the sum of the sub block uncompressed sizes +size_t +get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) { + size_t uncompressed_size = 0; + for (int i = 0; i < n_sub_blocks; i++) + uncompressed_size += sub_block[i].uncompressed_size; + return uncompressed_size; +} + +// round a up to the next multiple of b +static inline int +alignup32(int a, int b) { + return ((a+b-1) / b) * b; +} + +// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at +// least the target_sub_block_size. +void +choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) { + const int alignment = 32; + + int n_sub_blocks, sub_block_size; + n_sub_blocks = total_size / target_sub_block_size; + if (n_sub_blocks <= 1) { + n_sub_blocks = 1; + sub_block_size = total_size; + } else { + if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks + n_sub_blocks = n_sub_blocks_limit; + sub_block_size = alignup32(total_size / n_sub_blocks, alignment); + while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough + sub_block_size += alignment; + } + + *sub_block_size_ret = sub_block_size; + *n_sub_blocks_ret = n_sub_blocks; +} + +void +set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) { + int size_left = total_size; + int i; + for (i = 0; i < n_sub_blocks-1; i++) { + sub_block[i].uncompressed_size = sub_block_size; + size_left -= sub_block_size; + } + if (i == 0 || size_left > 0) + sub_block[i].uncompressed_size = size_left; +} + +#include "workset.h" + +void +compress_work_init(struct compress_work *w, struct sub_block *sub_block) { + w->sub_block = sub_block; +} + +void +compress_sub_block(struct sub_block *sub_block) { + // compress it + Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr; + Bytef
*compressed_ptr = (Bytef *) sub_block->compressed_ptr; + uLongf uncompressed_len = sub_block->uncompressed_size; + uLongf real_compressed_len = sub_block->compressed_size_bound; + int compression_level = 5; + + int r = compress2((Bytef*)compressed_ptr, &real_compressed_len, + (Bytef*)uncompressed_ptr, uncompressed_len, + compression_level); + assert(r == Z_OK); + sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size + + // checksum it + sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size); +} + +void * +compress_worker(void *arg) { + struct workset *ws = (struct workset *) arg; + while (1) { + struct compress_work *w = (struct compress_work *) workset_get(ws); + if (w == NULL) + break; + compress_sub_block(w->sub_block); + } + return arg; +} + +size_t +compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores) { + char *compressed_base_ptr = compressed_ptr; + size_t compressed_len; + + if (n_sub_blocks == 1) { + // single sub-block + sub_block[0].uncompressed_ptr = uncompressed_ptr; + sub_block[0].compressed_ptr = compressed_ptr; + compress_sub_block(&sub_block[0]); + compressed_len = sub_block[0].compressed_size; + } else { + // multiple sub-blocks + int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1 + if (T > n_sub_blocks) + T = n_sub_blocks; + if (T > 0) + T = T - 1; // threads in addition to the running thread + + struct workset ws; + workset_init(&ws); + + struct compress_work work[n_sub_blocks]; + workset_lock(&ws); + for (int i = 0; i < n_sub_blocks; i++) { + sub_block[i].uncompressed_ptr = uncompressed_ptr; + sub_block[i].compressed_ptr = compressed_ptr; + compress_work_init(&work[i], &sub_block[i]); + workset_put_locked(&ws, &work[i].base); + uncompressed_ptr += sub_block[i].uncompressed_size; + compressed_ptr += sub_block[i].compressed_size_bound; + } + workset_unlock(&ws); + + // 
compress the sub-blocks + if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks); + toku_pthread_t tids[T]; + threadset_create(tids, &T, compress_worker, &ws); + compress_worker(&ws); + + // wait for all of the work to complete + threadset_join(tids, T); + + // squeeze out the holes not used by the compress bound + compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size; + for (int i = 1; i < n_sub_blocks; i++) { + memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size); + compressed_ptr += sub_block[i].compressed_size; + } + + compressed_len = compressed_ptr - compressed_base_ptr; + } + return compressed_len; +} + +// initialize the decompression work +void +decompress_work_init(struct decompress_work *dw, + void *compress_ptr, u_int32_t compress_size, + void *uncompress_ptr, u_int32_t uncompress_size, + u_int32_t xsum) { + dw->compress_ptr = compress_ptr; + dw->compress_size = compress_size; + dw->uncompress_ptr = uncompress_ptr; + dw->uncompress_size = uncompress_size; + dw->xsum = xsum; + dw->error = 0; +} + +// decompress one block +int +decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) { + // verify checksum + u_int32_t xsum = x1764_memory(compress_ptr, compress_size); + assert(xsum == expected_xsum); + + // decompress + uLongf destlen = uncompress_size; + int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size); + assert(destlen == uncompress_size); + assert(r==Z_OK); + + return 0; +} + +// decompress blocks until there is no more work to do +void * +decompress_worker(void *arg) { + struct workset *ws = (struct workset *) arg; + while (1) { + struct decompress_work *dw = (struct decompress_work *) workset_get(ws); + if (dw == NULL) + break; + dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum); + } + return arg; +} + +void 
+decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) { + if (n_sub_blocks == 1) { + decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum); + } else { + // compute the number of additional threads needed for decompressing this node + int T = num_cores; // T = min(#cores, #blocks) - 1 + if (T > n_sub_blocks) + T = n_sub_blocks; + if (T > 0) + T = T - 1; // threads in addition to the running thread + + // init the decompression work set + struct workset ws; + workset_init(&ws); + + // initialize the decompression work and add to the work set + struct decompress_work decompress_work[n_sub_blocks]; + workset_lock(&ws); + for (int i = 0; i < n_sub_blocks; i++) { + decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum); + workset_put_locked(&ws, &decompress_work[i].base); + + uncompressed_data += sub_block[i].uncompressed_size; + compressed_data += sub_block[i].compressed_size; + } + workset_unlock(&ws); + + // decompress the sub-blocks + if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T); + toku_pthread_t tids[T]; + threadset_create(tids, &T, decompress_worker, &ws); + decompress_worker(&ws); + + // cleanup + threadset_join(tids, T); + workset_destroy(&ws); + } +} diff --git a/newbrt/sub_block.h b/newbrt/sub_block.h new file mode 100644 index 00000000000..a87cb4436b7 --- /dev/null +++ b/newbrt/sub_block.h @@ -0,0 +1,95 @@ +#ifndef TOKU_SUB_BLOCK_H +#define TOKU_SUB_BLOCK_H + +static const int max_sub_blocks = 8; +static const int target_sub_block_size = 512*1024; + +struct sub_block { + void *uncompressed_ptr; + u_int32_t uncompressed_size; + + void *compressed_ptr; + u_int32_t compressed_size; // real compressed size + u_int32_t 
compressed_size_bound; // estimated compressed size + + u_int32_t xsum; // sub block checksum +}; + +struct stored_sub_block { + u_int32_t uncompressed_size; + u_int32_t compressed_size; + u_int32_t xsum; +}; + +void +sub_block_init(struct sub_block *sub_block); + +// get the size of the compression header +size_t +sub_block_header_size(int n_sub_blocks); + +// get the sum of the sub block compressed size bounds +size_t +get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]); + +// get the sum of the sub block uncompressed sizes +size_t +get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]); + +// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at +// least the target_sub_block_size. +void +choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret); + +void +set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]); + +#include "workset.h" + +struct compress_work { + struct work base; + struct sub_block *sub_block; +}; + +void +compress_work_init(struct compress_work *w, struct sub_block *sub_block); + +void +compress_sub_block(struct sub_block *sub_block); + +void * +compress_worker(void *arg); + +size_t +compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores); + +struct decompress_work { + struct work base; + void *compress_ptr; + void *uncompress_ptr; + u_int32_t compress_size; + u_int32_t uncompress_size; + u_int32_t xsum; + int error; +}; + +// initialize the decompression work +void +decompress_work_init(struct decompress_work *dw, + void *compress_ptr, u_int32_t compress_size, + void *uncompress_ptr, u_int32_t uncompress_size, + u_int32_t xsum); + +// decompress one block +int +decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t
uncompress_size, u_int32_t expected_xsum); + +// decompress blocks until there is no more work to do +void * +decompress_worker(void *arg); + +void +decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores); + + +#endif