mirror of
https://github.com/MariaDB/server.git
synced 2025-01-22 14:54:20 +01:00
create multiple sub blocks per leaf in the loader. merge -r 18961:head ptq branch to main refs[t:2351]
git-svn-id: file:///svn/toku/tokudb@18992 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
parent
c1b0e91b95
commit
48414ab76b
6 changed files with 431 additions and 324 deletions
|
@ -67,6 +67,7 @@ BRT_SOURCES = \
|
|||
recover \
|
||||
roll \
|
||||
rollback \
|
||||
sub_block \
|
||||
ule \
|
||||
threadpool \
|
||||
toku_worker \
|
||||
|
|
|
@ -262,203 +262,7 @@ enum {
|
|||
uncompressed_version_offset = 8,
|
||||
};
|
||||
|
||||
struct sub_block {
|
||||
void *uncompressed_ptr;
|
||||
u_int32_t uncompressed_size;
|
||||
|
||||
void *compressed_ptr;
|
||||
u_int32_t compressed_size; // real compressed size
|
||||
u_int32_t compressed_size_bound; // estimated compressed size
|
||||
|
||||
u_int32_t xsum; // sub block checksum
|
||||
};
|
||||
|
||||
struct stored_sub_block {
|
||||
u_int32_t uncompressed_size;
|
||||
u_int32_t compressed_size;
|
||||
u_int32_t xsum;
|
||||
};
|
||||
|
||||
static void
|
||||
sub_block_init(struct sub_block *sub_block) {
|
||||
sub_block->uncompressed_ptr = 0;
|
||||
sub_block->uncompressed_size = 0;
|
||||
|
||||
sub_block->compressed_ptr = 0;
|
||||
sub_block->compressed_size_bound = 0;
|
||||
sub_block->compressed_size = 0;
|
||||
|
||||
sub_block->xsum = 0;
|
||||
}
|
||||
|
||||
// get the size of the compression header
|
||||
static size_t
|
||||
sub_block_header_size(int n_sub_blocks) {
|
||||
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
|
||||
}
|
||||
|
||||
// get the sum of the sub block compressed sizes
|
||||
static size_t
|
||||
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
size_t compressed_size_bound = 0;
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size);
|
||||
compressed_size_bound += sub_block[i].compressed_size_bound;
|
||||
}
|
||||
return compressed_size_bound;
|
||||
}
|
||||
|
||||
// get the sum of the sub block uncompressed sizes
|
||||
static size_t
|
||||
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
size_t uncompressed_size = 0;
|
||||
for (int i = 0; i < n_sub_blocks; i++)
|
||||
uncompressed_size += sub_block[i].uncompressed_size;
|
||||
return uncompressed_size;
|
||||
}
|
||||
|
||||
// round up n
|
||||
static inline int
|
||||
alignup32(int a, int b) {
|
||||
return ((a+b-1) / b) * b;
|
||||
}
|
||||
|
||||
static const int max_sub_blocks = 8;
|
||||
|
||||
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
|
||||
// least >= the target_sub_block_size.
|
||||
static void
|
||||
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
|
||||
|
||||
const int target_sub_block_size = 512*1024;
|
||||
const int alignment = 32;
|
||||
|
||||
int n_sub_blocks, sub_block_size;
|
||||
n_sub_blocks = total_size / target_sub_block_size;
|
||||
if (n_sub_blocks <= 1) {
|
||||
n_sub_blocks = 1;
|
||||
sub_block_size = total_size;
|
||||
} else {
|
||||
if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
|
||||
n_sub_blocks = n_sub_blocks_limit;
|
||||
sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
|
||||
while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
|
||||
sub_block_size += alignment;
|
||||
}
|
||||
|
||||
*sub_block_size_ret = sub_block_size;
|
||||
*n_sub_blocks_ret = n_sub_blocks;
|
||||
}
|
||||
|
||||
static void
|
||||
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
int size_left = total_size;
|
||||
int i;
|
||||
for (i = 0; i < n_sub_blocks-1; i++) {
|
||||
sub_block[i].uncompressed_size = sub_block_size;
|
||||
size_left -= sub_block_size;
|
||||
}
|
||||
if (i == 0 || size_left > 0)
|
||||
sub_block[i].uncompressed_size = size_left;
|
||||
}
|
||||
|
||||
#include "workset.h"
|
||||
|
||||
struct compress_work {
|
||||
struct work base;
|
||||
struct sub_block *sub_block;
|
||||
};
|
||||
|
||||
static void
|
||||
compress_work_init(struct compress_work *w, struct sub_block *sub_block) {
|
||||
w->sub_block = sub_block;
|
||||
}
|
||||
|
||||
static void
|
||||
compress_sub_block(struct sub_block *sub_block) {
|
||||
// compress it
|
||||
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
|
||||
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr;
|
||||
uLongf uncompressed_len = sub_block->uncompressed_size;
|
||||
uLongf real_compressed_len = sub_block->compressed_size_bound;
|
||||
int compression_level = 5;
|
||||
|
||||
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
|
||||
(Bytef*)uncompressed_ptr, uncompressed_len,
|
||||
compression_level);
|
||||
assert(r == Z_OK);
|
||||
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
|
||||
|
||||
// checksum it
|
||||
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
|
||||
}
|
||||
|
||||
static void *
|
||||
compress_worker(void *arg) {
|
||||
struct workset *ws = (struct workset *) arg;
|
||||
while (1) {
|
||||
struct compress_work *w = (struct compress_work *) workset_get(ws);
|
||||
if (w == NULL)
|
||||
break;
|
||||
compress_sub_block(w->sub_block);
|
||||
}
|
||||
return arg;
|
||||
}
|
||||
|
||||
static size_t
|
||||
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr) {
|
||||
char *compressed_base_ptr = compressed_ptr;
|
||||
size_t compressed_len;
|
||||
|
||||
if (n_sub_blocks == 1) {
|
||||
// single sub-block
|
||||
sub_block[0].uncompressed_ptr = uncompressed_ptr;
|
||||
sub_block[0].compressed_ptr = compressed_ptr;
|
||||
compress_sub_block(&sub_block[0]);
|
||||
compressed_len = sub_block[0].compressed_size;
|
||||
} else {
|
||||
// multiple sub-blocks
|
||||
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
|
||||
if (T > n_sub_blocks)
|
||||
T = n_sub_blocks;
|
||||
if (T > 0)
|
||||
T = T - 1; // threads in addition to the running thread
|
||||
|
||||
struct workset ws;
|
||||
workset_init(&ws);
|
||||
|
||||
struct compress_work work[n_sub_blocks];
|
||||
workset_lock(&ws);
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
sub_block[i].uncompressed_ptr = uncompressed_ptr;
|
||||
sub_block[i].compressed_ptr = compressed_ptr;
|
||||
compress_work_init(&work[i], &sub_block[i]);
|
||||
workset_put_locked(&ws, &work[i].base);
|
||||
uncompressed_ptr += sub_block[i].uncompressed_size;
|
||||
compressed_ptr += sub_block[i].compressed_size_bound;
|
||||
}
|
||||
workset_unlock(&ws);
|
||||
|
||||
// compress the sub-blocks
|
||||
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
|
||||
toku_pthread_t tids[T];
|
||||
threadset_create(tids, &T, compress_worker, &ws);
|
||||
compress_worker(&ws);
|
||||
|
||||
// wait for all of the work to complete
|
||||
threadset_join(tids, T);
|
||||
|
||||
// squeeze out the holes not used by the compress bound
|
||||
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
|
||||
for (int i = 1; i < n_sub_blocks; i++) {
|
||||
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
|
||||
compressed_ptr += sub_block[i].compressed_size;
|
||||
}
|
||||
|
||||
compressed_len = compressed_ptr - compressed_base_ptr;
|
||||
}
|
||||
return compressed_len;
|
||||
}
|
||||
#include "sub_block.h"
|
||||
|
||||
static void
|
||||
serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
|
||||
|
@ -672,7 +476,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th
|
|||
// compress all of the sub blocks
|
||||
char *uncompressed_ptr = buf + node_header_overhead;
|
||||
char *compressed_ptr = compressed_buf + header_len;
|
||||
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr);
|
||||
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores);
|
||||
|
||||
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
|
||||
|
||||
|
@ -687,7 +491,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th
|
|||
}
|
||||
|
||||
// compute the header checksum and serialize it
|
||||
uint32_t header_length = (void *)ptr - (void *) compressed_buf;
|
||||
uint32_t header_length = (char *)ptr - (char *)compressed_buf;
|
||||
uint32_t xsum = x1764_memory(compressed_buf, header_length);
|
||||
*ptr = toku_htod32(xsum);
|
||||
|
||||
|
@ -969,101 +773,6 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b
|
|||
return 0;
|
||||
}
|
||||
|
||||
#include "workset.h"
|
||||
|
||||
struct decompress_work {
|
||||
struct work base;
|
||||
void *compress_ptr;
|
||||
void *uncompress_ptr;
|
||||
u_int32_t compress_size;
|
||||
u_int32_t uncompress_size;
|
||||
u_int32_t xsum;
|
||||
int error;
|
||||
};
|
||||
|
||||
// initialize the decompression work
|
||||
static void
|
||||
decompress_work_init(struct decompress_work *dw,
|
||||
void *compress_ptr, u_int32_t compress_size,
|
||||
void *uncompress_ptr, u_int32_t uncompress_size,
|
||||
u_int32_t xsum) {
|
||||
dw->compress_ptr = compress_ptr;
|
||||
dw->compress_size = compress_size;
|
||||
dw->uncompress_ptr = uncompress_ptr;
|
||||
dw->uncompress_size = uncompress_size;
|
||||
dw->xsum = xsum;
|
||||
dw->error = 0;
|
||||
}
|
||||
|
||||
// decompress one block
|
||||
static int
|
||||
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
|
||||
// verify checksum
|
||||
u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
|
||||
assert(xsum == expected_xsum);
|
||||
|
||||
// decompress
|
||||
uLongf destlen = uncompress_size;
|
||||
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
|
||||
assert(destlen == uncompress_size);
|
||||
assert(r==Z_OK);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// decompress blocks until there is no more work to do
|
||||
static void *
|
||||
decompress_worker(void *arg) {
|
||||
struct workset *ws = (struct workset *) arg;
|
||||
while (1) {
|
||||
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
|
||||
if (dw == NULL)
|
||||
break;
|
||||
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
|
||||
}
|
||||
return arg;
|
||||
}
|
||||
|
||||
static void
|
||||
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data) {
|
||||
if (n_sub_blocks == 1) {
|
||||
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
|
||||
} else {
|
||||
// compute the number of additional threads needed for decompressing this node
|
||||
int T = num_cores; // T = min(#cores, #blocks) - 1
|
||||
if (T > n_sub_blocks)
|
||||
T = n_sub_blocks;
|
||||
if (T > 0)
|
||||
T = T - 1; // threads in addition to the running thread
|
||||
|
||||
// init the decompression work set
|
||||
struct workset ws;
|
||||
workset_init(&ws);
|
||||
|
||||
// initialize the decompression work and add to the work set
|
||||
struct decompress_work decompress_work[n_sub_blocks];
|
||||
workset_lock(&ws);
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
|
||||
workset_put_locked(&ws, &decompress_work[i].base);
|
||||
|
||||
uncompressed_data += sub_block[i].uncompressed_size;
|
||||
compressed_data += sub_block[i].compressed_size;
|
||||
}
|
||||
workset_unlock(&ws);
|
||||
|
||||
// decompress the sub-blocks
|
||||
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
|
||||
toku_pthread_t tids[T];
|
||||
threadset_create(tids, &T, decompress_worker, &ws);
|
||||
decompress_worker(&ws);
|
||||
|
||||
// cleanup
|
||||
threadset_join(tids, T);
|
||||
workset_destroy(&ws);
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
|
||||
toku_trace("decompress");
|
||||
|
@ -1116,7 +825,7 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
|
|||
unsigned char *uncompressed_data = rb->buf + node_header_overhead;
|
||||
|
||||
// decompress all the compressed sub blocks into the uncompressed buffer
|
||||
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data);
|
||||
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores);
|
||||
|
||||
toku_trace("decompress done");
|
||||
|
||||
|
|
|
@ -284,6 +284,7 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
|
|||
toku_free(vp);
|
||||
}
|
||||
|
||||
#if 0
|
||||
static void
|
||||
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
|
||||
u_int64_t i;
|
||||
|
@ -298,6 +299,39 @@ hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
|
|||
}
|
||||
printf("\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
// Print `size` bytes starting at vp as hexadecimal, 32 bytes per row.
// Each row starts with the decimal file offset of its first byte, and the hex
// digits are grouped four bytes at a time.  Complete rows are followed by the
// same bytes as text (non-printable bytes shown as a space); a trailing
// partial row shows hex only.  A final newline is always written.
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
    const u_int64_t row_bytes = 32;
    u_int64_t full_rows = size / row_bytes;
    u_int64_t tail_bytes = size % row_bytes;

    // complete rows: offset, hex columns, then the text columns
    while (full_rows-- > 0) {
        printf("%"PRIu64": ", offset);
        for (u_int64_t k = 0; k < row_bytes; k++) {
            printf("%2.2X", vp[k]);
            if ((k % 4) == 3) {
                printf(" ");
            }
        }
        for (u_int64_t k = 0; k < row_bytes; k++) {
            unsigned char ch = vp[k];
            printf("%c", isprint(ch) ? ch : ' ');
        }
        printf("\n");
        vp += row_bytes;
        offset += row_bytes;
    }

    // partial last row (fewer than 32 bytes, so it never wraps)
    if (tail_bytes > 0) {
        printf("%"PRIu64": ", offset);
    }
    for (u_int64_t k = 0; k < tail_bytes; k++) {
        printf("%2.2X", vp[k]);
        if ((k % 4) == 3) {
            printf(" ");
        }
    }
    printf("\n");
}
|
||||
|
||||
static void
|
||||
dump_file(int f, u_int64_t offset, u_int64_t size) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "brtloader-internal.h"
|
||||
#include "brt-internal.h"
|
||||
#include "sub_block.h"
|
||||
|
||||
static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL;
|
||||
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
|
||||
|
@ -1178,48 +1179,52 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
|
|||
+4 // layout version
|
||||
+4 // layout version original
|
||||
);
|
||||
int n_extra_bytes_for_compression = (+4 // n_sub blocks
|
||||
+4 // compressed size
|
||||
+4 // compressed size
|
||||
+4 // sub block checksum
|
||||
+4 // header checksum
|
||||
);
|
||||
int header_len = n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression;
|
||||
int compression_level = 5;
|
||||
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
|
||||
int bound = compressBound(uncompressed_len);
|
||||
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
|
||||
uLongf real_compressed_len = bound;
|
||||
{
|
||||
int r = compress2((Bytef*)(compressed_buf + header_len), &real_compressed_len,
|
||||
(Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len,
|
||||
compression_level);
|
||||
assert(r==Z_OK);
|
||||
}
|
||||
|
||||
// checksum the sub block
|
||||
u_int32_t xsum0 = x1764_memory(compressed_buf + header_len, real_compressed_len);
|
||||
|
||||
// choose sub block size and number
|
||||
int sub_block_size, n_sub_blocks;
|
||||
choose_sub_block_size(uncompressed_len, max_sub_blocks, &sub_block_size, &n_sub_blocks);
|
||||
|
||||
int header_len = n_uncompressed_bytes_at_beginning + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
|
||||
|
||||
// initialize the sub blocks
|
||||
struct sub_block sub_block[n_sub_blocks];
|
||||
for (int i = 0; i < n_sub_blocks; i++)
|
||||
sub_block_init(&sub_block[i]);
|
||||
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
|
||||
|
||||
// allocate space for the compressed buffer
|
||||
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
|
||||
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
|
||||
|
||||
// compress and checksum the sub blocks
|
||||
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
|
||||
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
|
||||
(char *) (compressed_buf + header_len), 2);
|
||||
|
||||
// copy the uncompressed header to the compressed buffer
|
||||
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
|
||||
int compressed_len = real_compressed_len;
|
||||
int n_compressed_blocks = 1;
|
||||
memcpy(compressed_buf+16, &n_compressed_blocks, 4);
|
||||
memcpy(compressed_buf+20, &compressed_len, 4);
|
||||
memcpy(compressed_buf+24, &uncompressed_len, 4);
|
||||
memcpy(compressed_buf+28, &xsum0, 4);
|
||||
|
||||
// serialize the sub block header
|
||||
memcpy(compressed_buf+16, &n_sub_blocks, 4);
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
|
||||
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
|
||||
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
|
||||
}
|
||||
|
||||
// compute the header checksum and serialize it
|
||||
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
|
||||
memcpy(compressed_buf+32, &header_xsum, 4);
|
||||
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
|
||||
|
||||
//#ifndef CILK_STUB
|
||||
// ttable_and_write_lock->lock();
|
||||
//#endif
|
||||
long long off_of_leaf = out->current_off;
|
||||
int size = real_compressed_len + header_len;
|
||||
int size = header_len + compressed_len;
|
||||
if (0) {
|
||||
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
|
||||
fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf);
|
||||
fprintf(stderr, "compressed buf size=%u, off=%lld\n", compressed_len, off_of_leaf);
|
||||
fprintf(stderr, "compressed bytes are:");
|
||||
//for (int i=0; i<compressed_len; i++) {
|
||||
// unsigned char c = compressed_buf[28+i];
|
||||
|
|
263
newbrt/sub_block.c
Normal file
263
newbrt/sub_block.c
Normal file
|
@ -0,0 +1,263 @@
|
|||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <zlib.h>
|
||||
|
||||
#include "toku_portability.h"
|
||||
#include "toku_assert.h"
|
||||
#include "x1764.h"
|
||||
#include "sub_block.h"
|
||||
|
||||
void
|
||||
sub_block_init(struct sub_block *sub_block) {
|
||||
sub_block->uncompressed_ptr = 0;
|
||||
sub_block->uncompressed_size = 0;
|
||||
|
||||
sub_block->compressed_ptr = 0;
|
||||
sub_block->compressed_size_bound = 0;
|
||||
sub_block->compressed_size = 0;
|
||||
|
||||
sub_block->xsum = 0;
|
||||
}
|
||||
|
||||
// get the size of the compression header
|
||||
size_t
|
||||
sub_block_header_size(int n_sub_blocks) {
|
||||
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
|
||||
}
|
||||
|
||||
// get the sum of the sub block compressed sizes
|
||||
size_t
|
||||
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
size_t compressed_size_bound = 0;
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size);
|
||||
compressed_size_bound += sub_block[i].compressed_size_bound;
|
||||
}
|
||||
return compressed_size_bound;
|
||||
}
|
||||
|
||||
// get the sum of the sub block uncompressed sizes
|
||||
size_t
|
||||
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
size_t uncompressed_size = 0;
|
||||
for (int i = 0; i < n_sub_blocks; i++)
|
||||
uncompressed_size += sub_block[i].uncompressed_size;
|
||||
return uncompressed_size;
|
||||
}
|
||||
|
||||
// Round a up to the next multiple of b (a itself when it already is one).
// Despite the name, b may be any divisor; the result follows C integer
// division, so it is meaningful for a >= 0 and b > 0.
static inline int
alignup32(int a, int b) {
    int n_units = (a + b - 1) / b;
    return n_units * b;
}
|
||||
|
||||
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
|
||||
// least >= the target_sub_block_size.
|
||||
void
|
||||
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
|
||||
const int alignment = 32;
|
||||
|
||||
int n_sub_blocks, sub_block_size;
|
||||
n_sub_blocks = total_size / target_sub_block_size;
|
||||
if (n_sub_blocks <= 1) {
|
||||
n_sub_blocks = 1;
|
||||
sub_block_size = total_size;
|
||||
} else {
|
||||
if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
|
||||
n_sub_blocks = n_sub_blocks_limit;
|
||||
sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
|
||||
while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
|
||||
sub_block_size += alignment;
|
||||
}
|
||||
|
||||
*sub_block_size_ret = sub_block_size;
|
||||
*n_sub_blocks_ret = n_sub_blocks;
|
||||
}
|
||||
|
||||
void
|
||||
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
|
||||
int size_left = total_size;
|
||||
int i;
|
||||
for (i = 0; i < n_sub_blocks-1; i++) {
|
||||
sub_block[i].uncompressed_size = sub_block_size;
|
||||
size_left -= sub_block_size;
|
||||
}
|
||||
if (i == 0 || size_left > 0)
|
||||
sub_block[i].uncompressed_size = size_left;
|
||||
}
|
||||
|
||||
#include "workset.h"
|
||||
|
||||
void
|
||||
compress_work_init(struct compress_work *w, struct sub_block *sub_block) {
|
||||
w->sub_block = sub_block;
|
||||
}
|
||||
|
||||
void
|
||||
compress_sub_block(struct sub_block *sub_block) {
|
||||
// compress it
|
||||
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
|
||||
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr;
|
||||
uLongf uncompressed_len = sub_block->uncompressed_size;
|
||||
uLongf real_compressed_len = sub_block->compressed_size_bound;
|
||||
int compression_level = 5;
|
||||
|
||||
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
|
||||
(Bytef*)uncompressed_ptr, uncompressed_len,
|
||||
compression_level);
|
||||
assert(r == Z_OK);
|
||||
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
|
||||
|
||||
// checksum it
|
||||
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
|
||||
}
|
||||
|
||||
void *
|
||||
compress_worker(void *arg) {
|
||||
struct workset *ws = (struct workset *) arg;
|
||||
while (1) {
|
||||
struct compress_work *w = (struct compress_work *) workset_get(ws);
|
||||
if (w == NULL)
|
||||
break;
|
||||
compress_sub_block(w->sub_block);
|
||||
}
|
||||
return arg;
|
||||
}
|
||||
|
||||
size_t
|
||||
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores) {
|
||||
char *compressed_base_ptr = compressed_ptr;
|
||||
size_t compressed_len;
|
||||
|
||||
if (n_sub_blocks == 1) {
|
||||
// single sub-block
|
||||
sub_block[0].uncompressed_ptr = uncompressed_ptr;
|
||||
sub_block[0].compressed_ptr = compressed_ptr;
|
||||
compress_sub_block(&sub_block[0]);
|
||||
compressed_len = sub_block[0].compressed_size;
|
||||
} else {
|
||||
// multiple sub-blocks
|
||||
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
|
||||
if (T > n_sub_blocks)
|
||||
T = n_sub_blocks;
|
||||
if (T > 0)
|
||||
T = T - 1; // threads in addition to the running thread
|
||||
|
||||
struct workset ws;
|
||||
workset_init(&ws);
|
||||
|
||||
struct compress_work work[n_sub_blocks];
|
||||
workset_lock(&ws);
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
sub_block[i].uncompressed_ptr = uncompressed_ptr;
|
||||
sub_block[i].compressed_ptr = compressed_ptr;
|
||||
compress_work_init(&work[i], &sub_block[i]);
|
||||
workset_put_locked(&ws, &work[i].base);
|
||||
uncompressed_ptr += sub_block[i].uncompressed_size;
|
||||
compressed_ptr += sub_block[i].compressed_size_bound;
|
||||
}
|
||||
workset_unlock(&ws);
|
||||
|
||||
// compress the sub-blocks
|
||||
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
|
||||
toku_pthread_t tids[T];
|
||||
threadset_create(tids, &T, compress_worker, &ws);
|
||||
compress_worker(&ws);
|
||||
|
||||
// wait for all of the work to complete
|
||||
threadset_join(tids, T);
|
||||
|
||||
// squeeze out the holes not used by the compress bound
|
||||
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
|
||||
for (int i = 1; i < n_sub_blocks; i++) {
|
||||
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
|
||||
compressed_ptr += sub_block[i].compressed_size;
|
||||
}
|
||||
|
||||
compressed_len = compressed_ptr - compressed_base_ptr;
|
||||
}
|
||||
return compressed_len;
|
||||
}
|
||||
|
||||
// initialize the decompression work
|
||||
void
|
||||
decompress_work_init(struct decompress_work *dw,
|
||||
void *compress_ptr, u_int32_t compress_size,
|
||||
void *uncompress_ptr, u_int32_t uncompress_size,
|
||||
u_int32_t xsum) {
|
||||
dw->compress_ptr = compress_ptr;
|
||||
dw->compress_size = compress_size;
|
||||
dw->uncompress_ptr = uncompress_ptr;
|
||||
dw->uncompress_size = uncompress_size;
|
||||
dw->xsum = xsum;
|
||||
dw->error = 0;
|
||||
}
|
||||
|
||||
// decompress one block
|
||||
int
|
||||
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
|
||||
// verify checksum
|
||||
u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
|
||||
assert(xsum == expected_xsum);
|
||||
|
||||
// decompress
|
||||
uLongf destlen = uncompress_size;
|
||||
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
|
||||
assert(destlen == uncompress_size);
|
||||
assert(r==Z_OK);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// decompress blocks until there is no more work to do
|
||||
void *
|
||||
decompress_worker(void *arg) {
|
||||
struct workset *ws = (struct workset *) arg;
|
||||
while (1) {
|
||||
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
|
||||
if (dw == NULL)
|
||||
break;
|
||||
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
|
||||
}
|
||||
return arg;
|
||||
}
|
||||
|
||||
void
|
||||
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) {
|
||||
if (n_sub_blocks == 1) {
|
||||
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
|
||||
} else {
|
||||
// compute the number of additional threads needed for decompressing this node
|
||||
int T = num_cores; // T = min(#cores, #blocks) - 1
|
||||
if (T > n_sub_blocks)
|
||||
T = n_sub_blocks;
|
||||
if (T > 0)
|
||||
T = T - 1; // threads in addition to the running thread
|
||||
|
||||
// init the decompression work set
|
||||
struct workset ws;
|
||||
workset_init(&ws);
|
||||
|
||||
// initialize the decompression work and add to the work set
|
||||
struct decompress_work decompress_work[n_sub_blocks];
|
||||
workset_lock(&ws);
|
||||
for (int i = 0; i < n_sub_blocks; i++) {
|
||||
decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
|
||||
workset_put_locked(&ws, &decompress_work[i].base);
|
||||
|
||||
uncompressed_data += sub_block[i].uncompressed_size;
|
||||
compressed_data += sub_block[i].compressed_size;
|
||||
}
|
||||
workset_unlock(&ws);
|
||||
|
||||
// decompress the sub-blocks
|
||||
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
|
||||
toku_pthread_t tids[T];
|
||||
threadset_create(tids, &T, decompress_worker, &ws);
|
||||
decompress_worker(&ws);
|
||||
|
||||
// cleanup
|
||||
threadset_join(tids, T);
|
||||
workset_destroy(&ws);
|
||||
}
|
||||
}
|
95
newbrt/sub_block.h
Normal file
95
newbrt/sub_block.h
Normal file
|
@ -0,0 +1,95 @@
|
|||
#ifndef TOKU_SUB_BLOCK_H
|
||||
#define TOKU_SUB_BLOCK_H
|
||||
|
||||
static const int max_sub_blocks = 8;
|
||||
static const int target_sub_block_size = 512*1024;
|
||||
|
||||
// In-memory descriptor of one sub block while it is being compressed or
// decompressed.
struct sub_block {
    // uncompressed side
    void *uncompressed_ptr;           // start of this sub block's uncompressed bytes
    u_int32_t uncompressed_size;      // number of uncompressed bytes

    // compressed side
    void *compressed_ptr;             // start of this sub block's compressed bytes
    u_int32_t compressed_size;        // actual size, known once compression has run
    u_int32_t compressed_size_bound;  // upper bound from compressBound, used to reserve space

    u_int32_t xsum;                   // checksum of the compressed bytes
};
|
||||
|
||||
// Per-sub-block record of the compression header; sub_block_header_size()
// uses sizeof this struct to compute the header length.
// NOTE(review): the loader serializes compressed_size before
// uncompressed_size, so the field order here is presumably not the on-disk
// order -- confirm before reading or writing this struct directly.
struct stored_sub_block {
    u_int32_t uncompressed_size;  // size of the sub block before compression
    u_int32_t compressed_size;    // size of the sub block after compression
    u_int32_t xsum;               // sub block checksum
};
|
||||
|
||||
void
|
||||
sub_block_init(struct sub_block *sub_block);
|
||||
|
||||
// get the size of the compression header
|
||||
size_t
|
||||
sub_block_header_size(int n_sub_blocks);
|
||||
|
||||
// get the sum of the sub block compressed sizes
|
||||
size_t
|
||||
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]);
|
||||
|
||||
// get the sum of the sub block uncompressed sizes
|
||||
size_t
|
||||
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]);
|
||||
|
||||
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size.  When more than one
// sub block is chosen, sub_block_size is >= target_sub_block_size; a single sub block is exactly total_size bytes.
|
||||
void
|
||||
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret);
|
||||
|
||||
void
|
||||
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]);
|
||||
|
||||
#include "workset.h"
|
||||
|
||||
struct compress_work {
|
||||
struct work base;
|
||||
struct sub_block *sub_block;
|
||||
};
|
||||
|
||||
void
|
||||
compress_work_init(struct compress_work *w, struct sub_block *sub_block);
|
||||
|
||||
void
|
||||
compress_sub_block(struct sub_block *sub_block);
|
||||
|
||||
void *
|
||||
compress_worker(void *arg);
|
||||
|
||||
size_t
|
||||
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores);
|
||||
|
||||
struct decompress_work {
|
||||
struct work base;
|
||||
void *compress_ptr;
|
||||
void *uncompress_ptr;
|
||||
u_int32_t compress_size;
|
||||
u_int32_t uncompress_size;
|
||||
u_int32_t xsum;
|
||||
int error;
|
||||
};
|
||||
|
||||
// initialize the decompression work
|
||||
void
|
||||
decompress_work_init(struct decompress_work *dw,
|
||||
void *compress_ptr, u_int32_t compress_size,
|
||||
void *uncompress_ptr, u_int32_t uncompress_size,
|
||||
u_int32_t xsum);
|
||||
|
||||
// decompress one block
|
||||
int
|
||||
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum);
|
||||
|
||||
// decompress blocks until there is no more work to do
|
||||
void *
|
||||
decompress_worker(void *arg);
|
||||
|
||||
void
|
||||
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores);
|
||||
|
||||
|
||||
#endif
|
Loading…
Add table
Reference in a new issue