mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 19:06:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			3259 lines
		
	
	
	
		
			120 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			3259 lines
		
	
	
	
		
			120 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 | |
| // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
 | |
| #ident "$Id$"
 | |
| /*======
 | |
| This file is part of PerconaFT.
 | |
| 
 | |
| 
 | |
| Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
 | |
| 
 | |
|     PerconaFT is free software: you can redistribute it and/or modify
 | |
|     it under the terms of the GNU General Public License, version 2,
 | |
|     as published by the Free Software Foundation.
 | |
| 
 | |
|     PerconaFT is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU General Public License
 | |
|     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| ----------------------------------------
 | |
| 
 | |
|     PerconaFT is free software: you can redistribute it and/or modify
 | |
|     it under the terms of the GNU Affero General Public License, version 3,
 | |
|     as published by the Free Software Foundation.
 | |
| 
 | |
|     PerconaFT is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU Affero General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU Affero General Public License
 | |
|     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
 | |
| ======= */
 | |
| 
 | |
| #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
 | |
| 
 | |
| #include "portability/toku_atomic.h"
 | |
| 
 | |
| #include "ft/cachetable/cachetable.h"
 | |
| #include "ft/ft.h"
 | |
| #include "ft/ft-internal.h"
 | |
| #include "ft/node.h"
 | |
| #include "ft/logger/log-internal.h"
 | |
| #include "ft/txn/rollback.h"
 | |
| #include "ft/serialize/block_allocator.h"
 | |
| #include "ft/serialize/block_table.h"
 | |
| #include "ft/serialize/compress.h"
 | |
| #include "ft/serialize/ft_node-serialize.h"
 | |
| #include "ft/serialize/sub_block.h"
 | |
| #include "util/sort.h"
 | |
| #include "util/threadpool.h"
 | |
| #include "util/status.h"
 | |
| #include "util/scoped_malloc.h"
 | |
| 
 | |
// Process-wide status record for ft layout upgrades (exposed via engine status).
static FT_UPGRADE_STATUS_S ft_upgrade_status;

// Registers one status entry in ft_upgrade_status under the "ft upgrade:" legend prefix.
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc)
 | |
| 
 | |
// One-time setup of the upgrade status record's descriptive fields.
static void
status_init(void)
{
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(FT_UPGRADE_FOOTPRINT,             nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS);
    ft_upgrade_status.initialized = true;
}
 | |
#undef STATUS_INIT

// Accessor for the numeric value slot of one upgrade-status entry.
#define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num
 | |
| 
 | |
// Copy the current ft-upgrade status into *s, lazily initializing the status
// record on first use.
// NOTE(review): the lazy init is not obviously thread-safe — confirm callers
// serialize the first invocation.
void
toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
    if (!ft_upgrade_status.initialized) {
        status_init();
    }
    // Refresh the footprint from the logger before handing out a snapshot.
    UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
    *s = ft_upgrade_status;
}
 | |
| 
 | |
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *ft_pool = NULL; // shared pool for parallel serialization
// When true, node serialization may fan partitions out across ft_pool.
bool toku_serialize_in_parallel;
 | |
| 
 | |
// Return the processor count cached by toku_ft_serialize_layer_init().
int get_num_cores(void) {
    return num_cores;
}
 | |
| 
 | |
// Return the shared serialization thread pool created at layer init.
struct toku_thread_pool *get_ft_pool(void) {
    return ft_pool;
}
 | |
| 
 | |
// Toggle parallel serialization via the relaxed-store helper (flag may be
// read concurrently by serializers).
void toku_serialize_set_parallel(bool in_parallel) {
    toku_unsafe_set(&toku_serialize_in_parallel, in_parallel);
}
 | |
| 
 | |
| void toku_ft_serialize_layer_init(void) {
 | |
|     num_cores = toku_os_get_number_active_processors();
 | |
|     int r = toku_thread_pool_create(&ft_pool, num_cores);
 | |
|     lazy_assert_zero(r);
 | |
|     toku_serialize_in_parallel = false;
 | |
| }
 | |
| 
 | |
// Tear down the serialize layer's shared thread pool (counterpart of
// toku_ft_serialize_layer_init).
void toku_ft_serialize_layer_destroy(void) {
    toku_thread_pool_destroy(&ft_pool);
}
 | |
| 
 | |
// Granularity (16 MiB) used when growing files; truncation uses 2x this as hysteresis.
enum { FILE_CHANGE_INCREMENT = (16 << 20) };
 | |
| 
 | |
// Round a up to the next multiple of b (b must be nonzero).
static inline uint64_t 
alignup64(uint64_t a, uint64_t b) {
    const uint64_t nblocks = (a + b - 1) / b;
    return nblocks * b;
}
 | |
| 
 | |
// safe_file_size_lock must be held.
void
toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep)
// Effect: If file size >= size_used+32MiB, reduce file size.
// (32 instead of 16.. hysteresis).
// Stores the resulting file size in *new_sizep; asserts internally on failure.
{
    int64_t file_size;
    {
        int r = toku_os_get_file_size(fd, &file_size);
        lazy_assert_zero(r);
        invariant(file_size >= 0);
    }
    // The caller's bookkeeping must agree with the OS's view of the file.
    invariant(expected_size == (uint64_t)file_size);
    // If file space is overallocated by at least 32M
    if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
        toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
        invariant(new_size < file_size);
        invariant(new_size >= 0);
        int r = ftruncate(fd, new_size);
        lazy_assert_zero(r);
        *new_sizep = new_size;
    }
    else {
        *new_sizep = file_size;
    }
    return;
}
 | |
| 
 | |
// Return the smaller of two signed 64-bit values.
static int64_t 
min64(int64_t a, int64_t b) {
    return (a < b) ? a : b;
}
 | |
| 
 | |
void
toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Stores the resulting file size in *new_size; asserts internally on failure.
{
    int64_t file_size = 0;
    //TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
    const uint64_t stripe_width = 4096;
    {
        int r = toku_os_get_file_size(fd, &file_size);
        if (r != 0) { // debug #2463
            int the_errno = get_maybe_error_errno();
            fprintf(stderr, "%s:%d fd=%d size=%" PRIu64 " r=%d errno=%d\n", __FUNCTION__, __LINE__, fd, size, r, the_errno); fflush(stderr);
        }
        lazy_assert_zero(r);
    }
    invariant(file_size >= 0);
    invariant(expected_size == file_size);
    // We want to double the size of the file, or add 16MiB, whichever is less.
    // We emulate calling this function repeatedly until it satisfies the request.
    int64_t to_write = 0;
    if (file_size == 0) {
        // Prevent infinite loop by starting with stripe_width as a base case.
        to_write = stripe_width;
    }
    while (file_size + to_write < size) {
        to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
    }
    if (to_write > 0) {
        // Append zeros; write size/offset are 512-aligned for O_DIRECT.
        assert(to_write%512==0);
        toku::scoped_malloc_aligned wbuf_aligned(to_write, 512);
        char *wbuf = reinterpret_cast<char *>(wbuf_aligned.get());
        memset(wbuf, 0, to_write);
        toku_off_t start_write = alignup64(file_size, stripe_width);
        invariant(start_write >= file_size);
        toku_os_full_pwrite(fd, wbuf, to_write, start_write);
        *new_size = start_write + to_write;
    }
    else {
        *new_size = file_size;
    }
}
 | |
| 
 | |
// Don't include the sub_block header
// Overhead calculated in same order fields are written to wbuf
// (this prefix appears at the front of every serialized node / rollback log)
enum {
    node_header_overhead = (8+   // magic "tokunode" or "tokuleaf" or "tokuroll"
                            4+   // layout_version
                            4+   // layout_version_original
                            4),  // build_id
};
 | |
| 
 | |
// uncompressed header offsets
// Byte offsets of fields within the uncompressed node header on disk.
enum {
    uncompressed_magic_offset = 0,   // 8-byte magic string
    uncompressed_version_offset = 8, // layout_version follows the magic
};
 | |
| 
 | |
| static uint32_t
 | |
| serialize_node_header_size(FTNODE node) {
 | |
|     uint32_t retval = 0;
 | |
|     retval += 8; // magic
 | |
|     retval += sizeof(node->layout_version);
 | |
|     retval += sizeof(node->layout_version_original);
 | |
|     retval += 4; // BUILD_ID
 | |
|     retval += 4; // n_children
 | |
|     retval += node->n_children*8; // encode start offset and length of each partition
 | |
|     retval += 4; // checksum
 | |
|     return retval;
 | |
| }
 | |
| 
 | |
// Write the uncompressed node header into wbuf: magic, layout versions,
// build id, child count, per-partition (start, size) pairs, and a trailing
// x1764 checksum over everything written so far.
static void
serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
    // Leaves and internal nodes carry different magic strings.
    if (node->height == 0) 
        wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
    else 
        wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
    paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION);
    wbuf_nocrc_int(wbuf, node->layout_version);
    wbuf_nocrc_int(wbuf, node->layout_version_original);
    wbuf_nocrc_uint(wbuf, BUILD_ID);
    wbuf_nocrc_int (wbuf, node->n_children);
    for (int i=0; i<node->n_children; i++) {
        assert(BP_SIZE(ndd,i)>0);
        wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition
        wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i));         // and the size
    }
    // checksum the header
    uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf));
    wbuf_nocrc_int(wbuf, end_to_end_checksum);
    // Caller must have sized wbuf with serialize_node_header_size().
    invariant(wbuf->ndone == wbuf->size);
}
 | |
| 
 | |
| static uint32_t
 | |
| serialize_ftnode_partition_size (FTNODE node, int i)
 | |
| {
 | |
|     uint32_t result = 0;
 | |
|     paranoid_invariant(node->bp[i].state == PT_AVAIL);
 | |
|     result++; // Byte that states what the partition is
 | |
|     if (node->height > 0) {
 | |
|         NONLEAF_CHILDINFO bnc = BNC(node, i);
 | |
|         // number of messages (4 bytes) plus size of the buffer
 | |
|         result += (4 + toku_bnc_nbytesinbuf(bnc));
 | |
|         // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
 | |
|         result += (4 + (4 * bnc->fresh_message_tree.size()));
 | |
|         result += (4 + (4 * bnc->stale_message_tree.size()));
 | |
|         result += (4 + (4 * bnc->broadcast_list.size()));
 | |
|     }
 | |
|     else {
 | |
|         result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
 | |
|         result += BLB_NBYTESINDATA(node, i);
 | |
|     }
 | |
|     result += 4; // checksum
 | |
|     return result;
 | |
| }
 | |
| 
 | |
// Tag byte written at the start of each serialized partition, identifying
// whether it holds leaf (basement) data or an internal-node message buffer.
#define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_MSG_BUFFER 0xbb
 | |
| 
 | |
| UU() static int
 | |
| assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
 | |
|     bool is_fresh = msg_buffer->get_freshness(offset);
 | |
|     assert(is_fresh);
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
| UU() static int
 | |
| assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
 | |
|     bool is_fresh = msg_buffer->get_freshness(offset);
 | |
|     assert(!is_fresh);
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
// Debug-only consistency check: every offset in the fresh tree must reference
// a fresh message and every offset in the stale tree a stale one.
// Compiles to a no-op unless TOKU_DEBUG_PARANOID is defined.
static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
#ifdef TOKU_DEBUG_PARANOID
    bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
    bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
#endif
}
 | |
| 
 | |
// dmt iterate callback: append one message offset (4 bytes) to the write buffer.
static int
wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
    wbuf_nocrc_int(wb, offset);
    return 0;
}
 | |
| 
 | |
// Serialize one nonleaf partition into wb: the msg-buffer tag byte, the
// message buffer itself, then the fresh/stale/broadcast offset trees
// (entry count followed by 4-byte offsets for each).
static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
    unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
    wbuf_nocrc_char(wb, ch);

    // serialize the message buffer
    bnc->msg_buffer.serialize_to_wbuf(wb);

    // serialize the message trees (num entries, offsets array):
    // first, verify their contents are consistent with the message buffer
    bnc_verify_message_trees(bnc);

    // fresh
    wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
    bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // stale
    wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
    bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // broadcast
    wbuf_nocrc_int(wb, bnc->broadcast_list.size());
    bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
}
 | |
| 
 | |
| //
 | |
| // Serialize the i'th partition of node into sb
 | |
| // For leaf nodes, this would be the i'th basement node
 | |
| // For internal nodes, this would be the i'th internal node
 | |
| //
 | |
static void
serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
    // Caller should have allocated memory.
    invariant_notnull(sb->uncompressed_ptr);
    invariant(sb->uncompressed_size > 0);
    // Buffer must be exactly the size the sizing function predicted.
    paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));

    //
    // Now put the data into sb->uncompressed_ptr
    //
    struct wbuf wb;
    wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
    if (node->height > 0) {
        // TODO: (Zardosht) possibly exit early if there are no messages
        serialize_child_buffer(BNC(node, i), &wb);
    }
    else {
        // Leaf partition: tag byte, key/leafentry pair count, then basement data.
        unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
        bn_data* bd = BLB_DATA(node, i);

        wbuf_nocrc_char(&wb, ch);
        wbuf_nocrc_uint(&wb, bd->num_klpairs());

        bd->serialize_to_wbuf(&wb);
    }
    // Trailing checksum covers everything serialized above.
    uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
    wbuf_nocrc_int(&wb, end_to_end_checksum);
    invariant(wb.ndone == wb.size);
    invariant(sb->uncompressed_size==wb.ndone);
}
 | |
| 
 | |
| //
 | |
| // Takes the data in sb->uncompressed_ptr, and compresses it 
 | |
| // into a newly allocated buffer sb->compressed_ptr
 | |
| // 
 | |
// Resulting layout: [4B compressed size][4B uncompressed size][compressed data],
// with sb->xsum covering all of it (rationale in the history notes below).
static void
compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
    invariant(sb->compressed_ptr != nullptr);
    invariant(sb->compressed_size_bound > 0);
    paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
    
    //
    // This probably seems a bit complicated. Here is what is going on.
    // In PerconaFT 5.0, sub_blocks were compressed and the compressed data
    // was checksummed. The checksum did NOT include the size of the compressed data
    // and the size of the uncompressed data. The fields of sub_block only reference the
    // compressed data, and it is the responsibility of the user of the sub_block
    // to write the length
    //
    // For Dr. No, we want the checksum to also include the size of the compressed data, and the 
    // size of the decompressed data, because this data
    // may be read off of disk alone, so it must be verifiable alone.
    //
    // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning
    // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum.
    //
    sb->compressed_size = compress_nocrc_sub_block(
        sb,
        (char *)sb->compressed_ptr + 8,
        sb->compressed_size_bound,
        method
        );

    uint32_t* extra = (uint32_t *)(sb->compressed_ptr);
    // store the compressed and uncompressed size at the beginning
    extra[0] = toku_htod32(sb->compressed_size);
    extra[1] = toku_htod32(sb->uncompressed_size);
    // now checksum the entire thing
    sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes
    sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size);

    //
    // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains
    // two integers at the beginning, the size and uncompressed size, and then the compressed
    // data. sb->xsum contains the checksum of this entire thing.
    // 
    // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum
    // checksummed only the compressed data, and the checksumming of the sizes were not
    // done here.
    //
}
 | |
| 
 | |
| //
 | |
| // Returns the size needed to serialize the ftnode info
 | |
| // Does not include header information that is common with rollback logs
 | |
| // such as the magic, layout_version, and build_id
 | |
| // Includes only node specific info such as pivot information, n_children, and so on
 | |
| //
 | |
| static uint32_t
 | |
| serialize_ftnode_info_size(FTNODE node)
 | |
| {
 | |
|     uint32_t retval = 0;
 | |
|     retval += 8; // max_msn_applied_to_node_on_disk
 | |
|     retval += 4; // nodesize
 | |
|     retval += 4; // flags
 | |
|     retval += 4; // height;
 | |
|     retval += 8; // oldest_referenced_xid_known
 | |
|     retval += node->pivotkeys.serialized_size();
 | |
|     retval += (node->n_children-1)*4; // encode length of each pivot
 | |
|     if (node->height > 0) {
 | |
|         retval += node->n_children*8; // child blocknum's
 | |
|     }
 | |
|     retval += 4; // checksum
 | |
|     return retval;
 | |
| }
 | |
| 
 | |
// Serialize the node-common info (msn, flags, height, oldest xid, pivots, and
// for internal nodes the child blocknums) into sb->uncompressed_ptr, followed
// by an x1764 checksum. Buffer must be pre-sized via serialize_ftnode_info_size().
static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
    // Memory must have been allocated by our caller.
    invariant(sb->uncompressed_size > 0);
    invariant_notnull(sb->uncompressed_ptr);
    paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));

    struct wbuf wb;
    wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);

    wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
    wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
    wbuf_nocrc_uint(&wb, node->flags);
    wbuf_nocrc_int (&wb, node->height);    
    wbuf_TXNID(&wb, node->oldest_referenced_xid_known);
    node->pivotkeys.serialize_to_wbuf(&wb);

    // child blocks, only for internal nodes
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i));
        }
    }

    uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
    wbuf_nocrc_int(&wb, end_to_end_checksum);
    invariant(wb.ndone == wb.size);
    invariant(sb->uncompressed_size==wb.ndone);
}
 | |
| 
 | |
| // This is the size of the uncompressed data, not including the compression headers
 | |
| unsigned int
 | |
| toku_serialize_ftnode_size (FTNODE node) {
 | |
|     unsigned int result = 0;
 | |
|     //
 | |
|     // As of now, this seems to be called if and only if the entire node is supposed
 | |
|     // to be in memory, so we will assert it.
 | |
|     //
 | |
|     toku_ftnode_assert_fully_in_memory(node);
 | |
|     result += serialize_node_header_size(node);
 | |
|     result += serialize_ftnode_info_size(node);
 | |
|     for (int i = 0; i < node->n_children; i++) {
 | |
|         result += serialize_ftnode_partition_size(node,i);
 | |
|     }
 | |
|     return result;
 | |
| }
 | |
| 
 | |
// Accumulated wall-clock time spent serializing vs. compressing; fed into
// the engine-status counters via toku_ft_status_update_serialize_times().
struct serialize_times {
    tokutime_t serialize_time;
    tokutime_t compress_time;
};
 | |
| 
 | |
| static void
 | |
| serialize_and_compress_partition(FTNODE node,
 | |
|                                  int childnum,
 | |
|                                  enum toku_compression_method compression_method,
 | |
|                                  SUB_BLOCK sb,
 | |
|                                  struct serialize_times *st)
 | |
| {
 | |
|     // serialize, compress, update status
 | |
|     tokutime_t t0 = toku_time_now();
 | |
|     serialize_ftnode_partition(node, childnum, sb);
 | |
|     tokutime_t t1 = toku_time_now();
 | |
|     compress_ftnode_sub_block(sb, compression_method);
 | |
|     tokutime_t t2 = toku_time_now();
 | |
| 
 | |
|     st->serialize_time += t1 - t0;
 | |
|     st->compress_time += t2 - t1;
 | |
| }
 | |
| 
 | |
// Serialize and compress partition `childnum` of an in-memory node into sb.
// On return the caller owns the malloc'd sb->compressed_ptr; the temporary
// uncompressed buffer is scoped to this function and sb->uncompressed_ptr is
// reset to NULL before returning.
void
toku_create_compressed_partition_from_available(
    FTNODE node,
    int childnum,
    enum toku_compression_method compression_method,
    SUB_BLOCK sb
    )
{
    tokutime_t t0 = toku_time_now();

    // serialize
    sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum);
    toku::scoped_malloc uncompressed_buf(sb->uncompressed_size);
    sb->uncompressed_ptr = uncompressed_buf.get();
    serialize_ftnode_partition(node, childnum, sb);

    tokutime_t t1 = toku_time_now();

    // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them
    set_compressed_size_bound(sb, compression_method);
    sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound);
    sb->compressed_size = compress_nocrc_sub_block(
        sb,
        sb->compressed_ptr,
        sb->compressed_size_bound,
        compression_method
        );
    // Clear the pointer into the scoped buffer so no dangling reference
    // escapes when it is freed at scope exit.
    sb->uncompressed_ptr = NULL;

    tokutime_t t2 = toku_time_now();

    toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1);
}
 | |
| 
 | |
| static void
 | |
| serialize_and_compress_serially(FTNODE node,
 | |
|                                 int npartitions,
 | |
|                                 enum toku_compression_method compression_method,
 | |
|                                 struct sub_block sb[],
 | |
|                                 struct serialize_times *st) {
 | |
|     for (int i = 0; i < npartitions; i++) {
 | |
|         serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
 | |
|     }
 | |
| }
 | |
| 
 | |
// One unit of work for the serialize/compress thread pool: partition i of
// node, plus per-worker timing stats merged by the dispatcher afterwards.
struct serialize_compress_work {
    struct work base;
    FTNODE node;
    int i;
    enum toku_compression_method compression_method;
    struct sub_block *sb;
    struct serialize_times st; // this work item's accumulated times
};
 | |
| 
 | |
// Thread-pool worker: drain work items from the workset, serializing and
// compressing one partition per item, then drop this worker's reference.
static void *
serialize_and_compress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
        if (w == NULL)
            break;
        int i = w->i;
        serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
    }
    workset_release_ref(ws);
    return arg;
}
 | |
| 
 | |
// Serialize and compress all npartitions sub blocks, spreading the work over
// the shared ft thread pool plus the calling thread; per-item timing stats
// are merged into *st after all workers finish.
static void
serialize_and_compress_in_parallel(FTNODE node,
                                   int npartitions,
                                   enum toku_compression_method compression_method,
                                   struct sub_block sb[],
                                   struct serialize_times *st) {
    if (npartitions == 1) {
        // Single partition: not worth dispatching to the pool.
        serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
    } else {
        int T = num_cores;
        if (T > npartitions)
            T = npartitions;
        if (T > 0)
            T = T - 1; // the calling thread also works, so ask for one fewer pool thread
        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);
        struct serialize_compress_work work[npartitions];
        workset_lock(&ws);
        for (int i = 0; i < npartitions; i++) {
            work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}},
                                                         .node = node,
                                                         .i = i,
                                                         .compression_method = compression_method,
                                                         .sb = sb,
                                                         .st = { .serialize_time = 0, .compress_time = 0} };
            workset_put_locked(&ws, &work[i].base);
        }
        workset_unlock(&ws);
        toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
        workset_add_ref(&ws, T);
        serialize_and_compress_worker(&ws);
        workset_join(&ws);
        workset_destroy(&ws);

        // gather up the statistics from each thread's work item
        for (int i = 0; i < npartitions; i++) {
            st->serialize_time += work[i].st.serialize_time;
            st->compress_time += work[i].st.compress_time;
        }
    }
}
 | |
| 
 | |
| static void
 | |
| serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
 | |
|         enum toku_compression_method compression_method, struct serialize_times *st) {
 | |
|     // serialize, compress, update serialize times.
 | |
|     tokutime_t t0 = toku_time_now();
 | |
|     serialize_ftnode_info(node, sb);
 | |
|     tokutime_t t1 = toku_time_now();
 | |
|     compress_ftnode_sub_block(sb, compression_method);
 | |
|     tokutime_t t2 = toku_time_now();
 | |
| 
 | |
|     st->serialize_time += t1 - t0;
 | |
|     st->compress_time += t2 - t1;
 | |
| }
 | |
| 
 | |
| int toku_serialize_ftnode_to_memory(FTNODE node,
 | |
|                                     FTNODE_DISK_DATA* ndd,
 | |
|                                     unsigned int basementnodesize,
 | |
|                                     enum toku_compression_method compression_method,
 | |
|                                     bool do_rebalancing,
 | |
|                                     bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
 | |
|                             /*out*/ size_t *n_bytes_to_write,
 | |
|                             /*out*/ size_t *n_uncompressed_bytes,
 | |
|                             /*out*/ char  **bytes_to_write)
 | |
| // Effect: Writes out each child to a separate malloc'd buffer, then compresses
 | |
| //   all of them, and writes the uncompressed header, to bytes_to_write,
 | |
| //   which is malloc'd.
 | |
| //
 | |
| //   The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
 | |
| //   512-byte padding is for O_DIRECT to work.
 | |
| {
 | |
|     toku_ftnode_assert_fully_in_memory(node);
 | |
| 
 | |
|     if (do_rebalancing && node->height == 0) {
 | |
|         toku_ftnode_leaf_rebalance(node, basementnodesize);
 | |
|     }
 | |
|     const int npartitions = node->n_children;
 | |
| 
 | |
|     // Each partition represents a compressed sub block
 | |
|     // For internal nodes, a sub block is a message buffer
 | |
|     // For leaf nodes, a sub block is a basement node
 | |
|     toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
 | |
|     struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
 | |
|     XREALLOC_N(npartitions, *ndd);
 | |
| 
 | |
|     //
 | |
|     // First, let's serialize and compress the individual sub blocks
 | |
|     //
 | |
| 
 | |
|     // determine how large our serialization and compression buffers need to be.
 | |
|     size_t serialize_buf_size = 0, compression_buf_size = 0;
 | |
|     for (int i = 0; i < node->n_children; i++) {
 | |
|         sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
 | |
|         sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
 | |
|         serialize_buf_size += sb[i].uncompressed_size;
 | |
|         compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 | |
|     }
 | |
| 
 | |
|     // give each sub block a base pointer to enough buffer space for serialization and compression
 | |
|     toku::scoped_malloc serialize_buf(serialize_buf_size);
 | |
|     toku::scoped_malloc compression_buf(compression_buf_size);
 | |
|     for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
 | |
|         sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
 | |
|         sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
 | |
|         uncompressed_offset += sb[i].uncompressed_size;
 | |
|         compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 | |
|         invariant(uncompressed_offset <= serialize_buf_size);
 | |
|         invariant(compressed_offset <= compression_buf_size);
 | |
|     }
 | |
| 
 | |
|     // do the actual serialization now that we have buffer space
 | |
|     struct serialize_times st = { 0, 0 };
 | |
|     if (in_parallel) {
 | |
|         serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
 | |
|     } else {
 | |
|         serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
 | |
|     }
 | |
| 
 | |
|     //
 | |
|     // Now lets create a sub-block that has the common node information,
 | |
|     // This does NOT include the header
 | |
|     //
 | |
| 
 | |
|     // determine how large our serialization and copmression buffers need to be
 | |
|     struct sub_block sb_node_info;
 | |
|     sub_block_init(&sb_node_info);
 | |
|     size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
 | |
|     size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
 | |
|     toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
 | |
|     toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 | |
|     sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
 | |
|     sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
 | |
|     sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
 | |
|     sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
 | |
| 
 | |
|     // do the actual serialization now that we have buffer space
 | |
|     serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
 | |
| 
 | |
|     //
 | |
|     // At this point, we have compressed each of our pieces into individual sub_blocks,
 | |
|     // we can put the header and all the subblocks into a single buffer and return it.
 | |
|     //
 | |
| 
 | |
|     // update the serialize times, ignore the header for simplicity. we captured all
 | |
|     // of the partitions' serialize times so that's probably good enough.
 | |
|     toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
 | |
| 
 | |
|     // The total size of the node is:
 | |
|     // size of header + disk size of the n+1 sub_block's created above
 | |
|     uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
 | |
|                                  + sb_node_info.compressed_size   // compressed nodeinfo (without its checksum)
 | |
|                                  + 4);                            // nodeinfo's checksum
 | |
|     uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
 | |
|                                  + sb_node_info.uncompressed_size   // uncompressed nodeinfo (without its checksum)
 | |
|                                  + 4);                            // nodeinfo's checksum
 | |
|     // store the BP_SIZESs
 | |
|     for (int i = 0; i < node->n_children; i++) {
 | |
|         uint32_t len         = sb[i].compressed_size + 4; // data and checksum
 | |
|         BP_SIZE (*ndd,i) = len;
 | |
|         BP_START(*ndd,i) = total_node_size;
 | |
|         total_node_size += sb[i].compressed_size + 4;
 | |
|         total_uncompressed_size += sb[i].uncompressed_size + 4;
 | |
|     }
 | |
| 
 | |
|     // now create the final serialized node
 | |
|     uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
 | |
|     char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
 | |
|     char *curr_ptr = data;
 | |
| 
 | |
|     // write the header
 | |
|     struct wbuf wb;
 | |
|     wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
 | |
|     serialize_node_header(node, *ndd, &wb);
 | |
|     assert(wb.ndone == wb.size);
 | |
|     curr_ptr += serialize_node_header_size(node);
 | |
| 
 | |
|     // now write sb_node_info
 | |
|     memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
 | |
|     curr_ptr += sb_node_info.compressed_size;
 | |
|     // write the checksum
 | |
|     *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
 | |
|     curr_ptr += sizeof(sb_node_info.xsum);
 | |
| 
 | |
|     for (int i = 0; i < npartitions; i++) {
 | |
|         memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
 | |
|         curr_ptr += sb[i].compressed_size;
 | |
|         // write the checksum
 | |
|         *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
 | |
|         curr_ptr += sizeof(sb[i].xsum);
 | |
|     }
 | |
|     // Zero the rest of the buffer
 | |
|     memset(data + total_node_size, 0, total_buffer_size - total_node_size);
 | |
|             
 | |
|     assert((uint32_t) (curr_ptr - data) == total_node_size);
 | |
|     *bytes_to_write = data;
 | |
|     *n_bytes_to_write = total_buffer_size;
 | |
|     *n_uncompressed_bytes = total_uncompressed_size;
 | |
| 
 | |
|     invariant(*n_bytes_to_write % 512 == 0);
 | |
|     invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
// Serialize, compress and write 'node' to 'fd' at the disk offset the block
// table assigns to 'blocknum'.  On success the node is marked clean and, for
// leaves, the logical-row deltas of in-memory basement nodes are reset.
// Returns 0 on success, or the error from toku_serialize_ftnode_to_memory.
//   do_rebalancing: rebalance leaf basement nodes before serializing.
//   for_checkpoint: passed through to the block table and flush statistics.
int toku_serialize_ftnode_to(int fd,
                             BLOCKNUM blocknum,
                             FTNODE node,
                             FTNODE_DISK_DATA *ndd,
                             bool do_rebalancing,
                             FT ft,
                             bool for_checkpoint) {
    size_t n_to_write;
    size_t n_uncompressed_bytes;
    char *compressed_buf = nullptr;

    // because toku_serialize_ftnode_to is only called for
    // in toku_ftnode_flush_callback, we pass false
    // for in_parallel. The reasoning is that when we write
    // nodes to disk via toku_ftnode_flush_callback, we
    // assume that it is being done on a non-critical
    // background thread (probably for checkpointing), and therefore
    // should not hog CPU,
    //
    // Should the above facts change, we may want to revisit
    // passing false for in_parallel here
    //
    // alternatively, we could have made in_parallel a parameter
    // for toku_serialize_ftnode_to, but instead we did this.
    int r = toku_serialize_ftnode_to_memory(
        node,
        ndd,
        ft->h->basementnodesize,
        ft->h->compression_method,
        do_rebalancing,
        toku_unsafe_fetch(&toku_serialize_in_parallel),
        &n_to_write,
        &n_uncompressed_bytes,
        &compressed_buf);
    if (r != 0) {
        return r;
    }

    // If the node has never been written, then write the whole buffer,
    // including the zeros
    invariant(blocknum.b >= 0);
    DISKOFF offset;

    // Dirties the ft
    ft->blocktable.realloc_on_disk(
        blocknum, n_to_write, &offset, ft, fd, for_checkpoint);

    // time the pwrite so flush statistics can attribute the I/O cost
    tokutime_t t0 = toku_time_now();
    toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
    tokutime_t t1 = toku_time_now();

    tokutime_t io_time = t1 - t0;
    toku_ft_status_update_flush_reason(
        node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);

    toku_free(compressed_buf);
    node->clear_dirty();  // See #1957.   Must set the node to be clean after
                      // serializing it so that it doesn't get written again on
                      // the next checkpoint or eviction.
    if (node->height == 0) {
        // leaf: the logical-row deltas of available basement nodes have now
        // been persisted, so reset the in-memory counters
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                BLB_LRD(node, i) = 0;
            }
        }
    }
    return 0;
}
 | |
| 
 | |
// Sort the given message-offset arrays by (key, msn) and hand ownership of
// each array to the corresponding tree inside 'bnc' (the trees "steal" the
// already-sorted arrays instead of copying them).
//   fresh_offsets / broadcast_offsets: required (may describe empty arrays).
//   stale_offsets: may be nullptr (v13-upgrade nodes carry no stale tree).
static void
sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc,
                             const toku::comparator &cmp,
                             int32_t **fresh_offsets, int32_t nfresh,
                             int32_t **stale_offsets, int32_t nstale,
                             int32_t **broadcast_offsets, int32_t nbroadcast) {
    // We always have fresh / broadcast offsets (even if they are empty)
    // but we may not have stale offsets, in the case of v13 upgrade.
    invariant(fresh_offsets != nullptr);
    invariant(broadcast_offsets != nullptr);
    invariant(cmp.valid());

    typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> msn_sort;

    // total message count is the capacity hint for each stolen array
    const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast;
    struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer);
    msn_sort::mergesort_r(*fresh_offsets, nfresh, extra);
    bnc->fresh_message_tree.destroy();
    bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer);
    if (stale_offsets) {
        msn_sort::mergesort_r(*stale_offsets, nstale, extra);
        bnc->stale_message_tree.destroy();
        bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer);
    }
    // broadcast messages are not sorted: they apply to every key
    bnc->broadcast_list.destroy();
    bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer);
}
 | |
| 
 | |
| static MSN
 | |
| deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
 | |
|     // We skip 'stale' offsets for upgraded nodes.
 | |
|     int32_t nfresh = 0, nbroadcast = 0;
 | |
|     int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr;
 | |
| 
 | |
|     // Only sort buffers if we have a valid comparison function. In certain scenarios,
 | |
|     // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
 | |
|     // for simple inspection and don't actually require that the message buffers are
 | |
|     // properly sorted. This is very ugly, but correct.
 | |
|     const bool sort = ft->cmp.valid();
 | |
| 
 | |
|     MSN highest_msn_in_this_buffer =
 | |
|         bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade,
 | |
|                                                   sort ? &fresh_offsets : nullptr, &nfresh,
 | |
|                                                   sort ? &broadcast_offsets : nullptr, &nbroadcast);
 | |
| 
 | |
|     if (sort) {
 | |
|         sort_and_steal_offset_arrays(bnc, ft->cmp,
 | |
|                                      &fresh_offsets, nfresh,
 | |
|                                      nullptr, 0, // no stale offsets
 | |
|                                      &broadcast_offsets, nbroadcast);
 | |
|     }
 | |
| 
 | |
|     return highest_msn_in_this_buffer;
 | |
| }
 | |
| 
 | |
| static void
 | |
| deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) {
 | |
|     int32_t nfresh = 0, nstale = 0, nbroadcast = 0;
 | |
|     int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets;
 | |
| 
 | |
|     // Only sort buffers if we have a valid comparison function. In certain scenarios,
 | |
|     // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
 | |
|     // for simple inspection and don't actually require that the message buffers are
 | |
|     // properly sorted. This is very ugly, but correct.
 | |
|     const bool sort = cmp.valid();
 | |
| 
 | |
|     // read in the message buffer
 | |
|     bnc->msg_buffer.deserialize_from_rbuf(rb,
 | |
|                                           sort ? &fresh_offsets : nullptr, &nfresh,
 | |
|                                           sort ? &stale_offsets : nullptr, &nstale,
 | |
|                                           sort ? &broadcast_offsets : nullptr, &nbroadcast);
 | |
| 
 | |
|     if (sort) {
 | |
|         sort_and_steal_offset_arrays(bnc, cmp,
 | |
|                                      &fresh_offsets, nfresh,
 | |
|                                      &stale_offsets, nstale,
 | |
|                                      &broadcast_offsets, nbroadcast);
 | |
|     }
 | |
| }
 | |
| 
 | |
// Deserialize a child message buffer (current layout version) into 'bnc'.
// The sorted message trees were serialized to disk, so the offset arrays are
// read back verbatim instead of being re-sorted.
static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
    // read in the message buffer
    bnc->msg_buffer.deserialize_from_rbuf(rb,
                                          nullptr, nullptr,  // fresh_offsets, nfresh,
                                          nullptr, nullptr,  // stale_offsets, nstale,
                                          nullptr, nullptr); // broadcast_offsets, nbroadcast

    // read in each message tree (fresh, stale, broadcast)
    int32_t nfresh = rbuf_int(rb);
    int32_t *XMALLOC_N(nfresh, fresh_offsets);
    for (int i = 0; i < nfresh; i++) {
        fresh_offsets[i] = rbuf_int(rb);
    }

    int32_t nstale = rbuf_int(rb);
    int32_t *XMALLOC_N(nstale, stale_offsets);
    for (int i = 0; i < nstale; i++) {
        stale_offsets[i] = rbuf_int(rb);
    }

    int32_t nbroadcast = rbuf_int(rb);
    int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
    for (int i = 0; i < nbroadcast; i++) {
        broadcast_offsets[i] = rbuf_int(rb);
    }

    // build OMTs out of each offset array; the trees take ownership of the
    // freshly-allocated arrays (no copy)
    bnc->fresh_message_tree.destroy();
    bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
    bnc->stale_message_tree.destroy();
    bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
    bnc->broadcast_list.destroy();
    bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
}
 | |
| 
 | |
// dump a buffer to stderr
// no locking around this for now
void
dump_bad_block(unsigned char *vp, uint64_t size) {
    const uint64_t bytes_per_line = 64;

    // print every complete 64-byte line: "<addr>: <128 hex chars>\n"
    uint64_t full_lines = size / bytes_per_line;
    while (full_lines-- > 0) {
        fprintf(stderr, "%p: ", vp);
        for (uint64_t col = 0; col < bytes_per_line; col++) {
            fprintf(stderr, "%2.2X", vp[col]);
        }
        fprintf(stderr, "\n");
        vp += bytes_per_line;
    }

    // print the trailing partial line, if any (fewer than 64 bytes, so it
    // gets its address prefix once and no embedded newline)
    uint64_t remainder = size % bytes_per_line;
    for (uint64_t col = 0; col < remainder; col++) {
        if (col == 0)
            fprintf(stderr, "%p: ", vp);
        fprintf(stderr, "%2.2X", vp[col]);
    }
    fprintf(stderr, "\n");
}
 | |
| 
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| ////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| BASEMENTNODE toku_create_empty_bn(void) {
 | |
|     BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
 | |
|     bn->data_buffer.initialize_empty();
 | |
|     return bn;
 | |
| }
 | |
| 
 | |
| BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) {
 | |
|     BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
 | |
|     bn->max_msn_applied = orig_bn->max_msn_applied;
 | |
|     bn->seqinsert = orig_bn->seqinsert;
 | |
|     bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied;
 | |
|     bn->stat64_delta = orig_bn->stat64_delta;
 | |
|     bn->logical_rows_delta = orig_bn->logical_rows_delta;
 | |
|     bn->data_buffer.clone(&orig_bn->data_buffer);
 | |
|     return bn;
 | |
| }
 | |
| 
 | |
| BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
 | |
|     BASEMENTNODE XMALLOC(bn);
 | |
|     bn->max_msn_applied.msn = 0;
 | |
|     bn->seqinsert = 0;
 | |
|     bn->stale_ancestor_messages_applied = false;
 | |
|     bn->stat64_delta = ZEROSTATS;
 | |
|     bn->logical_rows_delta = 0;
 | |
|     bn->data_buffer.init_zero();
 | |
|     return bn;
 | |
| }
 | |
| 
 | |
| NONLEAF_CHILDINFO toku_create_empty_nl(void) {
 | |
|     NONLEAF_CHILDINFO XMALLOC(cn);
 | |
|     cn->msg_buffer.create();
 | |
|     cn->fresh_message_tree.create_no_array();
 | |
|     cn->stale_message_tree.create_no_array();
 | |
|     cn->broadcast_list.create_no_array();
 | |
|     memset(cn->flow, 0, sizeof cn->flow);
 | |
|     return cn;
 | |
| }
 | |
| 
 | |
| // must clone the OMTs, since we serialize them along with the message buffer
 | |
| NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
 | |
|     NONLEAF_CHILDINFO XMALLOC(cn);
 | |
|     cn->msg_buffer.clone(&orig_childinfo->msg_buffer);
 | |
|     cn->fresh_message_tree.create_no_array();
 | |
|     cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);
 | |
|     cn->stale_message_tree.create_no_array();
 | |
|     cn->stale_message_tree.clone(orig_childinfo->stale_message_tree);
 | |
|     cn->broadcast_list.create_no_array();
 | |
|     cn->broadcast_list.clone(orig_childinfo->broadcast_list);
 | |
|     memset(cn->flow, 0, sizeof cn->flow);
 | |
|     return cn;
 | |
| }
 | |
| 
 | |
| void destroy_basement_node (BASEMENTNODE bn)
 | |
| {
 | |
|     bn->data_buffer.destroy();
 | |
|     toku_free(bn);
 | |
| }
 | |
| 
 | |
| void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
 | |
| {
 | |
|     nl->msg_buffer.destroy();
 | |
|     nl->fresh_message_tree.destroy();
 | |
|     nl->stale_message_tree.destroy();
 | |
|     nl->broadcast_list.destroy();
 | |
|     toku_free(nl);
 | |
| }
 | |
| 
 | |
| void read_block_from_fd_into_rbuf(
 | |
|     int fd, 
 | |
|     BLOCKNUM blocknum,
 | |
|     FT ft,
 | |
|     struct rbuf *rb
 | |
|     ) 
 | |
| {
 | |
|     // get the file offset and block size for the block
 | |
|     DISKOFF offset, size;
 | |
|     ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
 | |
|     DISKOFF size_aligned = roundup_to_multiple(512, size);
 | |
|     uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
 | |
|     rbuf_init(rb, raw_block, size);
 | |
|     // read the block
 | |
|     ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
 | |
|     assert((DISKOFF)rlen >= size);
 | |
|     assert((DISKOFF)rlen <= size_aligned);
 | |
| }
 | |
| 
 | |
// If a node's on-disk size is at most this, a single small read fetches the
// whole header region; larger nodes get a capped first read (see
// read_ftnode_header_from_fd_into_rbuf_if_small_enough).
static const int read_header_heuristic_max = 32*1024;

#ifndef MIN
// NOTE(review): function-like macro — evaluates both arguments twice, so do
// not pass expressions with side effects.
#define MIN(a,b) (((a)>(b)) ? (b) : (a))
#endif
 | |
| 
 | |
// Effect: If the header part of the node is small enough, then read it into the rbuf.  The rbuf will be allocated to be big enough in any case.
// Also records bytes_read / io_time in the bfe for pivot-fetch statistics.
static void read_ftnode_header_from_fd_into_rbuf_if_small_enough(int fd, BLOCKNUM blocknum,
                                                                 FT ft, struct rbuf *rb,
                                                                 ftnode_fetch_extra *bfe) {
    DISKOFF offset, size;
    ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
    // first read is capped at the header heuristic, rounded to 512 bytes,
    // but the buffer is allocated for the full node so it can be completed
    // by a later read without reallocating
    DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size));
    uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block);
    rbuf_init(rb, raw_block, read_size);

    // read the block
    tokutime_t t0 = toku_time_now();
    ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
    tokutime_t t1 = toku_time_now();

    assert(rlen >= 0);
    // re-init the rbuf to the number of bytes actually read
    rbuf_init(rb, raw_block, rlen);

    bfe->bytes_read = rlen;
    bfe->io_time = t1 - t0;
    toku_ft_status_update_pivot_fetch_reason(bfe);
}
 | |
| 
 | |
//
// read the compressed partition into the sub_block,
// validate the checksum of the compressed data
//
// Returns 0 on success, TOKUDB_BAD_CHECKSUM on mismatch.  Note that
// sb->compressed_ptr aliases rb's buffer (no copy is made), so rb must
// outlive any use of the sub_block's compressed data.
int
read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
{
    int r = 0;
    sb->compressed_size = rbuf_int(rb);
    sb->uncompressed_size = rbuf_int(rb);
    const void **cp = (const void **) &sb->compressed_ptr;
    rbuf_literal_bytes(rb, cp, sb->compressed_size);
    sb->xsum = rbuf_int(rb);
    // let's check the checksum
    // the stored checksum covers the two 4-byte size fields that directly
    // precede the compressed bytes — hence the -8 / +8 adjustment
    uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
    if (sb->xsum != actual_xsum) {
        r = TOKUDB_BAD_CHECKSUM;
    }
    return r;
}
 | |
| 
 | |
| static int
 | |
| read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
 | |
| {
 | |
|     int r = 0;
 | |
|     r = read_compressed_sub_block(rb, sb);
 | |
|     if (r != 0) {
 | |
|         goto exit;
 | |
|     }
 | |
| 
 | |
|     just_decompress_sub_block(sb);
 | |
| exit:
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| // Allocates space for the sub-block and de-compresses the data from
 | |
| // the supplied compressed pointer..
 | |
| void
 | |
| just_decompress_sub_block(struct sub_block *sb)
 | |
| {
 | |
|     // <CER> TODO: Add assert that the subblock was read in.
 | |
|     sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
 | |
| 
 | |
|     toku_decompress(
 | |
|         (Bytef *) sb->uncompressed_ptr,
 | |
|         sb->uncompressed_size,
 | |
|         (Bytef *) sb->compressed_ptr,
 | |
|         sb->compressed_size
 | |
|         );
 | |
| }
 | |
| 
 | |
// verify the checksum
// The last 4 bytes of sb->uncompressed_ptr hold an x1764 checksum of the
// preceding bytes.  On mismatch, log the details (fname/blocknum are used
// only for the message), dump the bad bytes, and return TOKUDB_BAD_CHECKSUM.
int verify_ftnode_sub_block(struct sub_block *sb,
                            const char *fname,
                            BLOCKNUM blocknum) {
    int r = 0;
    // first verify the checksum
    uint32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
    uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size)));
    uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size);
    if (stored_xsum != actual_xsum) {
        fprintf(
            stderr,
            "%s:%d:verify_ftnode_sub_block - "
            "file[%s], blocknum[%lld], stored_xsum[%u] != actual_xsum[%u]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            stored_xsum,
            actual_xsum);
        dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size);
        r = TOKUDB_BAD_CHECKSUM;
    }
    return r;
}
 | |
| 
 | |
// This function deserializes the data stored by serialize_ftnode_info
// into 'node': max MSN, flags, height, version-dependent fields, pivot keys,
// and (for internal nodes) the child block numbers.  Returns 0 on success or
// TOKUDB_BAD_CHECKSUM; aborts if the payload length doesn't match what was
// consumed (corruption that checksumming didn't catch).
static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) {

    // sb_node_info->uncompressed_ptr stores the serialized node information
    // this function puts that information into node

    // first verify the checksum
    int r = 0;
    const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
    r = verify_ftnode_sub_block(sb, fname, node->blocknum);
    if (r != 0) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_info - "
            "file[%s], blocknum[%lld], verify_ftnode_sub_block failed with %d\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)node->blocknum.b,
            r);
        dump_bad_block(static_cast<unsigned char *>(sb->uncompressed_ptr),
                       sb->uncompressed_size);
        goto exit;
    }

    uint32_t data_size;
    data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end

    // now with the data verified, we can read the information into the node
    struct rbuf rb;
    rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);

    node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb);
    (void)rbuf_int(&rb); // skipped on-disk field, read and discarded
    node->flags = rbuf_int(&rb);
    node->height = rbuf_int(&rb);
    if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
        (void) rbuf_int(&rb); // optimized_for_upgrade
    }
    if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
        rbuf_TXNID(&rb, &node->oldest_referenced_xid_known);
    }

    // now create the basement nodes or childinfos, depending on whether this is a
    // leaf node or internal node
    // now the subtree_estimates

    // n_children is now in the header, and the allocation of node->bp is in deserialize_ftnode_from_rbuf.

    // now the pivots
    if (node->n_children > 1) {
        node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1);
    } else {
        // a single-child node has no pivots
        node->pivotkeys.create_empty();
    }

    // if this is an internal node, unpack the block nums, and fill in necessary fields
    // of childinfo
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
            BP_WORKDONE(node, i) = 0;
        }
    }

    // make sure that all the data was read
    if (data_size != rb.ndone) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_info - "
            "file[%s], blocknum[%lld], data_size[%d] != rb.ndone[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)node->blocknum.b,
            data_size,
            rb.ndone);
        dump_bad_block(rb.buf, rb.size);
        abort();
    }
exit:
    return r;
}
 | |
| 
 | |
| static void
 | |
| setup_available_ftnode_partition(FTNODE node, int i) {
 | |
|     if (node->height == 0) {
 | |
|         set_BLB(node, i, toku_create_empty_bn());
 | |
|         BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk;
 | |
|     }
 | |
|     else {
 | |
|         set_BNC(node, i, toku_create_empty_nl());
 | |
|     }
 | |
| }
 | |
| 
 | |
| // Assign the child_to_read member of the bfe from the given ftnode
 | |
| // that has been brought into memory.
 | |
| static void
 | |
| update_bfe_using_ftnode(FTNODE node, ftnode_fetch_extra *bfe)
 | |
| {
 | |
|     if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) {
 | |
|         // we do not take into account prefetching yet
 | |
|         // as of now, if we need a subset, the only thing
 | |
|         // we can possibly require is a single basement node
 | |
|         // we find out what basement node the query cares about
 | |
|         // and check if it is available
 | |
|         bfe->child_to_read = toku_ft_search_which_child(
 | |
|             bfe->ft->cmp,
 | |
|             node,
 | |
|             bfe->search
 | |
|             );
 | |
|     } else if (bfe->type == ftnode_fetch_keymatch) {
 | |
|         // we do not take into account prefetching yet
 | |
|         // as of now, if we need a subset, the only thing
 | |
|         // we can possibly require is a single basement node
 | |
|         // we find out what basement node the query cares about
 | |
|         // and check if it is available
 | |
|         if (node->height == 0) {
 | |
|             int left_child = bfe->leftmost_child_wanted(node);
 | |
|             int right_child = bfe->rightmost_child_wanted(node);
 | |
|             if (left_child == right_child) {
 | |
|                 bfe->child_to_read = left_child;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
// Using the search parameters in the bfe, this function will
// initialize all of the given ftnode's partitions: each BP_STATE is chosen
// (PT_AVAIL / PT_COMPRESSED when the data is in memory, PT_ON_DISK when it
// is not), and the matching per-partition storage is installed.
static void
setup_partitions_using_bfe(FTNODE node,
                           ftnode_fetch_extra *bfe,
                           bool data_in_memory)
{
    // Leftmost and Rightmost Child bounds.
    int lc, rc;
    if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) {
        lc = bfe->leftmost_child_wanted(node);
        rc = bfe->rightmost_child_wanted(node);
    } else {
        // -1 sentinels: no child index ever satisfies lc <= i <= rc
        lc = -1;
        rc = -1;
    }

    //
    // setup memory needed for the node
    //
    //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc);
    for (int i = 0; i < node->n_children; i++) {
        BP_INIT_UNTOUCHED_CLOCK(node,i);
        if (data_in_memory) {
            // wanted children become AVAIL; the rest stay COMPRESSED
            BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc))
                                 ? PT_AVAIL : PT_COMPRESSED);
        } else {
            BP_STATE(node, i) = PT_ON_DISK;
        }
        BP_WORKDONE(node,i) = 0;

        // install per-partition storage that matches the chosen state
        switch (BP_STATE(node,i)) {
        case PT_AVAIL:
            setup_available_ftnode_partition(node, i);
            BP_TOUCH_CLOCK(node,i);
            break;
        case PT_COMPRESSED:
            set_BSB(node, i, sub_block_creat());
            break;
        case PT_ON_DISK:
            set_BNULL(node, i);
            break;
        case PT_INVALID:
            abort();
        }
    }
}
 | |
| 
 | |
| static void setup_ftnode_partitions(FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory)
 | |
| // Effect: Used when reading a ftnode into main memory, this sets up the partitions.
 | |
| //   We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
 | |
| // Arguments:  Node: the node to set up.
 | |
| //             bfe:  Describes the key range needed.
 | |
| //             data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe.
 | |
| //                             false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK.
 | |
| {
 | |
|     // Set bfe->child_to_read.
 | |
|     update_bfe_using_ftnode(node, bfe);
 | |
| 
 | |
|     // Setup the partitions.
 | |
|     setup_partitions_using_bfe(node, bfe, data_in_memory);
 | |
| }
 | |
| 
 | |
| /* deserialize the partition from the sub-block's uncompressed buffer
 | |
|  * and destroy the uncompressed buffer
 | |
|  */
 | |
static int deserialize_ftnode_partition(
    struct sub_block *sb,
    FTNODE node,
    int childnum,  // which partition to deserialize
    const toku::comparator &cmp) {
    // Deserialize one partition of `node' from the already-decompressed
    // sub-block `sb'.  For internal nodes (height > 0) the partition is a
    // message buffer; for leaf nodes it is the basement node's leaf entries.
    // Returns 0 on success or the error from verify_ftnode_sub_block;
    // structural corruption found after the sub-block checksum passes is
    // reported to stderr and then treated as fatal (assert).

    int r = 0;
    const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
    r = verify_ftnode_sub_block(sb, fname, node->blocknum);
    if (r != 0) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%lld], "
                "verify_ftnode_sub_block failed with %d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)node->blocknum.b,
                r);
        goto exit;
    }
    uint32_t data_size;
    data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end

    // now with the data verified, we can read the information into the node
    struct rbuf rb;
    rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
    // First byte is the partition type tag; it must match the node's height.
    unsigned char ch;
    ch = rbuf_char(&rb);

    if (node->height > 0) {
        if (ch != FTNODE_PARTITION_MSG_BUFFER) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%lld], ch[%d] != "
                    "FTNODE_PARTITION_MSG_BUFFER[%d]\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    (longlong)node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_MSG_BUFFER);
            dump_bad_block(rb.buf, rb.size);
            assert(ch == FTNODE_PARTITION_MSG_BUFFER);
        }
        NONLEAF_CHILDINFO bnc = BNC(node, childnum);
        if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
            // Layout version <= 26 did not serialize sorted message trees to disk.
            deserialize_child_buffer_v26(bnc, &rb, cmp);
        } else {
            deserialize_child_buffer(bnc, &rb);
        }
        BP_WORKDONE(node, childnum) = 0;
    } else {
        if (ch != FTNODE_PARTITION_DMT_LEAVES) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%lld], ch[%d] != "
                    "FTNODE_PARTITION_DMT_LEAVES[%d]\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    (longlong)node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_DMT_LEAVES);
            dump_bad_block(rb.buf, rb.size);
            assert(ch == FTNODE_PARTITION_DMT_LEAVES);
        }

        BLB_SEQINSERT(node, childnum) = 0;
        uint32_t num_entries = rbuf_int(&rb);
        // we are now at the first byte of first leafentry
        data_size -= rb.ndone; // remaining bytes of leafentry data

        BASEMENTNODE bn = BLB(node, childnum);
        bn->data_buffer.deserialize_from_rbuf(
            num_entries, &rb, data_size, node->layout_version_read_from_disk);
    }
    // Every byte of the partition must have been consumed; leftover bytes
    // indicate a corrupt or truncated image.
    if (rb.ndone != rb.size) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%lld], rb.ndone[%d] != rb.size[%d]\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)node->blocknum.b,
                rb.ndone,
                rb.size);
        dump_bad_block(rb.buf, rb.size);
        assert(rb.ndone == rb.size);
    }

exit:
    return r;
}
 | |
| 
 | |
| static int decompress_and_deserialize_worker(struct rbuf curr_rbuf,
 | |
|                                              struct sub_block curr_sb,
 | |
|                                              FTNODE node,
 | |
|                                              int child,
 | |
|                                              const toku::comparator &cmp,
 | |
|                                              tokutime_t *decompress_time) {
 | |
|     int r = 0;
 | |
|     tokutime_t t0 = toku_time_now();
 | |
|     r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
 | |
|     if (r != 0) {
 | |
|         const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
 | |
|         fprintf(stderr,
 | |
|                 "%s:%d:decompress_and_deserialize_worker - "
 | |
|                 "file[%s], blocknum[%lld], read_and_decompress_sub_block failed "
 | |
|                 "with %d\n",
 | |
|                 __FILE__,
 | |
|                 __LINE__,
 | |
|                 fname ? fname : "unknown",
 | |
|                 (longlong)node->blocknum.b,
 | |
|                 r);
 | |
|         dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
 | |
|         goto exit;
 | |
|     }
 | |
|     *decompress_time = toku_time_now() - t0;
 | |
|     // at this point, sb->uncompressed_ptr stores the serialized node partition
 | |
|     r = deserialize_ftnode_partition(&curr_sb, node, child, cmp);
 | |
|     if (r != 0) {
 | |
|         const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
 | |
|         fprintf(stderr,
 | |
|                 "%s:%d:decompress_and_deserialize_worker - "
 | |
|                 "file[%s], blocknum[%lld], deserialize_ftnode_partition failed "
 | |
|                 "with %d\n",
 | |
|                 __FILE__,
 | |
|                 __LINE__,
 | |
|                 fname ? fname : "unknown",
 | |
|                 (longlong)node->blocknum.b,
 | |
|                 r);
 | |
|         dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
 | |
|         goto exit;
 | |
|     }
 | |
| 
 | |
| exit:
 | |
|     toku_free(curr_sb.uncompressed_ptr);
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf,
 | |
|                                                       struct sub_block curr_sb,
 | |
|                                                       FTNODE node,
 | |
|                                                       int child) {
 | |
|     int r = 0;
 | |
|     r = read_compressed_sub_block(&curr_rbuf, &curr_sb);
 | |
|     if (r != 0) {
 | |
|         goto exit;
 | |
|     }
 | |
| 
 | |
|     SUB_BLOCK bp_sb;
 | |
|     bp_sb = BSB(node, child);
 | |
|     bp_sb->compressed_size = curr_sb.compressed_size;
 | |
|     bp_sb->uncompressed_size = curr_sb.uncompressed_size;
 | |
|     bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
 | |
|     memcpy(
 | |
|         bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
 | |
| exit:
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
 | |
| // Effect: Allocate an FTNODE and fill in the values that are not read from
 | |
|     FTNODE XMALLOC(node);
 | |
|     node->fullhash = fullhash;
 | |
|     node->blocknum = blocknum;
 | |
|     node->clear_dirty();
 | |
|     node->oldest_referenced_xid_known = TXNID_NONE;
 | |
|     node->bp = nullptr;
 | |
|     node->ct_pair = nullptr;
 | |
|     return node; 
 | |
| }
 | |
| 
 | |
static int deserialize_ftnode_header_from_rbuf_if_small_enough(
    FTNODE *ftnode,
    FTNODE_DISK_DATA *ndd,
    BLOCKNUM blocknum,
    uint32_t fullhash,
    ftnode_fetch_extra *bfe,
    struct rbuf *rb,
    int fd)
// If we have enough information in the rbuf to construct a header, then do so.
// Also fetch in the basement node if needed.
// Return 0 if it worked.  If something goes wrong (including that we are
// looking at some old data format that doesn't have partitions) then return
// nonzero.
{
    int r = 0;

    tokutime_t t0, t1;
    tokutime_t decompress_time = 0;
    tokutime_t deserialize_time = 0;
    // we must get the name from bfe and not through
    // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet
    const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);

    t0 = toku_time_now();

    FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);

    // 24 bytes covers the fixed prefix read below: 8-byte magic plus four
    // 4-byte fields (layout version, original layout version, build id,
    // n_children).
    if (rb->size < 24) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], rb->size[%u] < 24\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            rb->size);
        dump_bad_block(rb->buf, rb->size);
        // TODO: What error do we return here?
        // Does it even matter?
        r = toku_db_badformat();
        goto cleanup;
    }

    // Leaf nodes are tagged "tokuleaf", internal nodes "tokunode".
    const void *magic;
    rbuf_literal_bytes(rb, &magic, 8);
    if (memcmp(magic, "tokuleaf", 8) != 0 &&
        memcmp(magic, "tokunode", 8) != 0) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], unrecognized magic number "
            "%2.2x %2.2x %2.2x %2.2x   %2.2x %2.2x %2.2x %2.2x\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            static_cast<const uint8_t*>(magic)[0],
            static_cast<const uint8_t*>(magic)[1],
            static_cast<const uint8_t*>(magic)[2],
            static_cast<const uint8_t*>(magic)[3],
            static_cast<const uint8_t*>(magic)[4],
            static_cast<const uint8_t*>(magic)[5],
            static_cast<const uint8_t*>(magic)[6],
            static_cast<const uint8_t*>(magic)[7]);
        dump_bad_block(rb->buf, rb->size);
        r = toku_db_badformat();
        goto cleanup;
    }

    // Pre-basement-node layouts need the full upgrade path, which this
    // fast path does not implement.
    node->layout_version_read_from_disk = rbuf_int(rb);
    if (node->layout_version_read_from_disk <
        FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], node->layout_version_read_from_disk[%d] "
            "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            node->layout_version_read_from_disk,
            FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES);
        dump_bad_block(rb->buf, rb->size);
        // This code path doesn't have to worry about upgrade.
        r = toku_db_badformat();
        goto cleanup;
    }

    // If we get here, we know the node is at least
    // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES.  We haven't changed
    // the serialization format since then (this comment is correct as of
    // version 20, which is Deadshot) so we can go ahead and say the
    // layout version is current (it will be as soon as we finish
    // deserializing).
    // TODO(leif): remove node->layout_version (#5174)
    node->layout_version = FT_LAYOUT_VERSION;

    node->layout_version_original = rbuf_int(rb);
    node->build_id = rbuf_int(rb);
    node->n_children = rbuf_int(rb);
    // Guaranteed to be have been able to read up to here.  If n_children
    // is too big, we may have a problem, so check that we won't overflow
    // while reading the partition locations.
    unsigned int nhsize;
    // we can do this because n_children is filled in.
    nhsize = serialize_node_header_size(node);
    unsigned int needed_size;
    // we need 12 more so that we can read the compressed block size information
    // that follows for the nodeinfo.
    needed_size = nhsize + 12;
    if (needed_size > rb->size) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], needed_size[%d] > rb->size[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            needed_size,
            rb->size);
        dump_bad_block(rb->buf, rb->size);
        r = toku_db_badformat();
        goto cleanup;
    }

    XMALLOC_N(node->n_children, node->bp);
    XMALLOC_N(node->n_children, *ndd);
    // read the partition locations
    for (int i=0; i<node->n_children; i++) {
        BP_START(*ndd,i) = rbuf_int(rb);
        BP_SIZE (*ndd,i) = rbuf_int(rb);
    }

    // Header checksum covers every byte consumed so far (rb->ndone), so it
    // must be computed before reading the stored checksum itself.
    uint32_t checksum;
    checksum = toku_x1764_memory(rb->buf, rb->ndone);
    uint32_t stored_checksum;
    stored_checksum = rbuf_int(rb);
    if (stored_checksum != checksum) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            stored_checksum,
            checksum);
        dump_bad_block(rb->buf, rb->size);
        r = TOKUDB_BAD_CHECKSUM;
        goto cleanup;
    }

    // Now we want to read the pivot information.
    struct sub_block sb_node_info;
    sub_block_init(&sb_node_info);
    // we'll be able to read these because we checked the size earlier.
    sb_node_info.compressed_size = rbuf_int(rb);
    sb_node_info.uncompressed_size = rbuf_int(rb);
    // The +8 accounts for the sub-block's own trailing bookkeeping read
    // below (its 4-byte xsum plus the 8 bytes of sizes already hashed).
    if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], rb->size[%d] - rb->ndone[%d] < "
            "sb_node_info.compressed_size[%d] + 8\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            rb->size,
            rb->ndone,
            sb_node_info.compressed_size);
        dump_bad_block(rb->buf, rb->size);
        r = toku_db_badformat();
        goto cleanup;
    }

    // Finish reading compressed the sub_block
    const void **cp;
    cp = (const void **) &sb_node_info.compressed_ptr;
    rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
    sb_node_info.xsum = rbuf_int(rb);
    // let's check the checksum
    // The xsum covers the two 4-byte size fields immediately preceding the
    // compressed bytes, hence the -8 / +8 adjustment.
    uint32_t actual_xsum;
    actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8,
                                    8 + sb_node_info.compressed_size);
    if (sb_node_info.xsum != actual_xsum) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
            "file[%s], blocknum[%lld], sb_node_info.xsum[%d] != actual_xsum[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            sb_node_info.xsum,
            actual_xsum);
        dump_bad_block(rb->buf, rb->size);
        r = TOKUDB_BAD_CHECKSUM;
        goto cleanup;
    }

    // Now decompress the subblock
    {
        // scoped_malloc keeps the uncompressed buffer alive only for this
        // scope; deserialize_ftnode_info must finish before it is freed.
        toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
        sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
        tokutime_t decompress_t0 = toku_time_now();
        toku_decompress((Bytef *)sb_node_info.uncompressed_ptr,
                        sb_node_info.uncompressed_size,
                        (Bytef *)sb_node_info.compressed_ptr,
                        sb_node_info.compressed_size);
        tokutime_t decompress_t1 = toku_time_now();
        decompress_time = decompress_t1 - decompress_t0;

        // at this point sb->uncompressed_ptr stores the serialized node info.
        r = deserialize_ftnode_info(&sb_node_info, node);
        if (r != 0) {
            fprintf(
                stderr,
                "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
                "file[%s], blocknum[%lld], deserialize_ftnode_info failed with "
                "%d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)blocknum.b,
                r);
            dump_bad_block(
                static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
                sb_node_info.uncompressed_size);
            dump_bad_block(rb->buf, rb->size);
            goto cleanup;
        }
    }

    // Now we have the ftnode_info.  We have a bunch more stuff in the
    // rbuf, so we might be able to store the compressed data for some
    // objects.
    // We can proceed to deserialize the individual subblocks.

    // setup the memory of the partitions
    // for partitions being decompressed, create either message buffer or basement node
    // for partitions staying compressed, create sub_block
    setup_ftnode_partitions(node, bfe, false);

    // We must capture deserialize and decompression time before
    // the pf_callback, otherwise we would double-count.
    t1 = toku_time_now();
    deserialize_time = (t1 - t0) - decompress_time;

    // do partial fetch if necessary
    if (bfe->type != ftnode_fetch_none) {
        PAIR_ATTR attr;
        r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
        if (r != 0) {
            fprintf(
                stderr,
                "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - "
                "file[%s], blocknum[%lld], toku_ftnode_pf_callback failed with "
                "%d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)blocknum.b,
                r);
            dump_bad_block(rb->buf, rb->size);
            goto cleanup;
        }
    }

    // handle clock
    for (int i = 0; i < node->n_children; i++) {
        if (bfe->wants_child_available(i)) {
            paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
            BP_TOUCH_CLOCK(node,i);
        }
    }
    *ftnode = node;
    r = 0;

cleanup:
    if (r == 0) {
        bfe->deserialize_time += deserialize_time;
        bfe->decompress_time += decompress_time;
        toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
    }
    // On failure, release whatever the fast path allocated; the caller will
    // fall back to the slower full-deserialize path.
    if (r != 0) {
        if (node) {
            toku_free(*ndd);
            toku_free(node->bp);
            toku_free(node);
        }
    }
    return r;
}
 | |
| 
 | |
| // This function takes a deserialized version 13 or 14 buffer and
 | |
| // constructs the associated internal, non-leaf ftnode object.  It
 | |
| // also creates MSN's for older messages created in older versions
 | |
| // that did not generate MSN's for messages.  These new MSN's are
 | |
| // generated from the root downwards, counting backwards from MIN_MSN
 | |
| // and persisted in the ft header.
 | |
static int deserialize_and_upgrade_internal_node(FTNODE node,
                                                 struct rbuf *rb,
                                                 ftnode_fetch_extra *bfe,
                                                 STAT64INFO info) {
    // Translate a decompressed version-13/14 internal-node image in `rb'
    // into the current in-memory FTNODE layout.  Old messages had no MSNs,
    // so new ones are assigned while the buffers are deserialized, and the
    // node is marked dirty as a result.  Returns 0 on success or
    // toku_db_badformat() on an end-to-end checksum mismatch.
    int version = node->layout_version_read_from_disk;

    if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
        (void) rbuf_int(rb);                          // 10. fingerprint
    }

    node->n_children = rbuf_int(rb);                  // 11. n_children

    // Sub-tree estimates...
    for (int i = 0; i < node->n_children; ++i) {
        if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
            (void) rbuf_int(rb);                      // 12. fingerprint
        }
        uint64_t nkeys = rbuf_ulonglong(rb);          // 13. nkeys
        uint64_t ndata = rbuf_ulonglong(rb);          // 14. ndata
        uint64_t dsize = rbuf_ulonglong(rb);          // 15. dsize
        (void) rbuf_char(rb);                         // 16. exact (char)
        invariant(nkeys == ndata);
        if (info) {
            // info is non-null if we're trying to upgrade old subtree
            // estimates to stat64info
            info->numrows += nkeys;
            info->numbytes += dsize;
        }
    }

    // Pivot keys
    node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1);

    // Create space for the child node buffers (a.k.a. partitions).
    XMALLOC_N(node->n_children, node->bp);

    // Set the child blocknums.
    for (int i = 0; i < node->n_children; ++i) {
        BP_BLOCKNUM(node, i) = rbuf_blocknum(rb);    // 18. blocknums
        BP_WORKDONE(node, i) = 0;
    }

    // Read in the child buffer maps.
    for (int i = 0; i < node->n_children; ++i) {
        // The following fields were previously used by the `sub_block_map'
        // They include:
        // - 4 byte index
        (void) rbuf_int(rb);
        // - 4 byte offset
        (void) rbuf_int(rb);
        // - 4 byte size
        (void) rbuf_int(rb);
    }

    // We need to setup this node's partitions, but we can't call the
    // existing call (setup_ftnode_paritions.) because there are
    // existing optimizations that would prevent us from bringing all
    // of this node's partitions into memory.  Instead, We use the
    // existing bfe and node to set the bfe's child_to_search member.
    // Then we create a temporary bfe that needs all the nodes to make
    // sure we properly intitialize our partitions before filling them
    // in from our soon-to-be-upgraded node.
    update_bfe_using_ftnode(node, bfe);
    ftnode_fetch_extra temp_bfe;
    temp_bfe.create_for_full_read(nullptr);
    setup_partitions_using_bfe(node, &temp_bfe, true);

    // Cache the highest MSN generated for the message buffers.  This
    // will be set in the ftnode.
    //
    // The way we choose MSNs for upgraded messages is delicate.  The
    // field `highest_unused_msn_for_upgrade' in the header is always an
    // MSN that no message has yet.  So when we have N messages that need
    // MSNs, we decrement it by N, and then use it and the N-1 MSNs less
    // than it, but we do not use the value we decremented it to.
    //
    // In the code below, we initialize `lowest' with the value of
    // `highest_unused_msn_for_upgrade' after it is decremented, so we
    // need to be sure to increment it once before we enqueue our first
    // message.
    //
    // NOTE(review): the `lowest' variable described above does not appear
    // in this function; presumably the bookkeeping now lives inside
    // deserialize_child_buffer_v13 — verify against that helper.
    MSN highest_msn;
    highest_msn.msn = 0;

    // Deserialize de-compressed buffers.
    for (int i = 0; i < node->n_children; ++i) {
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb);
        // Only the first buffer's highest MSN is captured for the node.
        if (highest_msn.msn == 0) {
            highest_msn.msn = highest_msn_in_this_buffer.msn;
        }
    }

    // Assign the highest msn from our upgrade message buffers
    node->max_msn_applied_to_node_on_disk = highest_msn;
    // Since we assigned MSNs to this node's messages, we need to dirty it.
    node->set_dirty();

    // Must compute the checksum now (rather than at the end, while we
    // still have the pointer to the buffer).
    if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) {
        uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum
        uint32_t actual_xsum   = toku_x1764_memory(rb->buf, rb->size-4);
        if (expected_xsum != actual_xsum) {
            fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
                    __FUNCTION__,
                    __LINE__,
                    expected_xsum,
                    actual_xsum);
            fprintf(stderr,
                    "Checksum failure while reading node in file %s.\n",
                    toku_cachefile_fname_in_env(bfe->ft->cf));
            fflush(stderr);
            return toku_db_badformat();
        }
    }

    return 0;
}
 | |
| 
 | |
| // This function takes a deserialized version 13 or 14 buffer and
 | |
| // constructs the associated leaf ftnode object.
 | |
static int
deserialize_and_upgrade_leaf_node(FTNODE node,
                                  struct rbuf *rb,
                                  ftnode_fetch_extra *bfe,
                                  STAT64INFO info)
{
    // Translate a decompressed version-13/14 leaf-node image in `rb' into
    // the current in-memory layout: all old leaf entries end up in a single
    // basement node (n_children == 1).  Version-13 entries are additionally
    // upgraded one-by-one via toku_le_upgrade_13_14.  Returns 0 on success,
    // toku_db_badformat() on checksum mismatch, or 1 if trailing bytes
    // remain unread.
    int r = 0;
    int version = node->layout_version_read_from_disk;

    // This is a leaf node, so the offsets in the buffer will be
    // different from the internal node offsets above.
    uint64_t nkeys = rbuf_ulonglong(rb);                // 10. nkeys
    uint64_t ndata = rbuf_ulonglong(rb);                // 11. ndata
    uint64_t dsize = rbuf_ulonglong(rb);                // 12. dsize
    invariant(nkeys == ndata);
    if (info) {
        // info is non-null if we're trying to upgrade old subtree
        // estimates to stat64info
        info->numrows += nkeys;
        info->numbytes += dsize;
    }

    // This is the optimized for upgrade field.
    if (version == FT_LAYOUT_VERSION_14) {
        (void) rbuf_int(rb);                            // 13. optimized
    }

    // npartitions - This is really the number of leaf entries in
    // our single basement node.  There should only be 1 (ONE)
    // partition, so there shouldn't be any pivot key stored.  This
    // means the loop will not iterate.  We could remove the loop and
    // assert that the value is indeed 1.
    int npartitions = rbuf_int(rb);                     // 14. npartitions
    assert(npartitions == 1);

    // Set number of children to 1, since we will only have one
    // basement node.
    node->n_children = 1;
    XMALLOC_N(node->n_children, node->bp);
    node->pivotkeys.create_empty();

    // Create one basement node to contain all the leaf entries by
    // setting up the single partition and updating the bfe.
    update_bfe_using_ftnode(node, bfe);
    ftnode_fetch_extra temp_bfe;
    temp_bfe.create_for_full_read(bfe->ft);
    setup_partitions_using_bfe(node, &temp_bfe, true);

    // Deserialize the partition maps, though they are not used in the
    // newer versions of ftnodes.
    for (int i = 0; i < node->n_children; ++i) {
        // The following fields were previously used by the `sub_block_map'
        // They include:
        // - 4 byte index
        (void) rbuf_int(rb);
        // - 4 byte offset
        (void) rbuf_int(rb);
        // - 4 byte size
        (void) rbuf_int(rb);
    }

    // Copy all of the leaf entries into the single basement node.

    // The number of leaf entries in buffer.
    int n_in_buf = rbuf_int(rb);                        // 15. # of leaves
    BLB_SEQINSERT(node,0) = 0;
    BASEMENTNODE bn = BLB(node, 0);

    // Read the leaf entries from the buffer, advancing the buffer
    // as we go.
    bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM);
    if (version <= FT_LAYOUT_VERSION_13) {
        // Version <= 13: each entry must be individually upgraded to the
        // new leafentry format before insertion into the basement node.
        for (int i = 0; i < n_in_buf; ++i) {
            LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
            uint32_t disksize = leafentry_disksize_13(le);
            rb->ndone += disksize;                       // 16. leaf entry (13)
            invariant(rb->ndone<=rb->size);
            LEAFENTRY new_le;
            size_t new_le_size;
            void* key = NULL;
            uint32_t keylen = 0;
            r = toku_le_upgrade_13_14(le,
                                      &key,
                                      &keylen,
                                      &new_le_size,
                                      &new_le);
            assert_zero(r);
            // Copy the pointer value straight into the OMT
            LEAFENTRY new_le_in_bn = nullptr;
            void *maybe_free;
            bn->data_buffer.get_space_for_insert(
                i,
                key,
                keylen,
                new_le_size,
                &new_le_in_bn,
                &maybe_free
                );
            if (maybe_free) {
                toku_free(maybe_free);
            }
            memcpy(new_le_in_bn, new_le, new_le_size);
            toku_free(new_le);
        }
    } else {
        // Version 14: entries are already in the new format; bulk-load them.
        uint32_t data_size = rb->size - rb->ndone;
        if (has_end_to_end_checksum) {
            // Exclude the trailing 4-byte checksum from the entry data.
            data_size -= sizeof(uint32_t);
        }
        bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk);
    }

    // Whatever this is must be less than the MSNs of every message above
    // it, so it's ok to take it here.
    bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade;
    bn->stale_ancestor_messages_applied = false;
    node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;

    // Checksum (end to end) is only on version 14
    if (has_end_to_end_checksum) {
        uint32_t expected_xsum = rbuf_int(rb);             // 17. checksum
        uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4);
        if (expected_xsum != actual_xsum) {
            fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n",
                    __FUNCTION__,
                    __LINE__,
                    expected_xsum,
                    actual_xsum);
            fprintf(stderr,
                    "Checksum failure while reading node in file %s.\n",
                    toku_cachefile_fname_in_env(bfe->ft->cf));
            fflush(stderr);
            return toku_db_badformat();
        }
    }

    // We should have read the whole block by this point.
    if (rb->ndone != rb->size) {
        // TODO: Error handling.
        return 1;
    }

    return r;
}
 | |
| 
 | |
| static int read_and_decompress_block_from_fd_into_rbuf(
 | |
|     int fd,
 | |
|     BLOCKNUM blocknum,
 | |
|     DISKOFF offset,
 | |
|     DISKOFF size,
 | |
|     FT ft,
 | |
|     struct rbuf *rb,
 | |
|     /* out */ int *layout_version_p);
 | |
| 
 | |
| // This function upgrades a version 14 or 13 ftnode to the current
 | |
| // version. NOTE: This code assumes the first field of the rbuf has
 | |
| // already been read from the buffer (namely the layout_version of the
 | |
| // ftnode.)
 | |
| static int deserialize_and_upgrade_ftnode(FTNODE node,
 | |
|                                           FTNODE_DISK_DATA *ndd,
 | |
|                                           BLOCKNUM blocknum,
 | |
|                                           ftnode_fetch_extra *bfe,
 | |
|                                           STAT64INFO info,
 | |
|                                           int fd) {
 | |
|     int r = 0;
 | |
|     int version;
 | |
| 
 | |
|     // I. First we need to de-compress the entire node, only then can
 | |
|     // we read the different sub-sections.
 | |
|     // get the file offset and block size for the block
 | |
|     DISKOFF offset, size;
 | |
|     bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
 | |
| 
 | |
|     struct rbuf rb;
 | |
|     r = read_and_decompress_block_from_fd_into_rbuf(fd,
 | |
|                                                     blocknum,
 | |
|                                                     offset,
 | |
|                                                     size,
 | |
|                                                     bfe->ft,
 | |
|                                                     &rb,
 | |
|                                                     &version);
 | |
|     if (r != 0) {
 | |
|         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
 | |
|         fprintf(stderr,
 | |
|                 "%s:%d:deserialize_and_upgrade_ftnode - "
 | |
|                 "file[%s], blocknum[%lld], "
 | |
|                 "read_and_decompress_block_from_fd_into_rbuf failed with %d\n",
 | |
|                 __FILE__,
 | |
|                 __LINE__,
 | |
|                 fname ? fname : "unknown",
 | |
|                 (longlong)blocknum.b,
 | |
|                 r);
 | |
|         goto exit;
 | |
|     }
 | |
| 
 | |
|     // Re-read the magic field from the previous call, since we are
 | |
|     // restarting with a fresh rbuf.
 | |
|     {
 | |
|         const void *magic;
 | |
|         rbuf_literal_bytes(&rb, &magic, 8);              // 1. magic
 | |
|     }
 | |
| 
 | |
|     // II. Start reading ftnode fields out of the decompressed buffer.
 | |
| 
 | |
|     // Copy over old version info.
 | |
|     node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version
 | |
|     version = node->layout_version_read_from_disk;
 | |
|     if (version > FT_LAYOUT_VERSION_14) {
 | |
|         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
 | |
|         fprintf(stderr,
 | |
|                 "%s:%d:deserialize_and_upgrade_ftnode - "
 | |
|                 "file[%s], blocknum[%lld], version[%d] > "
 | |
|                 "FT_LAYOUT_VERSION_14[%d]\n",
 | |
|                 __FILE__,
 | |
|                 __LINE__,
 | |
|                 fname ? fname : "unknown",
 | |
|                 (longlong)blocknum.b,
 | |
|                 version,
 | |
|                 FT_LAYOUT_VERSION_14);
 | |
|         dump_bad_block(rb.buf, rb.size);
 | |
|         goto exit;
 | |
|     }
 | |
|     assert(version <= FT_LAYOUT_VERSION_14);
 | |
|     // Upgrade the current version number to the current version.
 | |
|     node->layout_version = FT_LAYOUT_VERSION;
 | |
| 
 | |
|     node->layout_version_original = rbuf_int(&rb);      // 3. original layout
 | |
|     node->build_id = rbuf_int(&rb);                     // 4. build id
 | |
| 
 | |
|     // The remaining offsets into the rbuf do not map to the current
 | |
|     // version, so we need to fill in the blanks and ignore older
 | |
|     // fields.
 | |
|     (void)rbuf_int(&rb);                                // 5. nodesize
 | |
|     node->flags = rbuf_int(&rb);                        // 6. flags
 | |
|     node->height = rbuf_int(&rb);                       // 7. height
 | |
| 
 | |
|     // If the version is less than 14, there are two extra ints here.
 | |
|     // we would need to ignore them if they are there.
 | |
|     // These are the 'fingerprints'.
 | |
|     if (version == FT_LAYOUT_VERSION_13) {
 | |
|         (void) rbuf_int(&rb);                           // 8. rand4
 | |
|         (void) rbuf_int(&rb);                           // 9. local
 | |
|     }
 | |
| 
 | |
|     // The next offsets are dependent on whether this is a leaf node
 | |
|     // or not.
 | |
| 
 | |
|     // III. Read in Leaf and Internal Node specific data.
 | |
| 
 | |
|     // Check height to determine whether this is a leaf node or not.
 | |
|     if (node->height > 0) {
 | |
|         r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info);
 | |
|     } else {
 | |
|         r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info);
 | |
|     }
 | |
| 
 | |
|     XMALLOC_N(node->n_children, *ndd);
 | |
|     // Initialize the partition locations to zero, because version 14
 | |
|     // and below have no notion of partitions on disk.
 | |
|     for (int i=0; i<node->n_children; i++) {
 | |
|         BP_START(*ndd,i) = 0;
 | |
|         BP_SIZE (*ndd,i) = 0;
 | |
|     }
 | |
| 
 | |
|     toku_free(rb.buf);
 | |
| exit:
 | |
|     return r;
 | |
| }
 | |
| 
 | |
// Effect: deserializes a ftnode that is in rb (with pointer of rb just past the
// magic) into a FTNODE.
// On success *ftnode and *ndd are populated and 0 is returned; on failure a
// nonzero error code is returned and the partially built node is freed.
// Nodes with layout version <= FT_LAYOUT_VERSION_14 are diverted to the
// upgrade path (deserialize_and_upgrade_ftnode), which re-reads from 'fd'.
static int deserialize_ftnode_from_rbuf(FTNODE *ftnode,
                                        FTNODE_DISK_DATA *ndd,
                                        BLOCKNUM blocknum,
                                        uint32_t fullhash,
                                        ftnode_fetch_extra *bfe,
                                        STAT64INFO info,
                                        struct rbuf *rb,
                                        int fd) {
    int r = 0;
    struct sub_block sb_node_info;

    // Decompress time is accumulated separately so it can be subtracted from
    // the total elapsed time at cleanup to obtain pure deserialize time.
    tokutime_t t0, t1;
    tokutime_t decompress_time = 0;
    tokutime_t deserialize_time = 0;
    const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);

    t0 = toku_time_now();

    FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);

    // now start reading from rbuf
    // first thing we do is read the header information
    const void *magic;
    rbuf_literal_bytes(rb, &magic, 8);
    if (memcmp(magic, "tokuleaf", 8) != 0 &&
        memcmp(magic, "tokunode", 8) != 0) {
        // Neither leaf nor internal magic: report and bail with a format error.
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_from_rbuf - "
                "file[%s], blocknum[%lld], unrecognized magic number "
                "%2.2x %2.2x %2.2x %2.2x   %2.2x %2.2x %2.2x %2.2x\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)blocknum.b,
                static_cast<const uint8_t *>(magic)[0],
                static_cast<const uint8_t *>(magic)[1],
                static_cast<const uint8_t *>(magic)[2],
                static_cast<const uint8_t *>(magic)[3],
                static_cast<const uint8_t *>(magic)[4],
                static_cast<const uint8_t *>(magic)[5],
                static_cast<const uint8_t *>(magic)[6],
                static_cast<const uint8_t *>(magic)[7]);
        dump_bad_block(rb->buf, rb->size);

        r = toku_db_badformat();
        goto cleanup;
    }

    node->layout_version_read_from_disk = rbuf_int(rb);
    lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION);

    // Check if we are reading in an older node version.
    if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) {
        int version = node->layout_version_read_from_disk;
        // Perform the upgrade.
        r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd);
        if (r != 0) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_from_rbuf - "
                    "file[%s], blocknum[%lld], deserialize_and_upgrade_ftnode "
                    "failed with %d\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    (longlong)blocknum.b,
                    r);
            dump_bad_block(rb->buf, rb->size);
            goto cleanup;
        }

        if (version <= FT_LAYOUT_VERSION_13) {
            // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag
            node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13;
        }

        // If everything is ok, just re-assign the ftnode and return.
        *ftnode = node;
        r = 0;
        goto cleanup;
    }

    // Upgrade versions after 14 to current.  This upgrade is trivial, it
    // removes the optimized for upgrade field, which has already been
    // removed in the deserialization code (see
    // deserialize_ftnode_info()).
    node->layout_version = FT_LAYOUT_VERSION;
    node->layout_version_original = rbuf_int(rb);
    node->build_id = rbuf_int(rb);
    node->n_children = rbuf_int(rb);
    XMALLOC_N(node->n_children, node->bp);
    XMALLOC_N(node->n_children, *ndd);
    // read the partition locations
    for (int i=0; i<node->n_children; i++) {
        BP_START(*ndd,i) = rbuf_int(rb);
        BP_SIZE (*ndd,i) = rbuf_int(rb);
    }
    // verify checksum of header stored
    uint32_t checksum;
    checksum = toku_x1764_memory(rb->buf, rb->ndone);
    uint32_t stored_checksum;
    stored_checksum = rbuf_int(rb);
    if (stored_checksum != checksum) {
        // Header checksum mismatch is treated as fatal corruption: the block
        // is dumped for diagnosis and the invariant below aborts the process.
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_from_rbuf - "
            "file[%s], blocknum[%lld], stored_checksum[%d] != checksum[%d]\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            stored_checksum,
            checksum);
        dump_bad_block(rb->buf, rb->size);
        invariant(stored_checksum == checksum);
    }

    // now we read and decompress the pivot and child information
    sub_block_init(&sb_node_info);
    {
        tokutime_t sb_decompress_t0 = toku_time_now();
        r = read_and_decompress_sub_block(rb, &sb_node_info);
        tokutime_t sb_decompress_t1 = toku_time_now();
        decompress_time += sb_decompress_t1 - sb_decompress_t0;
        if (r != 0) {
            fprintf(
                stderr,
                "%s:%d:deserialize_ftnode_from_rbuf - "
                "file[%s], blocknum[%lld], read_and_decompress_sub_block failed "
                "with %d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)blocknum.b,
                r);
            dump_bad_block(
                static_cast<unsigned char *>(sb_node_info.uncompressed_ptr),
                sb_node_info.uncompressed_size);
            dump_bad_block(rb->buf, rb->size);
            goto cleanup;
        }
    }

    // at this point, sb->uncompressed_ptr stores the serialized node info
    r = deserialize_ftnode_info(&sb_node_info, node);
    if (r != 0) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_from_rbuf - "
            "file[%s], blocknum[%lld], deserialize_ftnode_info failed with "
            "%d\n",
            __FILE__,
            __LINE__,
            fname ? fname : "unknown",
            (longlong)blocknum.b,
            r);
        dump_bad_block(rb->buf, rb->size);
        goto cleanup;
    }
    toku_free(sb_node_info.uncompressed_ptr);

    // now that the node info has been deserialized, we can proceed to
    // deserialize the individual sub blocks

    // setup the memory of the partitions
    // for partitions being decompressed, create either message buffer or
    //   basement node
    // for partitions staying compressed, create sub_block
    setup_ftnode_partitions(node, bfe, true);

    // This loop is parallelizeable, since we don't have a dependency on the
    // work done so far.
    for (int i = 0; i < node->n_children; i++) {
        uint32_t curr_offset = BP_START(*ndd, i);
        uint32_t curr_size = BP_SIZE(*ndd, i);
        // the compressed, serialized partitions start at where rb is currently
        // pointing, which would be rb->buf + rb->ndone
        // we need to intialize curr_rbuf to point to this place
        struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0};
        rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size);

        //
        // now we are at the point where we have:
        //  - read the entire compressed node off of disk,
        //  - decompressed the pivot and offset information,
        //  - have arrived at the individual partitions.
        //
        // Based on the information in bfe, we want to decompress a subset of
        // of the compressed partitions (also possibly none or possibly all)
        // The partitions that we want to decompress and make available
        // to the node, we do, the rest we simply copy in compressed
        // form into the node, and set the state of the partition to
        // PT_COMPRESSED
        //

        struct sub_block curr_sb;
        sub_block_init(&curr_sb);

        // curr_rbuf is passed by value to decompress_and_deserialize_worker,
        // so there's no ugly race condition.
        // This would be more obvious if curr_rbuf were an array.

        // deserialize_ftnode_info figures out what the state
        // should be and sets up the memory so that we are ready to use it

        switch (BP_STATE(node, i)) {
            case PT_AVAIL: {
                //  case where we read and decompress the partition
                tokutime_t partition_decompress_time;
                r = decompress_and_deserialize_worker(
                    curr_rbuf,
                    curr_sb,
                    node,
                    i,
                    bfe->ft->cmp,
                    &partition_decompress_time);
                decompress_time += partition_decompress_time;
                if (r != 0) {
                    fprintf(
                        stderr,
                        "%s:%d:deserialize_ftnode_from_rbuf - "
                        "file[%s], blocknum[%lld], childnum[%d], "
                        "decompress_and_deserialize_worker failed with %d\n",
                        __FILE__,
                        __LINE__,
                        fname ? fname : "unknown",
                        (longlong)blocknum.b,
                        i,
                        r);
                    dump_bad_block(rb->buf, rb->size);
                    goto cleanup;
                }
                break;
            }
        case PT_COMPRESSED:
            // case where we leave the partition in the compressed state
            r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
            if (r != 0) {
                fprintf(
                    stderr,
                    "%s:%d:deserialize_ftnode_from_rbuf - "
                    "file[%s], blocknum[%lld], childnum[%d], "
                    "check_and_copy_compressed_sub_block_worker failed with "
                    "%d\n",
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown",
                    (longlong)blocknum.b,
                    i,
                    r);
                dump_bad_block(rb->buf, rb->size);
                goto cleanup;
            }
            break;
        case PT_INVALID: // this is really bad
        case PT_ON_DISK: // it's supposed to be in memory.
            abort();
        }
    }
    *ftnode = node;
    r = 0;

cleanup:
    if (r == 0) {
        // Record timing stats only for a fully successful deserialization.
        t1 = toku_time_now();
        deserialize_time = (t1 - t0) - decompress_time;
        bfe->deserialize_time += deserialize_time;
        bfe->decompress_time += decompress_time;
        toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);
    }
    if (r != 0) {
        // NOTE: Right now, callers higher in the stack will assert on
        // failure, so this is OK for production.  However, if we
        // create tools that use this function to search for errors in
        // the FT, then we will leak memory.
        if (node) {
            toku_free(node);
        }
    }
    return r;
}
 | |
| 
 | |
// Fetch one partition ('childnum') of 'node' from disk and make it PT_AVAIL.
// The partition's compressed bytes are located via the offsets recorded in
// 'ndd', read through a 512-byte-aligned buffer (required for O_DIRECT),
// decompressed, and deserialized into the node.  I/O, decompress, and
// deserialize times are accumulated onto 'bfe'.
int
toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) {
    int r = 0;
    // The partition must currently be evicted and hold no pointer.
    assert(BP_STATE(node,childnum) == PT_ON_DISK);
    assert(node->bp[childnum].ptr.tag == BCT_NULL);
    
    //
    // setup the partition
    //
    setup_available_ftnode_partition(node, childnum);
    BP_STATE(node,childnum) = PT_AVAIL;
    
    //
    // read off disk and make available in memory
    // 
    // get the file offset and block size for the block
    DISKOFF node_offset, total_node_disk_size;
    bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size);

    // Offset/size of this partition relative to the start of the node block.
    uint32_t curr_offset = BP_START(ndd, childnum);
    uint32_t curr_size = BP_SIZE (ndd, childnum);

    struct rbuf rb;
    rbuf_init(&rb, nullptr, 0);

    // The partition may not start on a 512-byte boundary; read from the
    // preceding boundary and round the total read size up to a full sector.
    uint32_t pad_at_beginning = (node_offset+curr_offset)%512;
    uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size);

    toku::scoped_malloc_aligned raw_block_buf(padded_size, 512);
    uint8_t *raw_block = reinterpret_cast<uint8_t *>(raw_block_buf.get());
    // rb views only the partition's bytes, skipping the alignment padding.
    rbuf_init(&rb, pad_at_beginning+raw_block, curr_size);
    tokutime_t t0 = toku_time_now();

    // read the block
    assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT
    assert(0==(padded_size)%512);
    assert(0==(node_offset+curr_offset-pad_at_beginning)%512);
    ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning);
    assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted
    assert((DISKOFF)rlen <= padded_size);                  // we didn't read in too much.

    tokutime_t t1 = toku_time_now();

    // read sub block
    struct sub_block curr_sb;
    sub_block_init(&curr_sb);
    r = read_compressed_sub_block(&rb, &curr_sb);
    if (r != 0) {
        return r;
    }
    invariant(curr_sb.compressed_ptr != NULL);

    // decompress
    // NOTE: uncompressed_buf is scope-owned; it is released when this
    // function returns, after deserialize_ftnode_partition has consumed it.
    toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size);
    curr_sb.uncompressed_ptr = uncompressed_buf.get();
    toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size,
                    (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size);

    // deserialize
    tokutime_t t2 = toku_time_now();

    r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp);

    tokutime_t t3 = toku_time_now();

    // capture stats
    tokutime_t io_time = t1 - t0;
    tokutime_t decompress_time = t2 - t1;
    tokutime_t deserialize_time = t3 - t2;
    bfe->deserialize_time += deserialize_time;
    bfe->decompress_time += decompress_time;
    toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);

    bfe->bytes_read = rlen;
    bfe->io_time = io_time;

    return r;
}
 | |
| 
 | |
// Take a ftnode partition that is in the compressed state, and make it avail
// (PT_AVAIL): decompress the partition's sub block into a temporary buffer,
// deserialize it into the node, record decompress/deserialize times on 'bfe',
// then free the compressed copy and the sub_block struct the partition held.
int toku_deserialize_bp_from_compressed(FTNODE node,
                                        int childnum,
                                        ftnode_fetch_extra *bfe) {

    int r = 0;
    assert(BP_STATE(node, childnum) == PT_COMPRESSED);
    SUB_BLOCK curr_sb = BSB(node, childnum);

    // The uncompressed buffer is scope-owned: it lives only for the duration
    // of this call, long enough for deserialize_ftnode_partition to consume it.
    toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size);
    assert(curr_sb->uncompressed_ptr == NULL);
    curr_sb->uncompressed_ptr = uncompressed_buf.get();

    setup_available_ftnode_partition(node, childnum);
    BP_STATE(node,childnum) = PT_AVAIL;

    // decompress the sub_block
    tokutime_t t0 = toku_time_now();

    toku_decompress((Bytef *)curr_sb->uncompressed_ptr,
                    curr_sb->uncompressed_size,
                    (Bytef *)curr_sb->compressed_ptr,
                    curr_sb->compressed_size);

    tokutime_t t1 = toku_time_now();

    r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp);
    if (r != 0) {
        // Deserialization failed: log the file/block and dump both the
        // compressed and uncompressed images for diagnosis.
        const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
        fprintf(stderr,
                "%s:%d:toku_deserialize_bp_from_compressed - "
                "file[%s], blocknum[%lld], "
                "deserialize_ftnode_partition failed with %d\n",
                __FILE__,
                __LINE__,
                fname ? fname : "unknown",
                (longlong)node->blocknum.b,
                r);
        dump_bad_block(static_cast<unsigned char *>(curr_sb->compressed_ptr),
                       curr_sb->compressed_size);
        dump_bad_block(static_cast<unsigned char *>(curr_sb->uncompressed_ptr),
                       curr_sb->uncompressed_size);
    }

    tokutime_t t2 = toku_time_now();

    tokutime_t decompress_time = t1 - t0;
    tokutime_t deserialize_time = t2 - t1;
    bfe->deserialize_time += deserialize_time;
    bfe->decompress_time += decompress_time;
    toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time);

    // The partition no longer needs its compressed image or sub_block struct.
    toku_free(curr_sb->compressed_ptr);
    toku_free(curr_sb);
    return r;
}
 | |
| 
 | |
| static int deserialize_ftnode_from_fd(int fd,
 | |
|                                       BLOCKNUM blocknum,
 | |
|                                       uint32_t fullhash,
 | |
|                                       FTNODE *ftnode,
 | |
|                                       FTNODE_DISK_DATA *ndd,
 | |
|                                       ftnode_fetch_extra *bfe,
 | |
|                                       STAT64INFO info) {
 | |
|     struct rbuf rb = RBUF_INITIALIZER;
 | |
| 
 | |
|     tokutime_t t0 = toku_time_now();
 | |
|     read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb);
 | |
|     tokutime_t t1 = toku_time_now();
 | |
| 
 | |
|     // Decompress and deserialize the ftnode. Time statistics
 | |
|     // are taken inside this function.
 | |
|     int r = deserialize_ftnode_from_rbuf(
 | |
|         ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
 | |
|     if (r != 0) {
 | |
|         const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf);
 | |
|         fprintf(
 | |
|             stderr,
 | |
|             "%s:%d:deserialize_ftnode_from_fd - "
 | |
|             "file[%s], blocknum[%lld], deserialize_ftnode_from_rbuf failed with "
 | |
|             "%d\n",
 | |
|             __FILE__,
 | |
|             __LINE__,
 | |
|             fname ? fname : "unknown",
 | |
|             (longlong)blocknum.b,
 | |
|             r);
 | |
|         dump_bad_block(rb.buf, rb.size);
 | |
|     }
 | |
| 
 | |
|     bfe->bytes_read = rb.size;
 | |
|     bfe->io_time = t1 - t0;
 | |
|     toku_free(rb.buf);
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| // Effect: Read a node in.  If possible, read just the header.
 | |
| //         Perform version upgrade if necessary.
 | |
| int toku_deserialize_ftnode_from(int fd,
 | |
|                                  BLOCKNUM blocknum,
 | |
|                                  uint32_t fullhash,
 | |
|                                  FTNODE *ftnode,
 | |
|                                  FTNODE_DISK_DATA *ndd,
 | |
|                                  ftnode_fetch_extra *bfe) {
 | |
|     int r = 0;
 | |
|     struct rbuf rb = RBUF_INITIALIZER;
 | |
| 
 | |
|     // each function below takes the appropriate io/decompression/deserialize
 | |
|     // statistics
 | |
| 
 | |
|     if (!bfe->read_all_partitions) {
 | |
|         read_ftnode_header_from_fd_into_rbuf_if_small_enough(
 | |
|             fd, blocknum, bfe->ft, &rb, bfe);
 | |
|         r = deserialize_ftnode_header_from_rbuf_if_small_enough(
 | |
|             ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
 | |
|     } else {
 | |
|         // force us to do it the old way
 | |
|         r = -1;
 | |
|     }
 | |
|     if (r != 0) {
 | |
|         // Something went wrong, go back to doing it the old way.
 | |
|         r = deserialize_ftnode_from_fd(
 | |
|             fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr);
 | |
|     }
 | |
| 
 | |
|     toku_free(rb.buf);
 | |
|     return r;
 | |
| }
 | |
| 
 | |
// Intentionally a no-op in this implementation; the parameter is marked
// unused (UU) and nothing is verified or set.
void
toku_verify_or_set_counts(FTNODE UU(node)) {
}
 | |
| 
 | |
// Return the error code used throughout this file to signal a malformed or
// corrupt on-disk structure.
int 
toku_db_badformat(void) {
    return DB_BADFORMAT;
}
 | |
| 
 | |
| static size_t
 | |
| serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
 | |
|     size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
 | |
|                  +16 //TXNID_PAIR
 | |
|                  +8 //sequence
 | |
|                  +8 //blocknum
 | |
|                  +8 //previous (blocknum)
 | |
|                  +8 //resident_bytecount
 | |
|                  +8 //memarena size
 | |
|                  +log->rollentry_resident_bytecount;
 | |
|     return size;
 | |
| }
 | |
| 
 | |
// Serialize 'log' into 'buf', which must be exactly 'calculated_size' bytes
// (as returned by serialize_rollback_log_size()).  The sub block parameters
// are unused here (UU); the node is written as one uncompressed stream.
// Asserts that the bytes written match the computed size exactly.
static void
serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) {
    struct wbuf wb;
    wbuf_init(&wb, buf, calculated_size);
    {   //Serialize rollback log to local wbuf
        wbuf_nocrc_literal_bytes(&wb, "tokuroll", 8);
        lazy_assert(log->layout_version == FT_LAYOUT_VERSION);
        wbuf_nocrc_int(&wb, log->layout_version);
        wbuf_nocrc_int(&wb, log->layout_version_original);
        wbuf_nocrc_uint(&wb, BUILD_ID);
        wbuf_nocrc_TXNID_PAIR(&wb, log->txnid);
        wbuf_nocrc_ulonglong(&wb, log->sequence);
        wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
        wbuf_nocrc_BLOCKNUM(&wb, log->previous);
        wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
        //Write down memarena size needed to restore
        wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use());

        {
            //Store rollback logs
            // Walk entries newest-to-oldest; afterwards verify the bytes
            // written match the recorded resident bytecount.
            struct roll_entry *item;
            size_t done_before = wb.ndone;
            for (item = log->newest_logentry; item; item = item->prev) {
                toku_logger_rollback_wbuf_nocrc_write(&wb, item);
            }
            lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone);
        }
    }
    // The buffer must be filled completely -- no slack, no overflow.
    lazy_assert(wb.ndone == wb.size);
    lazy_assert(calculated_size==wb.ndone);
}
 | |
| 
 | |
// Compress 'uncompressed_buf' (whose sub block layout is described by
// 'sub_block') into a newly allocated output buffer and return it via
// *bytes_to_write / *n_bytes_to_write.  Output layout: node header, sub block
// header (count + per-block compressed/uncompressed sizes and xsums), header
// checksum, then the compressed payloads, zero-padded to a 512-byte multiple.
static void
serialize_uncompressed_block_to_memory(char * uncompressed_buf,
                                       int n_sub_blocks,
                                       struct sub_block sub_block[/*n_sub_blocks*/],
                                       enum toku_compression_method method,
                               /*out*/ size_t *n_bytes_to_write,
                               /*out*/ char  **bytes_to_write)
// Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work)
{
    // allocate space for the compressed uncompressed_buf
    size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method);
    size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
    size_t header_len = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum
    char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf);

    // copy the header
    memcpy(compressed_buf, uncompressed_buf, node_header_overhead);
    if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
                  uncompressed_buf[node_header_overhead],   uncompressed_buf[node_header_overhead+1],
                  uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]);

    // compress all of the sub blocks
    char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
    char *compressed_ptr = compressed_buf + header_len;
    compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method);

    //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);

    // serialize the sub block header
    // (count, then {compressed size, uncompressed size, xsum} per sub block)
    uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead);
    *ptr++ = toku_htod32(n_sub_blocks);
    for (int i=0; i<n_sub_blocks; i++) {
        ptr[0] = toku_htod32(sub_block[i].compressed_size);
        ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
        ptr[2] = toku_htod32(sub_block[i].xsum);
        ptr += 3;
    }

    // compute the header checksum and serialize it
    uint32_t header_length = (char *)ptr - (char *)compressed_buf;
    uint32_t xsum = toku_x1764_memory(compressed_buf, header_length);
    *ptr = toku_htod32(xsum);

    // Pad the output to a full 512-byte multiple (required for O_DIRECT).
    uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len);
    // Zero out padding.
    for (uint32_t i = header_len+compressed_len; i < padded_len; i++) {
        compressed_buf[i] = 0;
    }
    *n_bytes_to_write = padded_len;
    *bytes_to_write   = compressed_buf;
}
 | |
| 
 | |
| void
 | |
| toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
 | |
|     // get the size of the serialized node
 | |
|     size_t calculated_size = serialize_rollback_log_size(log);
 | |
| 
 | |
|     serialized->len = calculated_size;
 | |
|     serialized->n_sub_blocks = 0;
 | |
|     // choose sub block parameters
 | |
|     int sub_block_size = 0;
 | |
|     size_t data_size = calculated_size - node_header_overhead;
 | |
|     choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks);
 | |
|     lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks);
 | |
|     lazy_assert(sub_block_size > 0);
 | |
| 
 | |
|     // set the initial sub block size for all of the sub blocks
 | |
|     for (int i = 0; i < serialized->n_sub_blocks; i++) 
 | |
|         sub_block_init(&serialized->sub_block[i]);
 | |
|     set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block);
 | |
| 
 | |
|     // allocate space for the serialized node
 | |
|     XMALLOC_N(calculated_size, serialized->data);
 | |
|     // serialize the node into buf
 | |
|     serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block);
 | |
|     serialized->blocknum = log->blocknum;
 | |
| }
 | |
| 
 | |
| int toku_serialize_rollback_log_to(int fd,
 | |
|                                    ROLLBACK_LOG_NODE log,
 | |
|                                    SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
 | |
|                                    bool is_serialized,
 | |
|                                    FT ft,
 | |
|                                    bool for_checkpoint) {
 | |
|     size_t n_to_write;
 | |
|     char *compressed_buf;
 | |
|     struct serialized_rollback_log_node serialized_local;
 | |
| 
 | |
|     if (is_serialized) {
 | |
|         invariant_null(log);
 | |
|     } else {
 | |
|         invariant_null(serialized_log);
 | |
|         serialized_log = &serialized_local;
 | |
|         toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
 | |
|     }
 | |
| 
 | |
|     BLOCKNUM blocknum = serialized_log->blocknum;
 | |
|     invariant(blocknum.b >= 0);
 | |
| 
 | |
|     // Compress and malloc buffer to write
 | |
|     serialize_uncompressed_block_to_memory(serialized_log->data,
 | |
|                                            serialized_log->n_sub_blocks,
 | |
|                                            serialized_log->sub_block,
 | |
|                                            ft->h->compression_method,
 | |
|                                            &n_to_write,
 | |
|                                            &compressed_buf);
 | |
| 
 | |
|     // Dirties the ft
 | |
|     DISKOFF offset;
 | |
|     ft->blocktable.realloc_on_disk(
 | |
|         blocknum, n_to_write, &offset, ft, fd, for_checkpoint);
 | |
| 
 | |
|     toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
 | |
|     toku_free(compressed_buf);
 | |
|     if (!is_serialized) {
 | |
|         toku_static_serialized_rollback_log_destroy(&serialized_local);
 | |
|         log->dirty = false;  // See #1957.   Must set the node to be clean after
 | |
|                              // serializing it so that it doesn't get written again
 | |
|                              // on the next checkpoint or eviction.
 | |
|     }
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
// Parse a rollback log node out of the uncompressed buffer `rb`.
// On success, returns 0, stores the new node in *log_p, and takes ownership
// of rb->buf (frees it and NULLs it).  On failure, tears down any partially
// built state via the died0/died1 labels and returns non-zero.
static int
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
    ROLLBACK_LOG_NODE MALLOC(result);
    int r;
    if (result==NULL) {
	r=get_error_errno();
	// The if(0) wraps a cleanup label that is only reachable via goto below.
	if (0) { died0: toku_free(result); }
	return r;
    }

    // The node image must begin with the rollback magic bytes.
    const void *magic;
    rbuf_literal_bytes(rb, &magic, 8);
    lazy_assert(!memcmp(magic, "tokuroll", 8));

    // Only layout versions 25..27 and the current version are accepted here.
    result->layout_version    = rbuf_int(rb);
    lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) ||
                (result->layout_version == FT_LAYOUT_VERSION));
    result->layout_version_original = rbuf_int(rb);
    result->layout_version_read_from_disk = result->layout_version;
    result->build_id = rbuf_int(rb);
    result->dirty = false;
    //TODO: Maybe add descriptor (or just descriptor version) here eventually?
    //TODO: This is hard.. everything is shared in a single dictionary.
    rbuf_TXNID_PAIR(rb, &result->txnid);
    result->sequence = rbuf_ulonglong(rb);
    result->blocknum = rbuf_blocknum(rb);
    // The blocknum stored inside the node must match where we read it from.
    if (result->blocknum.b != blocknum.b) {
        r = toku_db_badformat();
        goto died0;
    }
    result->previous       = rbuf_blocknum(rb);
    result->rollentry_resident_bytecount = rbuf_ulonglong(rb);

    // Rollback entries are allocated out of the node's private memarena.
    size_t arena_initial_size = rbuf_ulonglong(rb);
    result->rollentry_arena.create(arena_initial_size);
    // Cleanup label for failures after the arena exists; chains into died0.
    if (0) { died1: result->rollentry_arena.destroy(); goto died0; }

    //Load rollback entries
    lazy_assert(rb->size > 4);
    //Start with empty list
    result->oldest_logentry = result->newest_logentry = NULL;
    while (rb->ndone < rb->size) {
        struct roll_entry *item;
        uint32_t rollback_fsize = rbuf_int(rb); //Already read 4.  Rest is 4 smaller
        const void *item_vec;
        rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4);
        unsigned char* item_buf = (unsigned char*)item_vec;
        r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena);
        if (r!=0) {
            r = toku_db_badformat();
            goto died1;
        }
        //Add to head of list
        if (result->oldest_logentry) {
            result->oldest_logentry->prev = item;
            result->oldest_logentry       = item;
            item->prev = NULL;
        }
        else {
            result->oldest_logentry = result->newest_logentry = item;
            item->prev = NULL;
        }
    }

    // The node owns its own memory now; release the raw read buffer.
    toku_free(rb->buf);
    rb->buf = NULL;
    *log_p = result;
    return 0;
}
 | |
| 
 | |
| static int
 | |
| deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
 | |
|                                               ROLLBACK_LOG_NODE *log,
 | |
|                                               struct rbuf *rb) {
 | |
|     int r = 0;
 | |
|     ROLLBACK_LOG_NODE rollback_log_node = NULL;
 | |
|     invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION);
 | |
|     r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
 | |
|     if (r==0) {
 | |
|         *log = rollback_log_node;
 | |
|     }
 | |
|     return r;
 | |
| }
 | |
| 
 | |
// Decompress a raw on-disk block into a freshly allocated rbuf.
// Raw layout: node header | sub-block count | per-sub-block header triples
// (compressed size, uncompressed size, xsum) | header checksum | compressed
// sub-block data.  Returns 0, TOKUDB_BAD_CHECKSUM, or toku_db_badformat().
int
decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
    int r = 0;
    // get the number of compressed sub blocks
    int n_sub_blocks;
    n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead]));

    // verify the number of sub blocks
    invariant(0 <= n_sub_blocks);
    invariant(n_sub_blocks <= max_sub_blocks);

    { // verify the header checksum
        uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
        invariant(header_length <= raw_block_size);
        uint32_t xsum = toku_x1764_memory(raw_block, header_length);
        uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length));
        if (xsum != stored_xsum) {
            r = TOKUDB_BAD_CHECKSUM;
        }
    }

    // deserialize the sub block header
    // NOTE(review): VLA sized from on-disk data; bounded by the invariant
    // on n_sub_blocks above (<= max_sub_blocks), so the stack use is capped.
    struct sub_block sub_block[n_sub_blocks];
    uint32_t *sub_block_header = (uint32_t *) &raw_block[node_header_overhead+4];
    for (int i = 0; i < n_sub_blocks; i++) {
        sub_block_init(&sub_block[i]);
        sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
        sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
        sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
        sub_block_header += 3;
    }

    // This predicate needs to be here and instead of where it is set
    // for the compiler.
    // (Jumping to exit earlier would cross the VLA declaration above.)
    if (r == TOKUDB_BAD_CHECKSUM) {
        goto exit;
    }

    // verify sub block sizes
    for (int i = 0; i < n_sub_blocks; i++) {
        uint32_t compressed_size = sub_block[i].compressed_size;
        // NOTE(review): compressed_size is unsigned, so "<=0" only rejects 0.
        if (compressed_size<=0   || compressed_size>(1<<30)) { 
            r = toku_db_badformat(); 
            goto exit;
        }

        uint32_t uncompressed_size = sub_block[i].uncompressed_size;
        if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
        if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { 
            r = toku_db_badformat();
            goto exit;
        }
    }

    // sum up the uncompressed size of the sub blocks
    size_t uncompressed_size;
    uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);

    // allocate the uncompressed buffer
    // (declaration and assignment are split so the gotos above may legally
    // jump past these objects)
    size_t size;
    size = node_header_overhead + uncompressed_size;
    unsigned char *buf;
    XMALLOC_N(size, buf);
    rbuf_init(rb, buf, size);

    // copy the uncompressed node header to the uncompressed buffer
    memcpy(rb->buf, raw_block, node_header_overhead);

    // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
    unsigned char *compressed_data;
    compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);

    // point at the start of the uncompressed data
    unsigned char *uncompressed_data;
    uncompressed_data = rb->buf + node_header_overhead;    

    // decompress all the compressed sub blocks into the uncompressed buffer
    r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool);
    if (r != 0) {
        fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %zu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size);
        dump_bad_block(raw_block, raw_block_size);
        goto exit;
    }

    rb->ndone=0;
exit:
    return r;
}
 | |
| 
 | |
| static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
 | |
|     // This function exists solely to accommodate future changes in compression.
 | |
|     int r = 0;
 | |
|     if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) ||
 | |
|         (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) ||
 | |
|         version == FT_LAYOUT_VERSION) {
 | |
|         r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
 | |
|     } else {
 | |
|         abort();
 | |
|     }
 | |
|     return r;
 | |
| }
 | |
| 
 | |
// Read the block stored at [offset, offset+size) from fd (the actual read is
// rounded up to a 512-byte-aligned length into a 512-byte-aligned buffer),
// sanity-check the magic bytes and layout version, then decompress the block
// into rb.  On failure, rb->buf is freed and NULLed.  Aborts on checksum
// mismatch, which is treated as fatal corruption.
static int read_and_decompress_block_from_fd_into_rbuf(
    int fd,
    BLOCKNUM blocknum,
    DISKOFF offset,
    DISKOFF size,
    FT ft,
    struct rbuf *rb,
    /* out */ int *layout_version_p) {
    int r = 0;
    if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);

    DISKOFF size_aligned = roundup_to_multiple(512, size);
    uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
    {
        // read the (partially compressed) block
        // The read may return up to size_aligned bytes; it must cover at
        // least the logical block size.
        ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
        lazy_assert((DISKOFF)rlen >= size);
        lazy_assert((DISKOFF)rlen <= size_aligned);
    }
    // get the layout_version
    int layout_version;
    {
        // Any of the three node magics (leaf, nonleaf, rollback) is valid here.
        uint8_t *magic = raw_block + uncompressed_magic_offset;
        if (memcmp(magic, "tokuleaf", 8)!=0 &&
            memcmp(magic, "tokunode", 8)!=0 &&
            memcmp(magic, "tokuroll", 8)!=0) {
            r = toku_db_badformat();
            goto cleanup;
        }
        uint8_t *version = raw_block + uncompressed_version_offset;
        layout_version = toku_dtoh32(*(uint32_t*)version);
        if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) {
            r = toku_db_badformat();
            goto cleanup;
        }
    }

    r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
    if (r != 0) {
        // We either failed the checksum, or there is a bad format in
        // the buffer.
        if (r == TOKUDB_BAD_CHECKSUM) {
            fprintf(stderr,
                    "Checksum failure while reading raw block in file %s.\n",
                    toku_cachefile_fname_in_env(ft->cf));
            abort();
        } else {
            r = toku_db_badformat();
            goto cleanup;
        }
    }

    *layout_version_p = layout_version;
cleanup:
    if (r!=0) {
        // On any failure, release whatever decompression may have allocated.
        if (rb->buf) toku_free(rb->buf);
        rb->buf = NULL;
    }
    if (raw_block) {
        toku_free(raw_block);
    }
    return r;
}
 | |
| 
 | |
| // Read rollback log node from file into struct.
 | |
| // Perform version upgrade if necessary.
 | |
| int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) {
 | |
|     int layout_version = 0;
 | |
|     int r;
 | |
| 
 | |
|     struct rbuf rb;
 | |
|     rbuf_init(&rb, nullptr, 0);
 | |
| 
 | |
|     // get the file offset and block size for the block
 | |
|     DISKOFF offset, size;
 | |
|     ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
 | |
| 
 | |
|     // if the size is 0, then the blocknum is unused
 | |
|     if (size == 0) {
 | |
|         // blocknum is unused, just create an empty one and get out
 | |
|         ROLLBACK_LOG_NODE XMALLOC(log);
 | |
|         rollback_empty_log_init(log);
 | |
|         log->blocknum.b = blocknum.b;
 | |
|         r = 0;
 | |
|         *logp = log;
 | |
|         goto cleanup;
 | |
|     }
 | |
| 
 | |
|     r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version);
 | |
|     if (r!=0) goto cleanup;
 | |
| 
 | |
|     {
 | |
|         uint8_t *magic = rb.buf + uncompressed_magic_offset;
 | |
|         if (memcmp(magic, "tokuroll", 8)!=0) {
 | |
|             r = toku_db_badformat();
 | |
|             goto cleanup;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
 | |
| 
 | |
| cleanup:
 | |
|     if (rb.buf) {
 | |
|         toku_free(rb.buf);
 | |
|     }
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| int
 | |
| toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft)
 | |
| {
 | |
|     int r = 0;
 | |
|     // 15 was the last version with subtree estimates
 | |
|     invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15);
 | |
| 
 | |
|     FTNODE unused_node = NULL;
 | |
|     FTNODE_DISK_DATA unused_ndd = NULL;
 | |
|     ftnode_fetch_extra bfe;
 | |
|     bfe.create_for_min_read(ft);
 | |
|     r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd,
 | |
|                                    &bfe, &ft->h->on_disk_stats);
 | |
|     ft->in_memory_stats = ft->h->on_disk_stats;
 | |
| 
 | |
|     if (unused_node) {
 | |
|         toku_ftnode_free(&unused_node);
 | |
|     }
 | |
|     if (unused_ndd) {
 | |
|         toku_free(unused_ndd);
 | |
|     }
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| int
 | |
| toku_upgrade_msn_from_root_to_header(int fd, FT ft)
 | |
| {
 | |
|     int r;
 | |
|     // 21 was the first version with max_msn_in_ft in the header
 | |
|     invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20);
 | |
| 
 | |
|     FTNODE node;
 | |
|     FTNODE_DISK_DATA ndd;
 | |
|     ftnode_fetch_extra bfe;
 | |
|     bfe.create_for_min_read(ft);
 | |
|     r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr);
 | |
|     if (r != 0) {
 | |
|         goto exit;
 | |
|     }
 | |
| 
 | |
|     ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk;
 | |
|     toku_ftnode_free(&node);
 | |
|     toku_free(ndd);
 | |
|  exit:
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| #undef UPGRADE_STATUS_VALUE
 | 
