test large malloc failures in the brtloader writer. merge from tokudb.2603.extractor.tests. refs[t:2603]

git-svn-id: file:///svn/toku/tokudb@20276 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Rich Prohaska 2013-04-16 23:59:12 -04:00 committed by Yoni Fogel
parent 0108e1d27a
commit 1abb93de95
3 changed files with 158 additions and 111 deletions

View file

@ -88,12 +88,27 @@ toku_brtloader_set_size_factor(uint32_t factor) {
}
static void add_big_buffer(struct file_info *file) {
if (file->buffer == NULL)
static int add_big_buffer(struct file_info *file) {
int result = 0;
BOOL newbuffer = FALSE;
if (file->buffer == NULL) {
file->buffer = toku_malloc(file->buffer_size);
if (file->buffer) {
int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size); assert(r == 0);
if (file->buffer == NULL)
result = errno;
else
newbuffer = TRUE;
}
if (result == 0) {
int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size);
if (r != 0) {
result = errno;
if (newbuffer) {
toku_free(file->buffer);
file->buffer = NULL;
}
}
}
return result;
}
static void cleanup_big_buffer(struct file_info *file) {
@ -143,11 +158,12 @@ void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
fi->file_infos = NULL;
}
static void open_file_add (struct file_infos *fi,
static int open_file_add (struct file_infos *fi,
FILE *file,
char *fname,
/* out */ FIDX *idx)
{
int result = 0;
int r = toku_pthread_mutex_lock(&fi->lock); assert(r==0);
if (fi->n_files >= fi->n_files_limit) {
fi->n_files_limit *=2;
@ -161,12 +177,15 @@ static void open_file_add (struct file_infos *fi,
fi->file_infos[fi->n_files].n_rows = 0;
fi->file_infos[fi->n_files].buffer_size = 1<<20;
fi->file_infos[fi->n_files].buffer = NULL;
add_big_buffer(&fi->file_infos[fi->n_files]);
idx->idx = fi->n_files;
fi->n_files++;
fi->n_files_extant++;
fi->n_files_open++;
result = add_big_buffer(&fi->file_infos[fi->n_files]);
if (result == 0) {
idx->idx = fi->n_files;
fi->n_files++;
fi->n_files_extant++;
fi->n_files_open++;
}
r = toku_pthread_mutex_unlock(&fi->lock); assert(r==0);
return result;
}
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
@ -224,17 +243,25 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
* The open file will be saved in bl->file_infos so that even if errors happen we can free them all.
*/
{
int result = 0;
char *fname = toku_strdup(bl->temp_file_template);
int fd = mkstemp(fname);
if (fd<0) { int r = errno; toku_free(fname); return r; }
FILE *f = fdopen(fd, "r+");
if (f==NULL) { int r = errno; toku_free(fname); close(fd); return r; }
open_file_add(&bl->file_infos, f, fname, file_idx);
static int counter=0;
//fprintf(stderr, "%s:%d %d: %s\n", __FILE__, __LINE__, counter, fname);
counter++;
return 0;
if (fd < 0) {
result = errno;
} else {
FILE *f = fdopen(fd, "r+");
if (f == NULL)
result = errno;
else
result = open_file_add(&bl->file_infos, f, fname, file_idx);
}
if (result != 0) {
if (fd >= 0)
close(fd);
if (fname != NULL)
toku_free(fname);
}
return result;
}
static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
@ -2289,6 +2316,8 @@ static int write_literal(struct dbout *out, void*data, size_t len) {
CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl) {
int result = 0;
//printf(" finishing leaf node progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
//printf("local_fingerprint=%8x\n", lbuf->local_fingerprint);
putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint);
@ -2332,51 +2361,55 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
assert(compressed_buf); // LAZY
if (compressed_buf == NULL) {
result = errno;
} else {
// compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 1);
// compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 1);
// cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
// cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
// serialize the sub block header
memcpy(compressed_buf+16, &n_sub_blocks, 4);
for (int i = 0; i < n_sub_blocks; i++) {
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
// serialize the sub block header
memcpy(compressed_buf+16, &n_sub_blocks, 4);
for (int i = 0; i < n_sub_blocks; i++) {
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
dbout_lock(out);
long long off_of_leaf = out->current_off;
int size = header_len + compressed_len;
if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf);
fprintf(stderr, "compressed bytes are:");
//for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i];
// if (isprint(c)) fprintf(stderr, "%c", c);
// else fprintf(stderr, "\\%03o", compressed_buf[28+i]);
//}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
}
dbout_lock(out);
long long off_of_leaf = out->current_off;
int size = header_len + compressed_len;
if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf);
fprintf(stderr, "compressed bytes are:");
//for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i];
// if (isprint(c)) fprintf(stderr, "%c", c);
// else fprintf(stderr, "\\%03o", compressed_buf[28+i]);
//}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
}
int result = write_literal(out, compressed_buf, size);
if (result == 0) {
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size;
seek_align_locked(out);
result = write_literal(out, compressed_buf, size);
if (result == 0) {
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size;
seek_align_locked(out);
}
dbout_unlock(out);
}
dbout_unlock(out);
toku_free(sub_block); // RFP cilk++ bug
@ -2653,7 +2686,10 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
}
FIDX next_pivots_file;
brtloader_open_temp_file (bl, &next_pivots_file);
{
int r = brtloader_open_temp_file (bl, &next_pivots_file);
if (r != 0) { result = r; break; }
}
struct subtrees_info next_sts;
subtrees_info_init(&next_sts);

View file

@ -88,20 +88,24 @@ static void reset_my_malloc_counts(void) {
static void *my_malloc(size_t n) {
void *caller = __builtin_return_address(0);
if ((void*)toku_malloc <= caller && caller <= (void*)toku_xcalloc) {
my_malloc_count++;
if (n >= 64*1024) {
my_big_malloc_count++;
if (my_malloc_event) {
event_count++;
if (event_count == event_count_trigger) {
event_hit();
errno = ENOMEM;
return NULL;
}
if (!((void*)toku_malloc <= caller && caller <= (void*)toku_free))
goto skip;
my_malloc_count++;
if (n >= 64*1024) {
my_big_malloc_count++;
if (my_malloc_event) {
caller = __builtin_return_address(1);
if ((void*)toku_xmalloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
event_count++;
if (event_count == event_count_trigger) {
event_hit();
errno = ENOMEM;
return NULL;
}
}
}
skip:
return malloc(n);
}
@ -206,6 +210,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0);
toku_set_func_malloc(NULL);
@ -217,14 +222,20 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_destroy_poll_callback(&bl.poll_callback);
r = queue_destroy(q2);
assert(r==0);
if (r != 0) printf("WARNING%d r=%d\n", __LINE__, r);
//assert(r==0);
destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, expect_error);
}
static int usage(const char *progname, int n) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n);
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] directory\n", progname, n);
fprintf(stderr, "[-v] turn on verbose\n");
fprintf(stderr, "[-q] turn off verbose\n");
fprintf(stderr, "[-r %d] set the number of rows\n", n);
fprintf(stderr, "[-s] set the small loader size factor\n");
fprintf(stderr, "[-m] inject big malloc failures\n");
return 1;
}

View file

@ -26,15 +26,6 @@ void *toku_malloc(size_t size) {
return p;
}
void *
toku_xcalloc(size_t nmemb, size_t size)
{
size_t newsize = nmemb * size;
void *vp = toku_xmalloc(newsize);
if (vp) memset(vp, 0, newsize);
return vp;
}
void *
toku_calloc(size_t nmemb, size_t size)
{
@ -44,21 +35,6 @@ toku_calloc(size_t nmemb, size_t size)
return vp;
}
void *
toku_xmalloc(size_t size) {
void *r = toku_malloc(size);
if (r==0) abort();
return r;
}
void *
toku_xrealloc(void *v, size_t size)
{
void *r = toku_realloc(v, size);
if (r==0) abort();
return r;
}
void *
toku_tagmalloc(size_t size, enum typ_tag typtag)
{
@ -80,6 +56,20 @@ toku_realloc(void *p, size_t size)
return q;
}
void *
toku_memdup (const void *v, size_t len)
{
void *r=toku_malloc(len);
if (r) memcpy(r,v,len);
return r;
}
char *
toku_strdup (const char *s)
{
return toku_memdup(s, strlen(s)+1);
}
void
toku_free(void *p)
{
@ -95,6 +85,30 @@ toku_free_n(void* p, size_t size __attribute__((unused)))
toku_free(p);
}
void *
toku_xmalloc(size_t size) {
void *r = toku_malloc(size);
if (r==0) abort();
return r;
}
void *
toku_xcalloc(size_t nmemb, size_t size)
{
size_t newsize = nmemb * size;
void *vp = toku_xmalloc(newsize);
if (vp) memset(vp, 0, newsize);
return vp;
}
void *
toku_xrealloc(void *v, size_t size)
{
void *r = toku_realloc(v, size);
if (r==0) abort();
return r;
}
void *
toku_xmemdup (const void *v, size_t len)
{
@ -103,26 +117,12 @@ toku_xmemdup (const void *v, size_t len)
return r;
}
void *
toku_memdup (const void *v, size_t len)
{
void *r=toku_malloc(len);
if (r) memcpy(r,v,len);
return r;
}
char *
toku_xstrdup (const char *s)
{
return toku_xmemdup(s, strlen(s)+1);
}
char *
toku_strdup (const char *s)
{
return toku_memdup(s, strlen(s)+1);
}
void
toku_memory_check_all_free (void)
{