2013-04-17 00:00:59 -04:00
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
2013-04-17 00:00:35 -04:00
# ifndef FTLOADER_INTERNAL_H
# define FTLOADER_INTERNAL_H
2013-04-16 23:59:13 -04:00
# ident "$Id$"
2013-04-17 00:01:35 -04:00
/*
COPYING CONDITIONS NOTICE :
This program is free software ; you can redistribute it and / or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation , and provided that the
following conditions are met :
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE , the COPYRIGHT NOTICE ( below ) , the
DISCLAIMER ( below ) , the UNIVERSITY PATENT NOTICE ( below ) , the
PATENT MARKING NOTICE ( below ) , and the PATENT RIGHTS
GRANT ( below ) .
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE , the COPYRIGHT NOTICE ( below ) , the
DISCLAIMER ( below ) , the UNIVERSITY PATENT NOTICE ( below ) , the
PATENT MARKING NOTICE ( below ) , and the PATENT RIGHTS
GRANT ( below ) in the documentation and / or other materials
provided with the distribution .
You should have received a copy of the GNU General Public License
along with this program ; if not , write to the Free Software
Foundation , Inc . , 51 Franklin Street , Fifth Floor , Boston , MA
02110 - 1301 , USA .
COPYRIGHT NOTICE :
TokuDB , Tokutek Fractal Tree Indexing Library .
Copyright ( C ) 2007 - 2013 Tokutek , Inc .
DISCLAIMER :
This program is distributed in the hope that it will be useful , but
WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU
General Public License for more details .
UNIVERSITY PATENT NOTICE :
The technology is licensed by the Massachusetts Institute of
Technology , Rutgers State University of New Jersey , and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No . 11 / 760379 and to the patents
and / or patent applications resulting from it .
PATENT MARKING NOTICE :
This software is covered by US Patent No . 8 , 185 , 551.
2013-10-04 16:49:53 -04:00
This software is covered by US Patent No . 8 , 489 , 638.
2013-04-17 00:01:35 -04:00
PATENT RIGHTS GRANT :
2013-04-17 00:01:36 -04:00
" THIS IMPLEMENTATION " means the copyrightable works distributed by
2013-04-17 00:01:35 -04:00
Tokutek as part of the Fractal Tree project .
" PATENT CLAIMS " means the claims of patents that are owned or
licensable by Tokutek , both currently or in the future ; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION .
" PATENT CHALLENGE " shall mean a challenge to the validity ,
patentability , enforceability and / or non - infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS .
Tokutek hereby grants to you , for the term and geographical scope of
the PATENT CLAIMS , a non - exclusive , no - charge , royalty - free ,
irrevocable ( except as stated in this section ) patent license to
make , have made , use , offer to sell , sell , import , transfer , and
otherwise run , modify , and propagate the contents of THIS
IMPLEMENTATION , where such license applies only to the PATENT
CLAIMS . This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION . If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
( including a cross - claim or counterclaim in a lawsuit ) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement , or inducement of patent infringement , then any rights
granted to you under this License shall terminate as of the date
such litigation is filed . If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE , then Tokutek may terminate any rights granted to you
under this License .
*/
2013-04-17 00:01:35 -04:00
# ident "Copyright (c) 2010-2013 Tokutek Inc. All rights reserved."
2013-04-16 23:59:09 -04:00
2013-04-16 23:59:01 -04:00
# include <db.h>
2013-04-17 00:00:35 -04:00
# include "fttypes.h"
# include "ftloader.h"
2013-04-16 23:59:09 -04:00
# include "queue.h"
2013-04-17 00:00:25 -04:00
# include <toku_pthread.h>
2013-04-16 23:59:13 -04:00
# include "dbufio.h"
2013-04-16 23:59:01 -04:00
2013-04-17 00:01:24 -04:00
// Loader tuning constants.
// The size-like values are presumably byte counts (16 << 20 == 16 MiB, etc.);
// NOTE(review): confirm units at their use sites.
enum {
    EXTRACTOR_QUEUE_DEPTH      = 2,
    FILE_BUFFER_SIZE           = 16 << 20,   // == 1 << 24
    MIN_ROWSET_MEMORY          = 8 << 20,    // == 1 << 23
    MIN_MERGE_FANIN            = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    FRACTAL_WRITER_ROWSETS     = FRACTAL_WRITER_QUEUE_DEPTH + 2,   // two more than the writer queue depth
    DBUFIO_DEPTH               = 2,
    TARGET_MERGE_BUF_SIZE      = 16 << 20,   // == 1 << 24; we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE         = 1 << 20,    // always use at least this much
    MAX_UNCOMPRESSED_BUF       = MIN_MERGE_BUF_SIZE
};
2013-04-16 23:59:01 -04:00
/* These functions are exported to allow the tests to compile. */
2013-04-16 23:59:03 -04:00
/* These structures maintain a collection of all the open temporary files used by the loader. */
// Bookkeeping for a single loader temporary file (one slot of struct file_infos).
struct file_info {
    // NOTE(review): presumably true while `file` is an open stream -- confirm.
    bool is_open;
    // If true, the file must be unlinked.
    bool is_extant;

    char *fname;
    FILE *file;

    // How many rows were written into that file.
    uint64_t n_rows;

    // NOTE(review): presumably an I/O buffer of buffer_size bytes attached to `file` -- confirm.
    size_t buffer_size;
    void *buffer;
};
struct file_infos {
int n_files ;
int n_files_limit ;
struct file_info * file_infos ;
int n_files_open , n_files_extant ;
2013-04-17 00:00:31 -04:00
toku_mutex_t lock ; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
2013-04-16 23:59:03 -04:00
} ;
// Handle for a loader temporary file: an index into file_infos (the loader
// addresses temp files by index rather than by FILE*).
typedef struct fidx { int idx; } FIDX;

// Sentinel FIDX that refers to no file.
static const FIDX FIDX_NULL __attribute__((__unused__)) = { -1 };

// Returns nonzero iff f is the FIDX_NULL sentinel.
// Defined `static inline` so that each translation unit including this header
// can use it without an unused-function warning. This replaces the old
// forward declaration that existed only to attach __attribute__((unused)).
// Compares against FIDX_NULL.idx so the sentinel value is spelled in one place.
static inline int fidx_is_null(const FIDX f) {
    return f.idx == FIDX_NULL.idx;
}
2013-04-17 00:00:35 -04:00
FILE * toku_bl_fidx2file ( FTLOADER bl , FIDX i ) ;
2013-04-16 23:59:03 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_open_temp_file ( FTLOADER bl , FIDX * file_idx ) ;
2013-04-16 23:59:03 -04:00
2013-04-16 23:59:09 -04:00
/* These data structures are used for manipulating a collection of rows in main memory. */
// One row of a rowset: where its bytes start and how long its parts are.
struct row {
    size_t off;   // the offset in the data array (presumably struct rowset::data)
    int klen;     // NOTE(review): presumably the key length
    int vlen;     // NOTE(review): presumably the value length
};
// An in-memory batch of rows: a row index array plus a byte buffer holding the row data.
struct rowset {
    uint64_t memory_budget;

    // Row descriptors: n_rows in use out of n_rows_limit (presumably the allocated capacity).
    size_t n_rows;
    size_t n_rows_limit;
    struct row *rows;

    // Row bytes: n_bytes in use out of n_bytes_limit (presumably the allocated capacity).
    size_t n_bytes;
    size_t n_bytes_limit;
    char *data;
};
2013-04-16 23:59:12 -04:00
int init_rowset ( struct rowset * rows , uint64_t memory_budget ) ;
2013-04-16 23:59:09 -04:00
void destroy_rowset ( struct rowset * rows ) ;
2013-04-16 23:59:13 -04:00
int add_row ( struct rowset * rows , DBT * key , DBT * val ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:01:24 -04:00
int loader_write_row ( DBT * key , DBT * val , FIDX data , FILE * , uint64_t * dataoff , struct wbuf * wb , FTLOADER bl ) ;
2013-04-16 23:59:11 -04:00
int loader_read_row ( FILE * f , DBT * key , DBT * val ) ;
2013-04-16 23:59:09 -04:00
// A set of temporary data files awaiting a merge.
struct merge_fileset {
    // Is there a previous key?
    bool have_sorted_output;
    // Points to one of the data_fidxs. If have_sorted_output is set, this is the
    // file containing sorted data; it is still open.
    FIDX sorted_output;
    // If present, the last key output into this merge fileset.
    DBT prev_key;

    int n_temp_files;
    int n_temp_files_limit;     // NOTE(review): presumably the allocated capacity of data_fidxs
    FIDX *data_fidxs;
};

void init_merge_fileset(struct merge_fileset *fs);
void destroy_merge_fileset(struct merge_fileset *fs);
struct poll_callback_s {
2013-04-17 00:00:35 -04:00
ft_loader_poll_func poll_function ;
2013-04-16 23:59:09 -04:00
void * poll_extra ;
} ;
2013-04-17 00:00:35 -04:00
typedef struct poll_callback_s * ft_loader_poll_callback ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_init_poll_callback ( ft_loader_poll_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_destroy_poll_callback ( ft_loader_poll_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_set_poll_function ( ft_loader_poll_callback , ft_loader_poll_func poll_function , void * poll_extra ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_call_poll_function ( ft_loader_poll_callback , float progress ) ;
2013-04-16 23:59:09 -04:00
struct error_callback_s {
2013-04-16 23:59:15 -04:00
int error ;
2013-04-17 00:00:35 -04:00
ft_loader_error_func error_callback ;
2013-04-16 23:59:09 -04:00
void * extra ;
DB * db ;
int which_db ;
DBT key ;
DBT val ;
2013-04-17 00:01:01 -04:00
bool did_callback ;
2013-04-17 00:00:31 -04:00
toku_mutex_t mutex ;
2013-04-16 23:59:09 -04:00
} ;
2013-04-17 00:00:35 -04:00
typedef struct error_callback_s * ft_loader_error_callback ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_init_error_callback ( ft_loader_error_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_destroy_error_callback ( ft_loader_error_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_get_error ( ft_loader_error_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_set_error_function ( ft_loader_error_callback , ft_loader_error_func error_function , void * extra ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_set_error ( ft_loader_error_callback , int error , DB * db , int which_db , DBT * key , DBT * val ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_call_error_function ( ft_loader_error_callback ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_set_error_and_callback ( ft_loader_error_callback , int error , DB * db , int which_db , DBT * key , DBT * val ) ;
2013-04-16 23:59:01 -04:00
2013-04-17 00:00:35 -04:00
struct ft_loader_s {
2013-04-16 23:59:15 -04:00
// These two are set in the close function, and used while running close
struct error_callback_s error_callback ;
struct poll_callback_s poll_callback ;
2013-04-16 23:59:01 -04:00
generate_row_for_put_func generate_row_for_put ;
2013-04-17 00:00:35 -04:00
ft_compare_func * bt_compare_funs ;
2013-04-16 23:59:01 -04:00
DB * src_db ;
int N ;
2013-04-16 23:59:14 -04:00
DB * * dbs ; // N of these
2013-04-16 23:59:17 -04:00
DESCRIPTOR * descriptors ; // N of these.
2013-04-16 23:59:22 -04:00
TXNID * root_xids_that_created ; // N of these.
2013-04-16 23:59:14 -04:00
const char * * new_fnames_in_env ; // N of these. The file names that the final data will be written to (relative to env).
uint64_t * extracted_datasizes ; // N of these.
2013-04-16 23:59:01 -04:00
2013-04-16 23:59:09 -04:00
struct rowset primary_rowset ; // the primary rows that have been put, but the secondary rows haven't been generated.
struct rowset primary_rowset_temp ; // the primary rows that are being worked on by the extractor_thread.
QUEUE primary_rowset_queue ; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file.
toku_pthread_t extractor_thread ; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
2013-04-17 00:01:01 -04:00
bool extractor_live ;
2013-04-16 23:59:11 -04:00
2013-04-16 23:59:15 -04:00
DBT * last_key ; // for each rowset, remember the most recently output key. The system may choose not to keep this up-to-date when a rowset is unsorted. These keys are malloced and ulen maintains the size of the malloced block.
2013-04-16 23:59:09 -04:00
struct rowset * rows ; // secondary rows that have been put, but haven't been sorted and written to a file.
2013-04-17 00:01:01 -04:00
uint64_t n_rows ; // how many rows have been put?
2013-04-16 23:59:09 -04:00
struct merge_fileset * fs ;
2013-04-16 23:59:04 -04:00
2013-04-16 23:59:01 -04:00
const char * temp_file_template ;
2013-04-16 23:59:09 -04:00
2013-04-16 23:59:03 -04:00
CACHETABLE cachetable ;
2013-04-17 00:01:01 -04:00
bool did_reserve_memory ;
2013-04-17 00:01:24 -04:00
bool compress_intermediates ;
2013-04-16 23:59:12 -04:00
uint64_t reserved_memory ; // how much memory are we allowed to use?
2013-04-16 23:59:03 -04:00
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
struct file_infos file_infos ;
2013-04-16 23:59:04 -04:00
# define PROGRESS_MAX (1<<16)
int progress ; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
// We use an integer so that we can add to the progress using a fetch-and-add instruction.
2013-04-16 23:59:19 -04:00
int progress_callback_result ; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again).
2013-04-16 23:59:05 -04:00
LSN load_lsn ; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
2013-04-16 23:59:22 -04:00
TXNID load_root_xid ; //(Root) transaction that performed the load.
2013-04-16 23:59:01 -04:00
2013-04-16 23:59:09 -04:00
QUEUE * fractal_queues ; // an array of work queues, one for each secondary index.
2013-04-16 23:59:10 -04:00
toku_pthread_t * fractal_threads ;
2013-04-17 00:01:01 -04:00
bool * fractal_threads_live ; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).
2013-04-16 23:59:11 -04:00
2013-04-16 23:59:20 -04:00
unsigned fractal_workers ; // number of fractal tree writer threads
2013-04-17 00:00:31 -04:00
toku_mutex_t mutex ;
2013-04-17 00:01:01 -04:00
bool mutex_init ;
2013-04-16 23:59:01 -04:00
} ;
2013-04-16 23:59:09 -04:00
// Set the number of rows in the loader. Used for test.
2013-04-17 00:01:01 -04:00
void toku_ft_loader_set_n_rows ( FTLOADER bl , uint64_t n_rows ) ;
2013-04-16 23:59:01 -04:00
2013-04-16 23:59:09 -04:00
// Get the number of rows in the loader. Used for test.
2013-04-17 00:01:01 -04:00
uint64_t toku_ft_loader_get_n_rows ( FTLOADER bl ) ;
2013-04-16 23:59:03 -04:00
2013-04-16 23:59:09 -04:00
// The data passed into a fractal_thread via pthread_create.
struct fractal_thread_args {
2013-04-17 00:00:35 -04:00
FTLOADER bl ;
2013-04-16 23:59:17 -04:00
const DESCRIPTOR descriptor ;
2013-04-16 23:59:14 -04:00
int fd ; // write the brt into tfd.
int progress_allocation ;
QUEUE q ;
uint64_t total_disksize_estimate ;
int errno_result ; // the final result.
2013-04-16 23:59:22 -04:00
int which_db ;
2013-04-16 23:59:34 -04:00
uint32_t target_nodesize ;
2013-04-16 23:59:44 -04:00
uint32_t target_basementnodesize ;
2013-04-17 00:00:14 -04:00
enum toku_compression_method target_compression_method ;
2014-01-09 14:34:23 -05:00
uint32_t target_fanout ;
2013-04-16 23:59:03 -04:00
} ;
2013-04-16 23:59:01 -04:00
2013-04-17 00:01:01 -04:00
void toku_ft_loader_set_n_rows ( FTLOADER bl , uint64_t n_rows ) ;
uint64_t toku_ft_loader_get_n_rows ( FTLOADER bl ) ;
2013-04-16 23:59:01 -04:00
2013-04-16 23:59:09 -04:00
int merge_row_arrays_base ( struct row dest [ /*an+bn*/ ] , struct row a [ /*an*/ ] , int an , struct row b [ /*bn*/ ] , int bn ,
2013-04-17 00:00:35 -04:00
int which_db , DB * dest_db , ft_compare_func ,
FTLOADER ,
2013-04-16 23:59:09 -04:00
struct rowset * ) ;
2013-04-17 00:00:35 -04:00
int merge_files ( struct merge_fileset * fs , FTLOADER bl , int which_db , DB * dest_db , ft_compare_func , int progress_allocation , QUEUE ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int sort_and_write_rows ( struct rowset rows , struct merge_fileset * fs , FTLOADER bl , int which_db , DB * dest_db , ft_compare_func ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int mergesort_row_array ( struct row rows [ /*n*/ ] , int n , int which_db , DB * dest_db , ft_compare_func , FTLOADER , struct rowset * ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
//int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
2013-04-17 00:01:01 -04:00
int toku_merge_some_files_using_dbufio ( const bool to_q , FIDX dest_data , QUEUE q , int n_sources , DBUFIO_FILESET bfs , FIDX srcs_fidxs [ /*n_sources*/ ] , FTLOADER bl , int which_db , DB * dest_db , ft_compare_func compare , int progress_allocation ) ;
2013-04-16 23:59:13 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_sort_and_write_rows ( struct rowset * rows , struct merge_fileset * fs , FTLOADER bl , int which_db , DB * dest_db , ft_compare_func ) ;
2013-04-16 23:59:01 -04:00
2013-04-16 23:59:14 -04:00
// This is probably only for testing.
2013-04-17 00:00:35 -04:00
int toku_loader_write_brt_from_q_in_C ( FTLOADER bl ,
2013-04-16 23:59:17 -04:00
const DESCRIPTOR descriptor ,
2013-04-16 23:59:14 -04:00
int fd , // write to here
int progress_allocation ,
QUEUE q ,
2013-04-16 23:59:22 -04:00
uint64_t total_disksize_estimate ,
2013-04-16 23:59:34 -04:00
int which_db ,
2013-04-16 23:59:44 -04:00
uint32_t target_nodesize ,
2013-04-17 00:00:14 -04:00
uint32_t target_basementnodesize ,
2014-01-09 14:34:23 -05:00
enum toku_compression_method target_compression_method ,
uint32_t fanout ) ;
2013-04-16 23:59:03 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_mergesort_row_array ( struct row rows [ /*n*/ ] , int n , int which_db , DB * dest_db , ft_compare_func , FTLOADER , struct rowset * ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_write_file_to_dbfile ( int outfile , FIDX infile , FTLOADER bl , const DESCRIPTOR descriptor , int progress_allocation ) ;
2013-04-16 23:59:01 -04:00
2013-04-17 00:00:35 -04:00
int ft_loader_init_file_infos ( struct file_infos * fi ) ;
2013-04-17 00:01:01 -04:00
void ft_loader_fi_destroy ( struct file_infos * fi , bool is_error ) ;
int ft_loader_fi_close ( struct file_infos * fi , FIDX idx , bool require_open ) ;
2013-04-17 00:00:35 -04:00
int ft_loader_fi_close_all ( struct file_infos * fi ) ;
int ft_loader_fi_reopen ( struct file_infos * fi , FIDX idx , const char * mode ) ;
int ft_loader_fi_unlink ( struct file_infos * fi , FIDX idx ) ;
2013-04-16 23:59:09 -04:00
2013-04-17 00:00:35 -04:00
int toku_ft_loader_internal_init ( /* out */ FTLOADER * blp ,
2013-04-16 23:59:13 -04:00
CACHETABLE cachetable ,
generate_row_for_put_func g ,
DB * src_db ,
2013-04-17 00:00:35 -04:00
int N , FT_HANDLE brts [ /*N*/ ] , DB * dbs [ /*N*/ ] ,
2013-04-16 23:59:13 -04:00
const char * new_fnames_in_env [ /*N*/ ] ,
2013-04-17 00:00:35 -04:00
ft_compare_func bt_compare_functions [ /*N*/ ] ,
2013-04-16 23:59:13 -04:00
const char * temp_file_template ,
2013-04-16 23:59:22 -04:00
LSN load_lsn ,
2013-04-17 00:00:57 -04:00
TOKUTXN txn ,
2013-04-17 00:01:24 -04:00
bool reserve_memory ,
2013-10-07 08:35:52 -04:00
uint64_t reserve_memory_size ,
2013-04-17 00:01:24 -04:00
bool compress_intermediates ) ;
2013-04-16 23:59:13 -04:00
2013-04-17 00:01:01 -04:00
void toku_ft_loader_internal_destroy ( FTLOADER bl , bool is_error ) ;
2013-04-16 23:59:14 -04:00
2013-04-16 23:59:15 -04:00
// For test purposes only. (In production, the rowset size is determined by negotation with the cachetable for some memory. See #2613.)
2013-04-17 00:00:35 -04:00
uint64_t toku_ft_loader_get_rowset_budget_for_testing ( void ) ;
2013-04-16 23:59:15 -04:00
2013-04-17 00:00:35 -04:00
int toku_ft_loader_finish_extractor ( FTLOADER bl ) ;
2013-04-16 23:59:15 -04:00
2013-04-17 00:00:35 -04:00
int toku_ft_loader_get_error ( FTLOADER bl , int * loader_errno ) ;
2013-04-16 23:59:15 -04:00
2013-04-17 00:00:35 -04:00
void ft_loader_lock_init ( FTLOADER bl ) ;
void ft_loader_lock_destroy ( FTLOADER bl ) ;
void ft_loader_set_fractal_workers_count_from_c ( FTLOADER bl ) ;
2013-04-16 23:59:20 -04:00
2013-04-17 00:00:35 -04:00
# endif // FTLOADER_INTERNAL_H