From f3dc3fdea27ea3df1c17642be88d903a78bbb287 Mon Sep 17 00:00:00 2001 From: Zardosht Kasheff Date: Wed, 17 Apr 2013 00:01:27 -0400 Subject: [PATCH] refs #5842, merge Direct I/O to main git-svn-id: file:///svn/toku/tokudb@52238 c7de825b-a66e-492c-adef-691d508d4ae1 --- buildheader/make_tdb.cc | 1 + cmake_modules/TokuFeatureDetection.cmake | 3 + cmake_modules/TokuSetupCompiler.cmake | 2 +- ft/block_allocator.cc | 6 +- ft/block_table.cc | 38 +-- ft/ft-ops.cc | 20 +- ft/ft-ops.h | 1 + ft/ft-serialize.cc | 103 ++++---- ft/ft_node-serialize.cc | 93 ++++--- ft/ftloader.cc | 23 +- ft/tests/block_allocator_test.cc | 31 +-- ft/tests/ftloader-test-merge-files-dbufio.cc | 4 +- ft/tests/ftloader-test-open.cc | 2 +- portability/CMakeLists.txt | 1 + portability/file.cc | 27 ++ portability/memory.cc | 69 +++++- portability/os_malloc.cc | 230 ++++++++++++++++++ .../tests/test-cache-line-boundary-fails.cc | 10 +- portability/tests/test-pwrite4g.cc | 7 +- src/export.map | 1 + src/ydb_env_func.cc | 4 + toku_include/config.h.in | 2 + toku_include/memory.h | 22 +- toku_include/toku_portability.h | 32 +++ 24 files changed, 594 insertions(+), 138 deletions(-) diff --git a/buildheader/make_tdb.cc b/buildheader/make_tdb.cc index 930f5906145..cac3188c224 100644 --- a/buildheader/make_tdb.cc +++ b/buildheader/make_tdb.cc @@ -693,6 +693,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { printf("int log_compare (const DB_LSN*, const DB_LSN *) %s;\n", VISIBLE); printf("int toku_set_trace_file (const char *fname) %s;\n", VISIBLE); printf("int toku_close_trace_file (void) %s;\n", VISIBLE); + printf("void db_env_set_direct_io (bool direct_io_on) %s;\n", VISIBLE); printf("void db_env_set_func_fsync (int (*)(int)) %s;\n", VISIBLE); printf("void db_env_set_func_free (void (*)(void*)) %s;\n", VISIBLE); printf("void db_env_set_func_malloc (void *(*)(size_t)) %s;\n", VISIBLE); diff --git a/cmake_modules/TokuFeatureDetection.cmake b/cmake_modules/TokuFeatureDetection.cmake index f233b88592e..4bdcf58e2ae 100644 --- a/cmake_modules/TokuFeatureDetection.cmake +++ b/cmake_modules/TokuFeatureDetection.cmake @@ -53,6 +53,9 @@ include(CheckSymbolExists) check_symbol_exists(M_MMAP_THRESHOLD "malloc.h" HAVE_M_MMAP_THRESHOLD) ## check whether we have CLOCK_REALTIME check_symbol_exists(CLOCK_REALTIME "time.h" HAVE_CLOCK_REALTIME) +## check how to do direct I/O +check_symbol_exists(O_DIRECT "fcntl.h" HAVE_O_DIRECT) +check_symbol_exists(F_NOCACHE "fcntl.h" HAVE_F_NOCACHE) include(CheckFunctionExists) diff --git a/cmake_modules/TokuSetupCompiler.cmake b/cmake_modules/TokuSetupCompiler.cmake index 5fc68ea26ad..86eb5f39be4 100644 --- a/cmake_modules/TokuSetupCompiler.cmake +++ b/cmake_modules/TokuSetupCompiler.cmake @@ -108,7 +108,7 @@ set_ldflags_if_supported( ## set extra debugging flags and preprocessor definitions set(CMAKE_C_FLAGS_DEBUG "-g3 -O0 ${CMAKE_C_FLAGS_DEBUG}") set(CMAKE_CXX_FLAGS_DEBUG "-g3 -O0 ${CMAKE_CXX_FLAGS_DEBUG}") -set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG _FORTIFY_SOURCE=2) +#set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG _FORTIFY_SOURCE=2) ## set extra release flags if (APPLE AND CMAKE_CXX_COMPILER_ID STREQUAL Clang) diff --git a/ft/block_allocator.cc b/ft/block_allocator.cc index 13405bee7ed..07fbfd6164a 100644 --- a/ft/block_allocator.cc +++ b/ft/block_allocator.cc @@ -58,6 +58,7 @@ block_allocator_print (BLOCK_ALLOCATOR ba) { void create_block_allocator (BLOCK_ALLOCATOR *ba, uint64_t reserve_at_beginning, uint64_t alignment) { + assert(alignment>=512 && 
0==(alignment%512)); // the alignment must be at least 512 and aligned with 512 to make DIRECT_IO happy. BLOCK_ALLOCATOR XMALLOC(result); result->reserve_at_beginning = reserve_at_beginning; result->alignment = alignment; @@ -176,8 +177,9 @@ align (uint64_t value, BLOCK_ALLOCATOR ba) return ((value+ba->alignment-1)/ba->alignment)*ba->alignment; } -void -block_allocator_alloc_block (BLOCK_ALLOCATOR ba, uint64_t size, uint64_t *offset) { +void block_allocator_alloc_block(BLOCK_ALLOCATOR ba, uint64_t size, uint64_t *offset) +// Effect: Allocate a block. The resulting block must be aligned on the ba->alignment (which to make direct_io happy must be a positive multiple of 512). +{ invariant(size > 0); //Allocator does not support size 0 blocks. See block_allocator_free_block. grow_blocks_array(ba); ba->n_bytes_in_use += size; diff --git a/ft/block_table.cc b/ft/block_table.cc index 68980741ef6..783b1221d82 100644 --- a/ft/block_table.cc +++ b/ft/block_table.cc @@ -459,9 +459,11 @@ pair_is_unallocated(struct block_translation_pair *pair) { return pair->size == 0 && pair->u.diskoff == diskoff_unused; } -// Purpose of this function is to figure out where to put the inprogress btt on disk, allocate space for it there. -static void -blocknum_alloc_translation_on_disk_unlocked (BLOCK_TABLE bt) { +static void blocknum_alloc_translation_on_disk_unlocked(BLOCK_TABLE bt) +// Effect: figure out where to put the inprogress btt on disk, allocate space for it there. +// The space must be 512-byte aligned (both the starting address and the size). +// As a result, the allocated space may be a little bit bigger (up to the next 512-byte boundary) than the actual btt. +{ toku_mutex_assert_locked(&bt->mutex); struct translation *t = &bt->inprogress; @@ -479,24 +481,29 @@ PRNTF("blokAllokator", 1L, size, offset, bt); t->block_translation[b.b].size = size; } -//Fills wbuf with bt -//A clean shutdown runs checkpoint start so that current and inprogress are copies. -void -toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, int fd, struct wbuf *w, - int64_t *address, int64_t *size) { +void toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, int fd, struct wbuf *w, + int64_t *address, int64_t *size) +// Effect: Fills wbuf (which starts uninitialized) with bt +// A clean shutdown runs checkpoint start so that current and inprogress are copies. +// The resulting wbuf buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed) +// The address is guaranteed to be 512-byte aligned, but the size is not guaranteed. +// It *is* guaranteed that we can read up to the next 512-byte boundary, however +{ lock_for_blocktable(bt); struct translation *t = &bt->inprogress; BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION); - blocknum_alloc_translation_on_disk_unlocked(bt); + blocknum_alloc_translation_on_disk_unlocked(bt); // The allocated block must be 512-byte aligned to make O_DIRECT happy.
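// Illustration (not part of the patch): the rounding/zero-padding pattern used
// throughout this change, written as a standalone sketch. The helper mirrors the
// roundup_to_multiple() that the patch adds to toku_include/toku_portability.h;
// pad_for_direct_io() is a hypothetical name used only for this example.
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

static uint64_t roundup_to_multiple_512(uint64_t v) {
    return (v + 511) & ~(uint64_t)511;               // next multiple of 512 (a power of two)
}

// Copy a logical payload into a 512-byte-aligned buffer whose length is the
// payload length rounded up to 512, zeroing the tail so uninitialized bytes
// never reach the disk. This is the shape every write path below follows.
static char *pad_for_direct_io(const char *payload, size_t payload_len, size_t *padded_len) {
    *padded_len = roundup_to_multiple_512(payload_len);
    void *buf = NULL;
    if (posix_memalign(&buf, 512, *padded_len) != 0) return NULL;    // aligned start address
    memcpy(buf, payload, payload_len);
    memset((char *)buf + payload_len, 0, *padded_len - payload_len); // zero the pad
    return (char *)buf;
}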
+ uint64_t size_translation = calculate_size_on_disk(t); + uint64_t size_aligned = roundup_to_multiple(512, size_translation); + assert((int64_t)size_translation==t->block_translation[b.b].size); { //Init wbuf - uint64_t size_translation = calculate_size_on_disk(t); - assert((int64_t)size_translation==t->block_translation[b.b].size); if (0) printf("%s:%d writing translation table of size_translation %" PRIu64 " at %" PRId64 "\n", __FILE__, __LINE__, size_translation, t->block_translation[b.b].u.diskoff); - wbuf_init(w, toku_malloc(size_translation), size_translation); - assert(w->size==size_translation); + char *XMALLOC_N_ALIGNED(512, size_aligned, buf); + for (uint64_t i=size_translation; ismallest_never_used_blocknum); wbuf_BLOCKNUM(w, t->blocknum_freelist_head); @@ -510,9 +517,10 @@ toku_serialize_translation_to_wbuf(BLOCK_TABLE bt, int fd, struct wbuf *w, uint32_t checksum = x1764_finish(&w->checksum); wbuf_int(w, checksum); *address = t->block_translation[b.b].u.diskoff; - *size = t->block_translation[b.b].size; + *size = size_translation; + assert((*address)%512 == 0); - ensure_safe_write_unlocked(bt, fd, *size, *address); + ensure_safe_write_unlocked(bt, fd, size_aligned, *address); unlock_for_blocktable(bt); } diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index d360651682c..fa1c3720410 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -3397,6 +3397,20 @@ int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *ft_handle_ return r; } +static bool use_direct_io = true; + +void toku_ft_set_direct_io (bool direct_io_on) { + use_direct_io = direct_io_on; +} + +static inline int ft_open_maybe_direct(const char *filename, int oflag, int mode) { + if (use_direct_io) { + return toku_os_open_direct(filename, oflag, mode); + } else { + return toku_os_open(filename, oflag, mode); + } +} + // open a file for use by the brt // Requires: File does not exist. 
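// Illustration (not part of the patch): how an application might drive the new
// knob. The global set by toku_ft_set_direct_io() is only consulted when files
// are opened, so db_env_set_direct_io() (exported later via ydb_env_func.cc and
// export.map) should be called before the environment opens any dictionaries.
// open_env_with_direct_io() and the flag combination are hypothetical choices
// for this sketch, assuming the generated db.h exposes the new prototype.
#include <db.h>

static int open_env_with_direct_io(const char *home, DB_ENV **envp) {
    db_env_set_direct_io(true);              // default in this patch is already true
    int r = db_env_create(envp, 0);
    if (r != 0) return r;
    return (*envp)->open(*envp, home, DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL, 0755);
}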
static int ft_create_file(FT_HANDLE UU(brt), const char *fname, int *fdp) { @@ -3404,12 +3418,12 @@ static int ft_create_file(FT_HANDLE UU(brt), const char *fname, int *fdp) { int r; int fd; int er; - fd = open(fname, O_RDWR | O_BINARY, mode); + fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, mode); assert(fd==-1); if ((er = get_maybe_error_errno()) != ENOENT) { return er; } - fd = open(fname, O_RDWR | O_CREAT | O_BINARY, mode); + fd = ft_open_maybe_direct(fname, O_RDWR | O_CREAT | O_BINARY, mode); if (fd==-1) { r = get_error_errno(); return r; @@ -3426,7 +3440,7 @@ static int ft_create_file(FT_HANDLE UU(brt), const char *fname, int *fdp) { static int ft_open_file(const char *fname, int *fdp) { mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO; int fd; - fd = open(fname, O_RDWR | O_BINARY, mode); + fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, mode); if (fd==-1) { return get_error_errno(); } diff --git a/ft/ft-ops.h b/ft/ft-ops.h index 26d00beaffa..d2349bd5f21 100644 --- a/ft/ft-ops.h +++ b/ft/ft-ops.h @@ -257,4 +257,5 @@ int toku_ft_strerror_r(int error, char *buf, size_t buflen); extern bool garbage_collection_debug; +void toku_ft_set_direct_io(bool direct_io_on); #endif diff --git a/ft/ft-serialize.cc b/ft/ft-serialize.cc index 2762fdee80b..bdae581321e 100644 --- a/ft/ft-serialize.cc +++ b/ft/ft-serialize.cc @@ -39,8 +39,11 @@ void toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset) { // make the checksum int64_t size = toku_serialize_descriptor_size(desc)+4; //4 for checksum + int64_t size_aligned = roundup_to_multiple(512, size); struct wbuf w; - wbuf_init(&w, toku_xmalloc(size), size); + char *XMALLOC_N_ALIGNED(512, size_aligned, aligned_buf); + for (int64_t i=size; i 0) { lazy_assert(size>=4); //4 for checksum { - XMALLOC_N(size, dbuf); + ssize_t size_to_malloc = roundup_to_multiple(512, size); + XMALLOC_N_ALIGNED(512, size_to_malloc, dbuf); { - ssize_t sz_read = toku_os_pread(fd, dbuf, size, offset); - lazy_assert(sz_read==size); + + ssize_t sz_read = toku_os_pread(fd, dbuf, size_to_malloc, offset); + lazy_assert(sz_read==size_to_malloc); } { // check the checksum @@ -118,9 +123,9 @@ exit: return r; } -// We only deserialize brt header once and then share everything with all the brts. -int -deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) +int deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) +// Effect: Deserialize the ft header. +// We deserialize brt header only once and then share everything with all the brts. { int r; FT ft = NULL; @@ -179,14 +184,16 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) //Load translation table { - unsigned char *XMALLOC_N(translation_size_on_disk, tbuf); + size_t size_to_read = roundup_to_multiple(512, translation_size_on_disk); + unsigned char *XMALLOC_N_ALIGNED(512, size_to_read, tbuf); { // This cast is messed up in 32-bits if the block translation // table is ever more than 4GB. But in that case, the // translation table itself won't fit in main memory. - ssize_t readsz = toku_os_pread(fd, tbuf, translation_size_on_disk, + ssize_t readsz = toku_os_pread(fd, tbuf, size_to_read, translation_address_on_disk); - lazy_assert(readsz == translation_size_on_disk); + assert(readsz >= translation_size_on_disk); + assert(readsz <= (ssize_t)size_to_read); } // Create table and read in data. 
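// Illustration (not part of the patch): the read-side convention the
// deserialization paths above follow, as a self-contained sketch. The caller
// logically wants `logical_size` bytes at a 512-aligned `offset`; under
// O_DIRECT the request length must also be a multiple of 512, so we read the
// rounded-up length and only require that at least the logical bytes arrived,
// mirroring the relaxed asserts above. pread_aligned_512() is a hypothetical
// name used only for this example.
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>

static void *pread_aligned_512(int fd, size_t logical_size, off_t offset) {
    assert(offset % 512 == 0);
    size_t padded_size = (logical_size + 511) & ~(size_t)511;
    void *buf = NULL;
    if (posix_memalign(&buf, 512, padded_size) != 0) return NULL;
    ssize_t n = pread(fd, buf, padded_size, offset);
    assert(n >= (ssize_t)logical_size);   // everything we logically need arrived
    assert(n <= (ssize_t)padded_size);    // and no more than we asked for
    return buf;
}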
r = toku_blocktable_create_from_buffer(fd, @@ -427,28 +434,30 @@ serialize_ft_min_size (uint32_t version) { return size; } -// Simply reading the raw bytes of the header into an rbuf is insensitive -// to disk format version. If that ever changes, then modify this. +int deserialize_ft_from_fd_into_rbuf(int fd, + toku_off_t offset_of_header, + struct rbuf *rb, + uint64_t *checkpoint_count, + LSN *checkpoint_lsn, + uint32_t * version_p) +// Effect: Read and parse the header of a fractalal tree // -// TOKUDB_DICTIONARY_NO_HEADER means we can overwrite everything in the -// file AND the header is useless -int -deserialize_ft_from_fd_into_rbuf(int fd, - toku_off_t offset_of_header, - struct rbuf *rb, - uint64_t *checkpoint_count, - LSN *checkpoint_lsn, - uint32_t * version_p) +// Simply reading the raw bytes of the header into an rbuf is insensitive +// to disk format version. If that ever changes, then modify this. +// +// TOKUDB_DICTIONARY_NO_HEADER means we can overwrite everything in the +// file AND the header is useless { int r = 0; const int64_t prefix_size = 8 + // magic ("tokudata") 4 + // version 4 + // build_id 4; // size - unsigned char prefix[prefix_size]; + const int64_t read_size = roundup_to_multiple(512, prefix_size); + unsigned char *XMALLOC_N_ALIGNED(512, read_size, prefix); rb->buf = NULL; - int64_t n = toku_os_pread(fd, prefix, prefix_size, offset_of_header); - if (n != prefix_size) { + int64_t n = toku_os_pread(fd, prefix, read_size, offset_of_header); + if (n != read_size) { if (n==0) { r = TOKUDB_DICTIONARY_NO_HEADER; } else if (n<0) { @@ -504,16 +513,21 @@ deserialize_ft_from_fd_into_rbuf(int fd, lazy_assert(rb->ndone==prefix_size); rb->size = size; - XMALLOC_N(rb->size, rb->buf); + { + toku_free(rb->buf); + uint32_t size_to_read = roundup_to_multiple(512, size); + XMALLOC_N_ALIGNED(512, size_to_read, rb->buf); - n = toku_os_pread(fd, rb->buf, rb->size, offset_of_header); - if (n != rb->size) { - if (n < 0) { - r = get_error_errno(); - } else { - r = EINVAL; //Header might be useless (wrong size) or could be a disk read error. + assert(offset_of_header%512==0); + n = toku_os_pread(fd, rb->buf, size_to_read, offset_of_header); + if (n != size_to_read) { + if (n < 0) { + r = get_error_errno(); + } else { + r = EINVAL; //Header might be useless (wrong size) or could be a disk read error. + } + goto exit; } - goto exit; } //It's version 14 or later. Magic looks OK. //We have an rbuf that represents the header. @@ -549,9 +563,7 @@ deserialize_ft_from_fd_into_rbuf(int fd, exit: if (r != 0 && rb->buf != NULL) { - if (rb->buf != prefix) { // don't free prefix, it's stack alloc'd - toku_free(rb->buf); - } + toku_free(rb->buf); rb->buf = NULL; } return r; @@ -718,16 +730,23 @@ void toku_serialize_ft_to (int fd, FT_HEADER h, BLOCK_TABLE blocktable, CACHEFIL toku_serialize_translation_to_wbuf(blocktable, fd, &w_translation, &address_translation, &size_translation); - lazy_assert(size_translation == w_translation.size); + assert(size_translation == w_translation.ndone); // the bytes written are the size + assert(w_translation.size % 512 == 0); // the number of bytes available in the buffer is 0 mod 512, and those last bytes are all initialized. struct wbuf w_main; - size_t size_main = toku_serialize_ft_size(h); - wbuf_init(&w_main, toku_xmalloc(size_main), size_main); + size_t size_main = toku_serialize_ft_size(h); + size_t size_main_aligned = roundup_to_multiple(512, size_main); + assert(size_main_alignedcheckpoint_count & 0x1) ? 
0 : BLOCK_ALLOCATOR_HEADER_RESERVE; - toku_os_full_pwrite(fd, w_main.buf, w_main.ndone, main_offset); + toku_os_full_pwrite(fd, w_main.buf, size_main_aligned, main_offset); toku_free(w_main.buf); toku_free(w_translation.buf); } diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc index 7fc39eb3fca..1567c42a660 100644 --- a/ft/ft_node-serialize.cc +++ b/ft/ft_node-serialize.cc @@ -134,7 +134,9 @@ toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width); } if (to_write > 0) { - char *XCALLOC_N(to_write, wbuf); + assert(to_write%512==0); + char *XMALLOC_N_ALIGNED(512, to_write, wbuf); + memset(wbuf, 0, to_write); toku_off_t start_write = alignup64(file_size, stripe_width); invariant(start_write >= file_size); toku_os_full_pwrite(fd, wbuf, to_write, start_write); @@ -773,20 +775,21 @@ serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb, st->compress_time += t2 - t1; } -// Writes out each child to a separate malloc'd buffer, then compresses -// all of them, and writes the uncompressed header, to bytes_to_write, -// which is malloc'd. +int toku_serialize_ftnode_to_memory(FTNODE node, + FTNODE_DISK_DATA* ndd, + unsigned int basementnodesize, + enum toku_compression_method compression_method, + bool do_rebalancing, + bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false + /*out*/ size_t *n_bytes_to_write, + /*out*/ size_t *n_uncompressed_bytes, + /*out*/ char **bytes_to_write) +// Effect: Writes out each child to a separate malloc'd buffer, then compresses +// all of them, and writes the uncompressed header, to bytes_to_write, +// which is malloc'd. // -int -toku_serialize_ftnode_to_memory (FTNODE node, - FTNODE_DISK_DATA* ndd, - unsigned int basementnodesize, - enum toku_compression_method compression_method, - bool do_rebalancing, - bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false - /*out*/ size_t *n_bytes_to_write, - /*out*/ size_t *n_uncompressed_bytes, - /*out*/ char **bytes_to_write) +// The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed). +// 512-byte padding is for O_DIRECT to work. { toku_assert_entire_node_in_memory(node); @@ -849,7 +852,9 @@ toku_serialize_ftnode_to_memory (FTNODE node, total_uncompressed_size += sb[i].uncompressed_size + 4; } - char *XMALLOC_N(total_node_size, data); + uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes. 
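// Illustration (not part of the patch): roughly what the *_ALIGNED allocation
// macros used on the next lines boil down to. The real definitions live in
// toku_include/memory.h further down in this patch; the stand-ins here keep the
// same names but drop the CAST_FROM_VOIDP plumbing, and main() is just a
// throwaway driver for this sketch.
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static void *toku_xmalloc_aligned(size_t alignment, size_t size) {
    void *p = NULL;
    if (posix_memalign(&p, alignment, size) != 0) abort();  // xmalloc semantics: abort, never NULL
    return p;
}
// In the patch the macro participates in a declaration ("char *XMALLOC_N_ALIGNED(...)");
// here it simply assigns into an already-declared pointer (GCC/Clang __typeof__).
#define XMALLOC_N_ALIGNED(align, n, v) \
    (v) = (__typeof__(v)) toku_xmalloc_aligned((align), (n) * sizeof(*(v)))

int main(void) {
    char *data;
    XMALLOC_N_ALIGNED(512, 4096, data);                      // 4 KiB, 512-byte aligned
    printf("aligned: %d\n", (int)((uintptr_t)data % 512 == 0));
    free(data);
    return 0;
}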
+ + char *XMALLOC_N_ALIGNED(512, total_buffer_size, data); char *curr_ptr = data; // now create the final serialized node @@ -874,9 +879,14 @@ toku_serialize_ftnode_to_memory (FTNODE node, *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum); curr_ptr += sizeof(sb[i].xsum); } + // Zero the rest of the buffer + for (uint32_t i=total_node_size; iblocktable, blocknum, &offset, &size); - uint8_t *XMALLOC_N(size, raw_block); + DISKOFF size_aligned = roundup_to_multiple(512, size); + uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block); rbuf_init(rb, raw_block, size); // read the block - ssize_t rlen = toku_os_pread(fd, raw_block, size, offset); - lazy_assert((DISKOFF)rlen == size); + ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset); + assert((DISKOFF)rlen >= size); + assert((DISKOFF)rlen <= size_aligned); } static const int read_header_heuristic_max = 32*1024; @@ -1170,8 +1184,8 @@ static void read_ftnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCKN { DISKOFF offset, size; toku_translate_blocknum_to_offset_size(ft->blocktable, blocknum, &offset, &size); - DISKOFF read_size = MIN(read_header_heuristic_max, size); - uint8_t *XMALLOC_N(size, raw_block); + DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size)); + uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block); rbuf_init(rb, raw_block, read_size); // read the block @@ -2418,14 +2432,20 @@ toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, i uint32_t curr_size = BP_SIZE (ndd, childnum); struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0}; - uint8_t *XMALLOC_N(curr_size, raw_block); - rbuf_init(&rb, raw_block, curr_size); + uint32_t pad_at_beginning = (node_offset+curr_offset)%512; + uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size); + uint8_t *XMALLOC_N_ALIGNED(512, padded_size, raw_block); + rbuf_init(&rb, pad_at_beginning+raw_block, curr_size); tokutime_t t0 = toku_time_now(); - // read - ssize_t rlen = toku_os_pread(fd, raw_block, curr_size, node_offset+curr_offset); - lazy_assert((DISKOFF)rlen == curr_size); + // read the block + assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT + assert(0==(padded_size)%512); + assert(0==(node_offset+curr_offset-pad_at_beginning)%512); + ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning); + assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted + assert((DISKOFF)rlen <= padded_size); // we didn't read in too much. 
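// Illustration (not part of the patch): the partial-fetch trick used just above,
// as a standalone sketch. A basement node sits at an arbitrary offset inside a
// 512-aligned node block, so the read is widened to the enclosing 512-byte
// window and the caller gets a pointer into the interior of the aligned buffer.
// read_unaligned_region() is a hypothetical name used only for this example.
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

static uint8_t *read_unaligned_region(int fd, off_t region_offset, size_t region_size,
                                      uint8_t **base_to_free) {
    uint32_t pad_at_beginning = region_offset % 512;
    size_t padded_size = (pad_at_beginning + region_size + 511) & ~(size_t)511;

    uint8_t *raw = NULL;
    if (posix_memalign((void **)&raw, 512, padded_size) != 0) return NULL;

    ssize_t rlen = pread(fd, raw, padded_size, region_offset - pad_at_beginning);
    assert(rlen >= (ssize_t)(pad_at_beginning + region_size)); // got at least what we wanted
    assert(rlen <= (ssize_t)padded_size);                      // and no more than we asked for

    *base_to_free = raw;            // free() this aligned base pointer when done
    return raw + pad_at_beginning;  // ...but read the region through this interior pointer
}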
tokutime_t t1 = toku_time_now(); @@ -2627,12 +2647,14 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf, struct sub_block sub_block[/*n_sub_blocks*/], enum toku_compression_method method, /*out*/ size_t *n_bytes_to_write, - /*out*/ char **bytes_to_write) { + /*out*/ char **bytes_to_write) +// Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work) +{ // allocate space for the compressed uncompressed_buf size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method); size_t sub_block_header_len = sub_block_header_size(n_sub_blocks); size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum - char *XMALLOC_N(header_len + compressed_len, compressed_buf); + char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf); // copy the header memcpy(compressed_buf, uncompressed_buf, node_header_overhead); @@ -2662,7 +2684,12 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf, uint32_t xsum = x1764_memory(compressed_buf, header_length); *ptr = toku_htod32(xsum); - *n_bytes_to_write = header_len + compressed_len; + uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len); + // Zero out padding. + for (uint32_t i = header_len+compressed_len; i < padded_len; i++) { + compressed_buf[i] = 0; + } + *n_bytes_to_write = padded_len; *bytes_to_write = compressed_buf; } @@ -2933,11 +2960,13 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum, int r = 0; if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); - uint8_t *XMALLOC_N(size, raw_block); + DISKOFF size_aligned = roundup_to_multiple(512, size); + uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block); { // read the (partially compressed) block - ssize_t rlen = toku_os_pread(fd, raw_block, size, offset); - lazy_assert((DISKOFF)rlen == size); + ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset); + lazy_assert((DISKOFF)rlen >= size); + lazy_assert((DISKOFF)rlen <= size_aligned); } // get the layout_version int layout_version; diff --git a/ft/ftloader.cc b/ft/ftloader.cc index dbc0c93b6e3..16c28a9371b 100644 --- a/ft/ftloader.cc +++ b/ft/ftloader.cc @@ -2102,6 +2102,7 @@ static void allocate_node (struct subtrees_info *sts, int64_t b) { sts->n_subtrees++; } +// dbuf will always contain a 512-byte aligned buffer, but the length might not be a multiple of 512 bytes. If that's what you want, then pad it.
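// Illustration (not part of the patch): the growth strategy behind
// REALLOC_N_ALIGNED as used by the dbuf below — a sketch of the approach
// os_realloc_aligned() takes later in this patch (portability/os_malloc.cc):
// plain realloc first, and only if the result lands off the 512-byte boundary,
// fall back to an aligned allocation plus a copy. The old-size parameter and
// the function name are choices made for this example only.
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

static void *realloc_aligned_512(void *p, size_t old_size, size_t new_size) {
    if (p == NULL) {
        void *q = NULL;
        return posix_memalign(&q, 512, new_size) == 0 ? q : NULL;
    }
    void *newp = realloc(p, new_size);
    if (newp == NULL) return NULL;
    if ((uintptr_t)newp % 512 != 0) {              // realloc moved us off alignment
        void *fixed = NULL;
        if (posix_memalign(&fixed, 512, new_size) != 0) { free(newp); return NULL; }
        memcpy(fixed, newp, old_size < new_size ? old_size : new_size);
        free(newp);
        newp = fixed;
    }
    return newp;
}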
struct dbuf { unsigned char *buf; int buflen; @@ -2225,7 +2226,7 @@ static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) { int oldbuflen = dbuf->buflen; dbuf->buflen += dbuf->off + nbytes; dbuf->buflen *= 2; - REALLOC_N(dbuf->buflen, dbuf->buf); + REALLOC_N_ALIGNED(512, dbuf->buflen, dbuf->buf); if (dbuf->buf == NULL) { dbuf->error = get_error_errno(); dbuf->buf = oldbuf; @@ -2905,9 +2906,17 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla } unsigned int checksum = x1764_memory(ttable.buf, ttable.off); putbuf_int32(&ttable, checksum); + // pad it to 512 zeros + long long encoded_length = ttable.off; + { + int nbytes_to_add = roundup_to_multiple(512, ttable.off) - encoded_length; + char zeros[nbytes_to_add]; + for (int i=0; ifd, ttable.buf, ttable.off, off_of_translation); } dbuf_destroy(&ttable); @@ -2919,18 +2928,22 @@ static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) { int result = 0; size_t size = toku_serialize_ft_size(out->h->h); + size_t alloced_size = roundup_to_multiple(512, size); struct wbuf wbuf; - char *MALLOC_N(size, buf); + char *MALLOC_N_ALIGNED(512, alloced_size, buf); if (buf == NULL) { result = get_error_errno(); } else { wbuf_init(&wbuf, buf, size); out->h->h->on_disk_stats = out->h->in_memory_stats; toku_serialize_ft_to_wbuf(&wbuf, out->h->h, translation_location_on_disk, translation_size_on_disk); + for (size_t i=size; ifd, wbuf.buf, wbuf.ndone, 0); + else { + assert(wbuf.ndone <= alloced_size); + result = toku_os_pwrite(out->fd, wbuf.buf, alloced_size, 0); + } toku_free(buf); } return result; diff --git a/ft/tests/block_allocator_test.cc b/ft/tests/block_allocator_test.cc index cd2fd58f5f7..1dcfd731475 100644 --- a/ft/tests/block_allocator_test.cc +++ b/ft/tests/block_allocator_test.cc @@ -8,19 +8,22 @@ static void ba_alloc_at (BLOCK_ALLOCATOR ba, uint64_t size, uint64_t offset) { block_allocator_validate(ba); - block_allocator_alloc_block_at(ba, size, offset); + block_allocator_alloc_block_at(ba, size*512, offset*512); block_allocator_validate(ba); } static void ba_alloc (BLOCK_ALLOCATOR ba, uint64_t size, uint64_t *answer) { block_allocator_validate(ba); - block_allocator_alloc_block(ba, size, answer); + uint64_t actual_answer; + block_allocator_alloc_block(ba, 512*size, &actual_answer); block_allocator_validate(ba); + assert(actual_answer%512==0); + *answer = actual_answer/512; } static void ba_free (BLOCK_ALLOCATOR ba, uint64_t offset) { block_allocator_validate(ba); - block_allocator_free_block(ba, offset); + block_allocator_free_block(ba, offset*512); block_allocator_validate(ba); } @@ -30,8 +33,8 @@ ba_check_l (BLOCK_ALLOCATOR ba, uint64_t blocknum_in_layout_order, uint64_t expe uint64_t actual_offset, actual_size; int r = block_allocator_get_nth_block_in_layout_order(ba, blocknum_in_layout_order, &actual_offset, &actual_size); assert(r==0); - assert(expected_offset == actual_offset); - assert(expected_size == actual_size); + assert(expected_offset*512 == actual_offset); + assert(expected_size *512 == actual_size); } static void @@ -48,10 +51,10 @@ static void test_ba0 (void) { BLOCK_ALLOCATOR ba; uint64_t b0, b1; - create_block_allocator(&ba, 100, 1); - assert(block_allocator_allocated_limit(ba)==100); + create_block_allocator(&ba, 100*512, 1*512); + assert(block_allocator_allocated_limit(ba)==100*512); ba_alloc_at(ba, 50, 100); - assert(block_allocator_allocated_limit(ba)==150); + assert(block_allocator_allocated_limit(ba)==150*512); 
ba_alloc_at(ba, 25, 150); ba_alloc (ba, 10, &b0); ba_check_l (ba, 0, 0, 100); @@ -66,9 +69,9 @@ test_ba0 (void) { assert(b0==160); ba_alloc(ba, 10, &b0); ba_alloc(ba, 113, &b1); - assert(113==block_allocator_block_size(ba, b1)); - assert(10==block_allocator_block_size(ba, b0)); - assert(50==block_allocator_block_size(ba, 100)); + assert(113*512==block_allocator_block_size(ba, b1 *512)); + assert(10 *512==block_allocator_block_size(ba, b0 *512)); + assert(50 *512==block_allocator_block_size(ba, 100*512)); uint64_t b2, b3, b4, b5, b6, b7; ba_alloc(ba, 100, &b2); @@ -103,7 +106,7 @@ test_ba0 (void) { static void test_ba1 (int n_initial) { BLOCK_ALLOCATOR ba; - create_block_allocator(&ba, 0, 1); + create_block_allocator(&ba, 0*512, 1*512); int i; int n_blocks=0; uint64_t blocks[1000]; @@ -136,8 +139,8 @@ test_ba2 (void) BLOCK_ALLOCATOR ba; uint64_t b[6]; enum { BSIZE = 1024 }; - create_block_allocator(&ba, 100, BSIZE); - assert(block_allocator_allocated_limit(ba)==100); + create_block_allocator(&ba, 100*512, BSIZE*512); + assert(block_allocator_allocated_limit(ba)==100*512); ba_check_l (ba, 0, 0, 100); ba_check_none (ba, 1); diff --git a/ft/tests/ftloader-test-merge-files-dbufio.cc b/ft/tests/ftloader-test-merge-files-dbufio.cc index 3161fd805d1..1d495a0d110 100644 --- a/ft/tests/ftloader-test-merge-files-dbufio.cc +++ b/ft/tests/ftloader-test-merge-files-dbufio.cc @@ -188,7 +188,7 @@ static void *my_malloc(size_t n) { } } } - return malloc(n); + return os_malloc(n); } static int do_realloc_errors = 1; @@ -207,7 +207,7 @@ static void *my_realloc(void *p, size_t n) { } } } - return realloc(p, n); + return os_realloc(p, n); } diff --git a/ft/tests/ftloader-test-open.cc b/ft/tests/ftloader-test-open.cc index c1bb6a2d061..75a4c5eb00c 100644 --- a/ft/tests/ftloader-test-open.cc +++ b/ft/tests/ftloader-test-open.cc @@ -28,7 +28,7 @@ static void *my_malloc(size_t n) { errno = ENOSPC; return NULL; } else - return malloc(n); + return os_malloc(n); } static int my_compare(DB *UU(desc), const DBT *UU(akey), const DBT *UU(bkey)) { diff --git a/portability/CMakeLists.txt b/portability/CMakeLists.txt index 043fcd313c9..0e0b376def7 100644 --- a/portability/CMakeLists.txt +++ b/portability/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(${LIBTOKUPORTABILITY} LINK_PUBLIC ${CMAKE_THREAD_LIBS_INIT add_library(tokuportability_static_conv STATIC ${tokuportability_srcs}) set_target_properties(tokuportability_static_conv PROPERTIES POSITION_INDEPENDENT_CODE ON) +add_dependencies(tokuportability_static_conv build_jemalloc) set(tokuportability_source_libs tokuportability_static_conv jemalloc ${CMAKE_THREAD_LIBS_INIT} ${EXTRA_SYSTEM_LIBS}) merge_static_libs(${LIBTOKUPORTABILITY}_static ${LIBTOKUPORTABILITY}_static "${tokuportability_source_libs}") diff --git a/portability/file.cc b/portability/file.cc index 4fd8892b71f..8a8ee51eba1 100644 --- a/portability/file.cc +++ b/portability/file.cc @@ -196,6 +196,8 @@ toku_os_write (int fd, const void *buf, size_t len) { void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { + assert(0==((long long)buf)%512); + assert((len%512 == 0) && (off%512)==0); // to make pwrite work. const char *bp = (const char *) buf; while (len > 0) { ssize_t r; @@ -218,6 +220,9 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { ssize_t toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { + assert(0==((long long)buf)%512); // these asserts are to ensure that direct I/O will work. 
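// Illustration (not part of the patch): a minimal standalone program exercising
// the three constraints the asserts above enforce — aligned buffer address,
// aligned length, and aligned file offset. It opens with O_DIRECT directly;
// the patch's toku_os_open_direct() (just below) does the same on Linux and
// falls back to fcntl(F_NOCACHE) where O_DIRECT is unavailable (e.g. Darwin).
// The file name and sizes are arbitrary choices for this demo, and 512 is
// assumed to be an acceptable alignment for the underlying filesystem.
#define _GNU_SOURCE            // exposes O_DIRECT on Linux/glibc
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int main(void) {
#ifdef O_DIRECT
    int fd = open("direct_demo.data", O_RDWR | O_CREAT | O_DIRECT, 0644);
#else
    int fd = open("direct_demo.data", O_RDWR | O_CREAT, 0644);
#endif
    assert(fd >= 0);

    void *buf = NULL;
    int r = posix_memalign(&buf, 512, 4096);   // aligned buffer address
    assert(r == 0);
    memset(buf, 0x42, 4096);

    ssize_t n = pwrite(fd, buf, 4096, 0);      // aligned length and aligned offset
    assert(n == 4096);

    free(buf);
    close(fd);
    unlink("direct_demo.data");
    return 0;
}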
+ assert(0==len %512); + assert(0==off %512); const char *bp = (const char *) buf; ssize_t result = 0; while (len > 0) { @@ -269,6 +274,25 @@ toku_os_open(const char *path, int oflag, int mode) { return rval; } +int +toku_os_open_direct(const char *path, int oflag, int mode) { + int rval; +#if defined(HAVE_O_DIRECT) + rval = toku_os_open(path, oflag | O_DIRECT, mode); +#elif defined(HAVE_F_NOCACHE) + rval = toku_os_open(path, oflag, mode); + if (rval >= 0) { + int r = fcntl(rval, F_NOCACHE, 1); + if (r == -1) { + perror("setting F_NOCACHE"); + } + } +#else +# error "No direct I/O implementation found." +#endif + return rval; +} + int toku_os_fclose(FILE * stream) { int rval = -1; @@ -310,6 +334,9 @@ toku_os_read(int fd, void *buf, size_t count) { ssize_t toku_os_pread (int fd, void *buf, size_t count, off_t offset) { + assert(0==((long long)buf)%512); + assert(0==count%512); + assert(0==offset%512); ssize_t r; if (t_pread) { r = t_pread(fd, buf, count, offset); diff --git a/portability/memory.cc b/portability/memory.cc index 787a6a79085..0fdef30173e 100644 --- a/portability/memory.cc +++ b/portability/memory.cc @@ -21,17 +21,17 @@ #include static malloc_fun_t t_malloc = 0; +static malloc_aligned_fun_t t_malloc_aligned = 0; static malloc_fun_t t_xmalloc = 0; +static malloc_aligned_fun_t t_xmalloc_aligned = 0; static free_fun_t t_free = 0; static realloc_fun_t t_realloc = 0; +static realloc_aligned_fun_t t_realloc_aligned = 0; static realloc_fun_t t_xrealloc = 0; static LOCAL_MEMORY_STATUS_S status; int toku_memory_do_stats = 0; -typedef size_t (*malloc_usable_size_fun_t)(const void *); -static malloc_usable_size_fun_t malloc_usable_size_f; - static bool memory_startup_complete; int @@ -76,14 +76,6 @@ toku_memory_startup(void) { } } - malloc_usable_size_f = (malloc_usable_size_fun_t) dlsym(RTLD_DEFAULT, "malloc_usable_size"); - if (!malloc_usable_size_f) { - malloc_usable_size_f = (malloc_usable_size_fun_t) dlsym(RTLD_DEFAULT, "malloc_size"); // darwin - if (!malloc_usable_size_f) { - result = EINVAL; // couldn't find a malloc size function - } - } - return result; } @@ -105,7 +97,7 @@ toku_memory_get_status(LOCAL_MEMORY_STATUS s) { // jemalloc's malloc_usable_size does not work with a NULL pointer, so we implement a version that works static size_t my_malloc_usable_size(void *p) { - return p == NULL ? 0 : malloc_usable_size_f(p); + return p == NULL ? 0 : os_malloc_usable_size(p); } // Note that max_in_use may be slightly off because use of max_in_use is not thread-safe. @@ -162,6 +154,23 @@ toku_malloc(size_t size) { return p; } +void *toku_malloc_aligned(size_t alignment, size_t size) { + void *p = t_malloc_aligned ? t_malloc_aligned(alignment, size) : os_malloc_aligned(alignment, size); + if (p) { + TOKU_ANNOTATE_NEW_MEMORY(p, size); // see #4671 and https://bugs.kde.org/show_bug.cgi?id=297147 + if (toku_memory_do_stats) { + size_t used = my_malloc_usable_size(p); + toku_sync_add_and_fetch(&status.malloc_count, 1); + toku_sync_add_and_fetch(&status.requested,size); + toku_sync_add_and_fetch(&status.used, used); + set_max(status.used, status.freed); + } + } else { + toku_sync_add_and_fetch(&status.malloc_fail, 1); + } + return p; +} + void * toku_calloc(size_t nmemb, size_t size) { size_t newsize = nmemb * size; @@ -189,6 +198,25 @@ toku_realloc(void *p, size_t size) { return q; } +void *toku_realloc_aligned(size_t alignment, void *p, size_t size) { + size_t used_orig = p ? my_malloc_usable_size(p) : 0; + void *q = t_realloc_aligned ? 
t_realloc_aligned(alignment, p, size) : os_realloc_aligned(alignment, p, size); + if (q) { + if (toku_memory_do_stats) { + size_t used = my_malloc_usable_size(q); + toku_sync_add_and_fetch(&status.realloc_count, 1); + toku_sync_add_and_fetch(&status.requested, size); + toku_sync_add_and_fetch(&status.used, used); + toku_sync_add_and_fetch(&status.freed, used_orig); + set_max(status.used, status.freed); + } + } else { + toku_sync_add_and_fetch(&status.realloc_fail, 1); + } + return q; +} + + void * toku_memdup(const void *v, size_t len) { void *p = toku_malloc(len); @@ -232,6 +260,23 @@ toku_xmalloc(size_t size) { return p; } +void* toku_xmalloc_aligned(size_t alignment, size_t size) +// Effect: Perform a malloc(size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Fail with a resource_assert if the allocation fails (don't return an error code). +// Requires: alignment is a power of two. +{ + void *p = t_xmalloc_aligned ? t_xmalloc_aligned(alignment, size) : os_malloc_aligned(alignment,size); + resource_assert(p); + if (toku_memory_do_stats) { + size_t used = my_malloc_usable_size(p); + toku_sync_add_and_fetch(&status.malloc_count, 1); + toku_sync_add_and_fetch(&status.requested, size); + toku_sync_add_and_fetch(&status.used, used); + set_max(status.used, status.freed); + } + return p; +} + void * toku_xcalloc(size_t nmemb, size_t size) { size_t newsize = nmemb * size; diff --git a/portability/os_malloc.cc b/portability/os_malloc.cc index 4018651094f..46903b99277 100644 --- a/portability/os_malloc.cc +++ b/portability/os_malloc.cc @@ -8,11 +8,174 @@ #include #include +#include #if defined(HAVE_MALLOC_H) # include #elif defined(HAVE_SYS_MALLOC_H) # include #endif +#include + +#include + +// #define this to use a version of os_malloc that helps to debug certain features. +// This version uses the real malloc (so that valgrind should still work) but it forces things to be slightly +// misaligned (in particular, avoiding 512-byte alignment if possible, to find situations where O_DIRECT will fail. +// #define USE_DEBUGGING_MALLOCS + +#ifdef USE_DEBUGGING_MALLOCS +#include + +// Make things misaligned on 512-byte boundaries +static size_t malloced_now_count=0, malloced_now_size=0; +struct malloc_pair { + void *returned_pointer; + void *true_pointer; + size_t requested_size = 0; +}; +static struct malloc_pair *malloced_now; +static pthread_mutex_t malloc_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void malloc_lock(void) { + int r = pthread_mutex_lock(&malloc_mutex); + assert(r==0); +} +static void malloc_unlock(void) { + int r = pthread_mutex_unlock(&malloc_mutex); + assert(r==0); +} + +static void push_to_malloced_memory(void *returned_pointer, void *true_pointer, size_t requested_size) { + malloc_lock(); + if (malloced_now_count == malloced_now_size) { + malloced_now_size = 2*malloced_now_size + 1; + malloced_now = (struct malloc_pair *)realloc(malloced_now, malloced_now_size * sizeof(*malloced_now)); + } + malloced_now[malloced_now_count].returned_pointer = returned_pointer; + malloced_now[malloced_now_count].true_pointer = true_pointer; + malloced_now[malloced_now_count].requested_size = requested_size; + malloced_now_count++; + malloc_unlock(); +} + +static struct malloc_pair *find_malloced_pair(const void *p) +// Requires: Lock must be held before calling. 
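// Illustration (not part of the patch): a simplified, header-based variant of the
// USE_DEBUGGING_MALLOCS idea above. Instead of the mutex-protected lookup table,
// it stashes the true pointer right before the returned pointer, and deliberately
// skews the result off any 512-byte boundary so code that relies on accidental
// alignment (rather than the new *_aligned allocators) trips the direct-I/O
// asserts immediately. Names and the 16-byte skew are choices for this sketch.
#include <stdint.h>
#include <stdlib.h>

struct debug_header { void *true_pointer; };

static void *debug_malloc_misaligned(size_t size) {
    // Over-allocate: room for the header plus slack to push the result off 512.
    char *raw = (char *)malloc(sizeof(struct debug_header) + 512 + size);
    if (raw == NULL) return NULL;
    char *user = raw + sizeof(struct debug_header);
    if ((uintptr_t)user % 512 == 0) user += 16;          // force misalignment
    ((struct debug_header *)user)[-1].true_pointer = raw;
    return user;
}

static void debug_free(void *p) {
    if (p != NULL) free(((struct debug_header *)p)[-1].true_pointer);
}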
+{ + for (size_t i=0; irequested_size)); + malloc_unlock(); + os_free(p); + return result; + } +} + + +void os_free(void* p) { + malloc_lock(); + struct malloc_pair *mp = find_malloced_pair(p); + assert(mp); + free(mp->true_pointer); + *mp = malloced_now[--malloced_now_count]; + malloc_unlock(); +} + +size_t os_malloc_usable_size(const void *p) { + malloc_lock(); + struct malloc_pair *mp = find_malloced_pair(p); + assert(mp); + size_t size = mp->requested_size; + malloc_unlock(); + return size; +} + +#else void * os_malloc(size_t size) @@ -20,14 +183,81 @@ os_malloc(size_t size) return malloc(size); } +void *os_malloc_aligned(size_t alignment, size_t size) +// Effect: Perform a malloc(size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Requires: alignment is a power of two. +{ + void *p; + int r = posix_memalign(&p, alignment, size); + if (r != 0) { + errno = r; + p = nullptr; + } + return p; +} + void * os_realloc(void *p, size_t size) { return realloc(p, size); } +void * os_realloc_aligned(size_t alignment, void *p, size_t size) +// Effect: Perform a realloc(p, size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Requires: alignment is a power of two. +{ +#if 1 + if (p==NULL) { + return os_malloc_aligned(alignment, size); + } else { + void *newp = realloc(p, size); + if (0!=((long long)newp%alignment)) { + // it's not aligned, so align it ourselves. + void *newp2 = os_malloc_aligned(alignment, size); + memcpy(newp2, newp, size); + free(newp); + newp = newp2; + } + return newp; + } +#else + // THIS STUFF SEEMS TO FAIL VALGRIND + if (p==NULL) { + return os_malloc_aligned(alignment, size); + } else { + size_t ignore; + int r = rallocm(&p, // returned pointer + &ignore, // actual size of returned object. 
+ size, // the size we want + 0, // extra bytes to "try" to allocate at the end + ALLOCM_ALIGN(alignment)); + if (r!=0) return NULL; + else return p; + } +#endif +} + + void os_free(void* p) { free(p); } + +typedef size_t (*malloc_usable_size_fun_t)(const void *); +static malloc_usable_size_fun_t malloc_usable_size_f = NULL; + +size_t os_malloc_usable_size(const void *p) { + if (p==NULL) return 0; + if (!malloc_usable_size_f) { + malloc_usable_size_f = (malloc_usable_size_fun_t) dlsym(RTLD_DEFAULT, "malloc_usable_size"); + if (!malloc_usable_size_f) { + malloc_usable_size_f = (malloc_usable_size_fun_t) dlsym(RTLD_DEFAULT, "malloc_size"); // darwin + if (!malloc_usable_size_f) { + abort(); // couldn't find a malloc size function + } + } + } + return malloc_usable_size_f(p); +} +#endif diff --git a/portability/tests/test-cache-line-boundary-fails.cc b/portability/tests/test-cache-line-boundary-fails.cc index 50f3d59f017..225db75a042 100644 --- a/portability/tests/test-cache-line-boundary-fails.cc +++ b/portability/tests/test-cache-line-boundary-fails.cc @@ -50,9 +50,8 @@ int test_main(int UU(argc), char *const argv[] UU()) { } { - struct unpackedsevenbytestruct *usevenbytestructs; - int r = posix_memalign((void **) &usevenbytestructs, cachelinesize, sizeof(unpackedsevenbytestruct) * 10); - if (r) { + struct unpackedsevenbytestruct *MALLOC_N_ALIGNED(cachelinesize, 10, usevenbytestructs); + if (usevenbytestructs == NULL) { // this test is supposed to crash, so exiting cleanly is a failure perror("posix_memalign"); exit(EXIT_FAILURE); @@ -65,8 +64,9 @@ int test_main(int UU(argc), char *const argv[] UU()) { toku_free(usevenbytestructs); } - int r = posix_memalign((void **) &psevenbytestructs, cachelinesize, sizeof(packedsevenbytestruct) * 10); - if (r) { + + MALLOC_N_ALIGNED(cachelinesize, 10, psevenbytestructs); + if (psevenbytestructs == NULL) { // this test is supposed to crash, so exiting cleanly is a failure perror("posix_memalign"); exit(EXIT_FAILURE); diff --git a/portability/tests/test-pwrite4g.cc b/portability/tests/test-pwrite4g.cc index 28a5ee167bf..96a21d022dd 100644 --- a/portability/tests/test-pwrite4g.cc +++ b/portability/tests/test-pwrite4g.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -28,8 +29,9 @@ int test_main(int argc, char *const argv[]) { unlink(fname); int fd = open(fname, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd>=0); - char buf[] = "hello"; - int64_t offset = (1LL<<32) + 100; + char *XMALLOC_N_ALIGNED(512, 512, buf); + strcpy(buf, "hello"); + int64_t offset = (1LL<<32) + 512; toku_os_full_pwrite(fd, buf, sizeof buf, offset); char newbuf[sizeof buf]; r = pread(fd, newbuf, sizeof newbuf, 100); @@ -42,6 +44,7 @@ int test_main(int argc, char *const argv[]) { r = toku_os_get_file_size(fd, &fsize); assert(r == 0); assert(fsize > 100 + (signed)sizeof(buf)); + toku_free(buf); r = close(fd); assert(r==0); return 0; diff --git a/src/export.map b/src/export.map index 270178dcd30..519c7922c2a 100644 --- a/src/export.map +++ b/src/export.map @@ -4,6 +4,7 @@ db_env_create; db_strerror; db_version; + db_env_set_direct_io; db_env_set_func_fsync; db_env_set_func_malloc; db_env_set_func_realloc; diff --git a/src/ydb_env_func.cc b/src/ydb_env_func.cc index f2bbf17568a..9b053d66ee6 100644 --- a/src/ydb_env_func.cc +++ b/src/ydb_env_func.cc @@ -27,6 +27,10 @@ void * checkpoint_callback2_extra = NULL; bool engine_status_enable = true; // if false, suppress engine status output on failed assert, for test programs only +void 
db_env_set_direct_io (bool direct_io_on) { + toku_ft_set_direct_io(direct_io_on); +} + void db_env_set_func_fsync (int (*fsync_function)(int)) { toku_set_func_fsync(fsync_function); } diff --git a/toku_include/config.h.in b/toku_include/config.h.in index d5511e64cb9..c56674ece9e 100644 --- a/toku_include/config.h.in +++ b/toku_include/config.h.in @@ -44,6 +44,8 @@ #cmakedefine HAVE_M_MMAP_THRESHOLD 1 #cmakedefine HAVE_CLOCK_REALTIME 1 +#cmakedefine HAVE_O_DIRECT 1 +#cmakedefine HAVE_F_NOCACHE 1 #cmakedefine HAVE_MALLOC_SIZE 1 #cmakedefine HAVE_MALLOC_USABLE_SIZE 1 diff --git a/toku_include/memory.h b/toku_include/memory.h index 53f2ec47de8..b7bfe3b833d 100644 --- a/toku_include/memory.h +++ b/toku_include/memory.h @@ -18,17 +18,28 @@ void toku_memory_shutdown(void) __attribute__((destructor)); /* Generally: errno is set to 0 or a value to indicate problems. */ -/* Everything should call toku_malloc() instead of malloc(), and toku_calloc() instead of calloc() */ +// Everything should call toku_malloc() instead of malloc(), and toku_calloc() instead of calloc() +// That way the tests can can, e.g., replace the malloc function using toku_set_func_malloc(). void *toku_calloc(size_t nmemb, size_t size) __attribute__((__visibility__("default"))); void *toku_xcalloc(size_t nmemb, size_t size) __attribute__((__visibility__("default"))); void *toku_malloc(size_t size) __attribute__((__visibility__("default"))); +void *toku_malloc_aligned(size_t alignment, size_t size) __attribute__((__visibility__("default"))); // xmalloc aborts instead of return NULL if we run out of memory -void *toku_xmalloc(size_t size); +void *toku_xmalloc(size_t size) __attribute__((__visibility__("default"))); void *toku_xrealloc(void*, size_t size) __attribute__((__visibility__("default"))); +void *toku_xmalloc_aligned(size_t alignment, size_t size) __attribute__((__visibility__("default"))); +// Effect: Perform a os_malloc_aligned(size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Fail with a resource_assert if the allocation fails (don't return an error code). +// If the alloc_aligned function has been set then call it instead. +// Requires: alignment is a power of two. void toku_free(void*) __attribute__((__visibility__("default"))); void *toku_realloc(void *, size_t size) __attribute__((__visibility__("default"))); +void *toku_realloc_aligned(size_t alignment, void *p, size_t size) __attribute__((__visibility__("default"))); +// Effect: Perform a os_realloc_aligned(alignment, p, size) which has the additional property that the returned pointer is a multiple of ALIGNMENT. +// If the malloc_aligned function has been set then call it instead. +// Requires: alignment is a power of two. size_t toku_malloc_usable_size(void *p) __attribute__((__visibility__("default"))); @@ -50,6 +61,8 @@ size_t toku_malloc_usable_size(void *p) __attribute__((__visibility__("default") * to make an array of 5 integers. 
*/ #define MALLOC_N(n,v) CAST_FROM_VOIDP(v, toku_malloc((n)*sizeof(*v))) +#define MALLOC_N_ALIGNED(align, n, v) CAST_FROM_VOIDP(v, toku_malloc_aligned((align), (n)*sizeof(*v))) + //CALLOC_N is like calloc with auto-figuring out size of members #define CALLOC_N(n,v) CAST_FROM_VOIDP(v, toku_calloc((n), sizeof(*v))) @@ -57,6 +70,7 @@ size_t toku_malloc_usable_size(void *p) __attribute__((__visibility__("default") #define CALLOC(v) CALLOC_N(1,v) #define REALLOC_N(n,v) CAST_FROM_VOIDP(v, toku_realloc(v, (n)*sizeof(*v))) +#define REALLOC_N_ALIGNED(align, n,v) CAST_FROM_VOIDP(v, toku_realloc_aligned((align), v, (n)*sizeof(*v))) // XMALLOC macros are like MALLOC except they abort if the operation fails #define XMALLOC(v) CAST_FROM_VOIDP(v, toku_xmalloc(sizeof(*v))) @@ -66,6 +80,8 @@ size_t toku_malloc_usable_size(void *p) __attribute__((__visibility__("default") #define XREALLOC(v,s) CAST_FROM_VOIDP(v, toku_xrealloc(v, s)) #define XREALLOC_N(n,v) CAST_FROM_VOIDP(v, toku_xrealloc(v, (n)*sizeof(*v))) +#define XMALLOC_N_ALIGNED(align, n, v) CAST_FROM_VOIDP(v, toku_xmalloc_aligned((align), (n)*sizeof(*v))) + #define XMEMDUP(dst, src) CAST_FROM_VOIDP(dst, toku_xmemdup(src, sizeof(*src))) #define XMEMDUP_N(dst, src, len) CAST_FROM_VOIDP(dst, toku_xmemdup(src, len)) @@ -94,6 +110,8 @@ void toku_do_memory_check(void); typedef void *(*malloc_fun_t)(size_t); typedef void (*free_fun_t)(void*); typedef void *(*realloc_fun_t)(void*,size_t); +typedef void *(*malloc_aligned_fun_t)(size_t /*alignment*/, size_t /*size*/); +typedef void *(*realloc_aligned_fun_t)(size_t /*alignment*/, void */*pointer*/, size_t /*size*/); void toku_set_func_malloc(malloc_fun_t f); void toku_set_func_xmalloc_only(malloc_fun_t f); diff --git a/toku_include/toku_portability.h b/toku_include/toku_portability.h index 35ed137340e..b109e71997b 100644 --- a/toku_include/toku_portability.h +++ b/toku_include/toku_portability.h @@ -247,8 +247,26 @@ extern void *realloc(void*, size_t) __THROW __attribute__((__deprecat #endif void *os_malloc(size_t) __attribute__((__visibility__("default"))); +// Effect: See man malloc(2) + +void *os_malloc_aligned(size_t /*alignment*/, size_t /*size*/) __attribute__((__visibility__("default"))); +// Effect: Perform a malloc(size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Requires: alignment is a power of two. + + void *os_realloc(void*,size_t) __attribute__((__visibility__("default"))); +// Effect: See man realloc(2) + +void *os_realloc_aligned(size_t/*alignment*/, void*,size_t) __attribute__((__visibility__("default"))); +// Effect: Perform a realloc(p, size) with the additional property that the returned pointer is a multiple of ALIGNMENT. +// Requires: alignment is a power of two. + void os_free(void*) __attribute__((__visibility__("default"))); +// Effect: See man free(2) + +size_t os_malloc_usable_size(const void *p) __attribute__((__visibility__("default"))); +// Effect: Return an estimate of the usable size inside a pointer. If this function is not defined the memory.cc will +// look for the jemalloc, libc, or darwin versions of the function for computing memory footprint. // full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. 
*/ void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) __attribute__((__visibility__("default"))); @@ -262,6 +280,7 @@ int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibil FILE * toku_os_fdopen(int fildes, const char *mode); FILE * toku_os_fopen(const char *filename, const char *mode); int toku_os_open(const char *path, int oflag, int mode); +int toku_os_open_direct(const char *path, int oflag, int mode); int toku_os_close(int fd); int toku_os_fclose(FILE * stream); ssize_t toku_os_read(int fd, void *buf, size_t count); @@ -293,4 +312,17 @@ void toku_set_func_pread (ssize_t (*)(int, void *, size_t, off_t)); int toku_portability_init(void); void toku_portability_destroy(void); +static inline uint64_t roundup_to_multiple(uint64_t alignment, uint64_t v) +// Effect: Return X, where X the smallest multiple of ALIGNMENT such that X>=V. +// Requires: ALIGNMENT is a power of two +{ + assert(0==(alignment&(alignment-1))); // alignment must be a power of two + uint64_t result = (v+alignment-1)&~(alignment-1); + assert(result>=v); // The result is >=V. + assert(result%alignment==0); // The result is a multiple of alignment. + assert(result