mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-30 18:36:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1785 lines
		
	
	
	
		
			48 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1785 lines
		
	
	
	
		
			48 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*****************************************************************************
 | |
| 
 | |
| Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
 | |
| Copyright (c) 2015, 2022, MariaDB Corporation.
 | |
| 
 | |
| This program is free software; you can redistribute it and/or modify it under
 | |
| the terms of the GNU General Public License as published by the Free Software
 | |
| Foundation; version 2 of the License.
 | |
| 
 | |
| This program is distributed in the hope that it will be useful, but WITHOUT
 | |
| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 | |
| FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 | |
| 
 | |
| You should have received a copy of the GNU General Public License along with
 | |
| this program; if not, write to the Free Software Foundation, Inc.,
 | |
| 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
 | |
| 
 | |
| *****************************************************************************/
 | |
| 
 | |
| /**************************************************//**
 | |
| @file row/row0ftsort.cc
 | |
| Create Full Text Index with (parallel) merge sort
 | |
| 
 | |
| Created 10/13/2010 Jimmy Yang
 | |
| *******************************************************/
 | |
| 
 | |
| #include "row0ftsort.h"
 | |
| #include "dict0dict.h"
 | |
| #include "row0merge.h"
 | |
| #include "row0row.h"
 | |
| #include "btr0cur.h"
 | |
| #include "fts0plugin.h"
 | |
| #include "log0crypt.h"
 | |
| 
 | |
| /** Read the next record to buffer N.
 | |
| @param N index into array of merge info structure */
 | |
| #define ROW_MERGE_READ_GET_NEXT(N)					\
 | |
| 	do {								\
 | |
| 		b[N] = row_merge_read_rec(				\
 | |
| 			block[N], buf[N], b[N], index,			\
 | |
| 			fd[N], &foffs[N], &mrec[N], offsets[N],		\
 | |
| 			crypt_block[N], space);				\
 | |
| 		if (UNIV_UNLIKELY(!b[N])) {				\
 | |
| 			if (mrec[N]) {					\
 | |
| 				goto exit;				\
 | |
| 			}						\
 | |
| 		}							\
 | |
| 	} while (0)
 | |
| 
 | |
| /** Parallel sort degree */
 | |
| ulong	fts_sort_pll_degree	= 2;
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Create a temporary "fts sort index" used to merge sort the
 | |
| tokenized doc string. The index has three "fields":
 | |
| 
 | |
| 1) Tokenized word,
 | |
| 2) Doc ID (depend on number of records to sort, it can be a 4 bytes or 8 bytes
 | |
| integer value)
 | |
| 3) Word's position in original doc.
 | |
| 
 | |
| @see fts_create_one_index_table()
 | |
| 
 | |
| @return dict_index_t structure for the fts sort index */
 | |
| dict_index_t*
 | |
| row_merge_create_fts_sort_index(
 | |
| /*============================*/
 | |
| 	dict_index_t*	index,	/*!< in: Original FTS index
 | |
| 				based on which this sort index
 | |
| 				is created */
 | |
| 	dict_table_t*	table,	/*!< in,out: table that FTS index
 | |
| 				is being created on */
 | |
| 	ibool*		opt_doc_id_size)
 | |
| 				/*!< out: whether to use 4 bytes
 | |
| 				instead of 8 bytes integer to
 | |
| 				store Doc ID during sort */
 | |
| {
 | |
| 	dict_index_t*   new_index;
 | |
| 	dict_field_t*   field;
 | |
| 	dict_field_t*   idx_field;
 | |
| 	CHARSET_INFO*	charset;
 | |
| 
 | |
| 	// FIXME: This name shouldn't be hard coded here.
 | |
| 	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);
 | |
| 
 | |
| 	new_index->id = index->id;
 | |
| 	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
 | |
| 	new_index->n_def = FTS_NUM_FIELDS_SORT;
 | |
| 	new_index->cached = TRUE;
 | |
| 	new_index->parser = index->parser;
 | |
| 
 | |
| 	idx_field = dict_index_get_nth_field(index, 0);
 | |
| 	charset = fts_index_get_charset(index);
 | |
| 
 | |
| 	/* The first field is on the Tokenized Word */
 | |
| 	field = dict_index_get_nth_field(new_index, 0);
 | |
| 	field->name = NULL;
 | |
| 	field->prefix_len = 0;
 | |
| 	field->col = static_cast<dict_col_t*>(
 | |
| 		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
 | |
| 	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
 | |
| 	field->col->mtype = charset == &my_charset_latin1
 | |
| 		? DATA_VARCHAR : DATA_VARMYSQL;
 | |
| 	field->col->mbminlen = idx_field->col->mbminlen;
 | |
| 	field->col->mbmaxlen = idx_field->col->mbmaxlen;
 | |
| 	field->col->len = static_cast<uint16_t>(
 | |
| 		HA_FT_MAXCHARLEN * field->col->mbmaxlen);
 | |
| 
 | |
| 	field->fixed_len = 0;
 | |
| 
 | |
| 	/* Doc ID */
 | |
| 	field = dict_index_get_nth_field(new_index, 1);
 | |
| 	field->name = NULL;
 | |
| 	field->prefix_len = 0;
 | |
| 	field->col = static_cast<dict_col_t*>(
 | |
| 		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
 | |
| 	field->col->mtype = DATA_INT;
 | |
| 	*opt_doc_id_size = FALSE;
 | |
| 
 | |
| 	/* Check whether we can use 4 bytes instead of 8 bytes integer
 | |
| 	field to hold the Doc ID, thus reduce the overall sort size */
 | |
| 	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
 | |
| 		/* If Doc ID column is being added by this create
 | |
| 		index, then just check the number of rows in the table */
 | |
| 		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
 | |
| 			*opt_doc_id_size = TRUE;
 | |
| 		}
 | |
| 	} else {
 | |
| 		doc_id_t	max_doc_id;
 | |
| 
 | |
| 		/* If the Doc ID column is supplied by user, then
 | |
| 		check the maximum Doc ID in the table */
 | |
| 		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
 | |
| 
 | |
| 		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
 | |
| 			*opt_doc_id_size = TRUE;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (*opt_doc_id_size) {
 | |
| 		field->col->len = sizeof(ib_uint32_t);
 | |
| 		field->fixed_len = sizeof(ib_uint32_t);
 | |
| 	} else {
 | |
| 		field->col->len = FTS_DOC_ID_LEN;
 | |
| 		field->fixed_len = FTS_DOC_ID_LEN;
 | |
| 	}
 | |
| 
 | |
| 	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
 | |
| 
 | |
| 	/* The third field is on the word's position in the original doc */
 | |
| 	field = dict_index_get_nth_field(new_index, 2);
 | |
| 	field->name = NULL;
 | |
| 	field->prefix_len = 0;
 | |
| 	field->col = static_cast<dict_col_t*>(
 | |
| 		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
 | |
| 	field->col->mtype = DATA_INT;
 | |
| 	field->col->len = 4 ;
 | |
| 	field->fixed_len = 4;
 | |
| 	field->col->prtype = DATA_NOT_NULL;
 | |
| 
 | |
| 	return(new_index);
 | |
| }
 | |
| 
 | |
| /** Initialize FTS parallel sort structures.
 | |
| @param[in]	trx		transaction
 | |
| @param[in,out]	dup		descriptor of FTS index being created
 | |
| @param[in,out]	new_table	table where indexes are created
 | |
| @param[in]	opt_doc_id_size	whether to use 4 bytes instead of 8 bytes
 | |
| 				integer to store Doc ID during sort
 | |
| @param[in]	old_zip_size	page size of the old table during alter
 | |
| @param[out]	psort		parallel sort info to be instantiated
 | |
| @param[out]	merge		parallel merge info to be instantiated
 | |
| @return true if all successful */
 | |
| bool
 | |
| row_fts_psort_info_init(
 | |
| 	trx_t*		trx,
 | |
| 	row_merge_dup_t*dup,
 | |
| 	dict_table_t*	new_table,
 | |
| 	bool		opt_doc_id_size,
 | |
| 	ulint		old_zip_size,
 | |
| 	fts_psort_t**	psort,
 | |
| 	fts_psort_t**	merge)
 | |
| {
 | |
| 	ulint			i;
 | |
| 	ulint			j;
 | |
| 	fts_psort_common_t*	common_info = NULL;
 | |
| 	fts_psort_t*		psort_info = NULL;
 | |
| 	fts_psort_t*		merge_info = NULL;
 | |
| 	ulint			block_size;
 | |
| 	ibool			ret = TRUE;
 | |
| 	bool			encrypted = false;
 | |
| 	ut_ad(ut_is_2pow(old_zip_size));
 | |
| 
 | |
| 	block_size = 3 * srv_sort_buf_size;
 | |
| 
 | |
| 	*psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey(
 | |
| 		 fts_sort_pll_degree * sizeof *psort_info));
 | |
| 
 | |
| 	if (!psort_info) {
 | |
| 		ut_free(dup);
 | |
| 		return(FALSE);
 | |
| 	}
 | |
| 
 | |
| 	/* Common Info for all sort threads */
 | |
| 	common_info = static_cast<fts_psort_common_t*>(
 | |
| 		ut_malloc_nokey(sizeof *common_info));
 | |
| 
 | |
| 	if (!common_info) {
 | |
| 		ut_free(dup);
 | |
| 		ut_free(psort_info);
 | |
| 		return(FALSE);
 | |
| 	}
 | |
| 
 | |
| 	common_info->dup = dup;
 | |
| 	common_info->new_table = new_table;
 | |
| 	common_info->old_zip_size = old_zip_size;
 | |
| 	common_info->trx = trx;
 | |
| 	common_info->all_info = psort_info;
 | |
| 	pthread_cond_init(&common_info->sort_cond, nullptr);
 | |
| 	common_info->opt_doc_id_size = opt_doc_id_size;
 | |
| 
 | |
| 	if (log_tmp_is_encrypted()) {
 | |
| 		encrypted = true;
 | |
| 	}
 | |
| 
 | |
| 	ut_ad(trx->mysql_thd != NULL);
 | |
| 	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
 | |
| 	/* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
 | |
| 	each parallel sort thread. Each "sort bucket" holds records for
 | |
| 	a particular "FTS index partition" */
 | |
| 	for (j = 0; j < fts_sort_pll_degree; j++) {
 | |
| 
 | |
| 		UT_LIST_INIT(
 | |
| 			psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list);
 | |
| 
 | |
| 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 
 | |
| 			psort_info[j].merge_file[i] =
 | |
| 				 static_cast<merge_file_t*>(
 | |
| 					ut_zalloc_nokey(sizeof(merge_file_t)));
 | |
| 
 | |
| 			if (!psort_info[j].merge_file[i]) {
 | |
| 				ret = FALSE;
 | |
| 				goto func_exit;
 | |
| 			}
 | |
| 
 | |
| 			psort_info[j].merge_buf[i] = row_merge_buf_create(
 | |
| 				dup->index);
 | |
| 
 | |
| 			if (row_merge_file_create(psort_info[j].merge_file[i],
 | |
| 						  path) == OS_FILE_CLOSED) {
 | |
| 				goto func_exit;
 | |
| 			}
 | |
| 
 | |
| 			/* Need to align memory for O_DIRECT write */
 | |
| 			psort_info[j].merge_block[i] =
 | |
| 				static_cast<row_merge_block_t*>(
 | |
| 					aligned_malloc(block_size, 1024));
 | |
| 
 | |
| 			if (!psort_info[j].merge_block[i]) {
 | |
| 				ret = FALSE;
 | |
| 				goto func_exit;
 | |
| 			}
 | |
| 
 | |
| 			/* If tablespace is encrypted, allocate additional buffer for
 | |
| 			encryption/decryption. */
 | |
| 			if (encrypted) {
 | |
| 				/* Need to align memory for O_DIRECT write */
 | |
| 				psort_info[j].crypt_block[i] =
 | |
| 					static_cast<row_merge_block_t*>(
 | |
| 						aligned_malloc(block_size,
 | |
| 							       1024));
 | |
| 
 | |
| 				if (!psort_info[j].crypt_block[i]) {
 | |
| 					ret = FALSE;
 | |
| 					goto func_exit;
 | |
| 				}
 | |
| 			} else {
 | |
| 				psort_info[j].crypt_block[i] = NULL;
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		psort_info[j].child_status = 0;
 | |
| 		psort_info[j].state = 0;
 | |
| 		psort_info[j].psort_common = common_info;
 | |
| 		psort_info[j].error = DB_SUCCESS;
 | |
| 		psort_info[j].memory_used = 0;
 | |
| 		mysql_mutex_init(0, &psort_info[j].mutex, nullptr);
 | |
| 	}
 | |
| 
 | |
| 	/* Initialize merge_info structures parallel merge and insert
 | |
| 	into auxiliary FTS tables (FTS_INDEX_TABLE) */
 | |
| 	*merge = merge_info = static_cast<fts_psort_t*>(
 | |
| 		ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info));
 | |
| 
 | |
| 	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
 | |
| 
 | |
| 		merge_info[j].child_status = 0;
 | |
| 		merge_info[j].state = 0;
 | |
| 		merge_info[j].psort_common = common_info;
 | |
| 	}
 | |
| 
 | |
| func_exit:
 | |
| 	if (!ret) {
 | |
| 		row_fts_psort_info_destroy(psort_info, merge_info);
 | |
| 	}
 | |
| 
 | |
| 	return(ret);
 | |
| }
 | |
| /*********************************************************************//**
 | |
| Clean up and deallocate FTS parallel sort structures, and close the
 | |
| merge sort files  */
 | |
| void
 | |
| row_fts_psort_info_destroy(
 | |
| /*=======================*/
 | |
| 	fts_psort_t*	psort_info,	/*!< parallel sort info */
 | |
| 	fts_psort_t*	merge_info)	/*!< parallel merge info */
 | |
| {
 | |
| 	ulint	i;
 | |
| 	ulint	j;
 | |
| 
 | |
| 	if (psort_info) {
 | |
| 		for (j = 0; j < fts_sort_pll_degree; j++) {
 | |
| 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 				if (psort_info[j].merge_file[i]) {
 | |
| 					row_merge_file_destroy(
 | |
| 						psort_info[j].merge_file[i]);
 | |
| 				}
 | |
| 
 | |
| 				aligned_free(psort_info[j].merge_block[i]);
 | |
| 				ut_free(psort_info[j].merge_file[i]);
 | |
| 				aligned_free(psort_info[j].crypt_block[i]);
 | |
| 			}
 | |
| 
 | |
| 			mysql_mutex_destroy(&psort_info[j].mutex);
 | |
| 		}
 | |
| 
 | |
| 		pthread_cond_destroy(&merge_info[0].psort_common->sort_cond);
 | |
| 		ut_free(merge_info[0].psort_common->dup);
 | |
| 		ut_free(merge_info[0].psort_common);
 | |
| 		ut_free(psort_info);
 | |
| 	}
 | |
| 
 | |
| 	ut_free(merge_info);
 | |
| }
 | |
| /*********************************************************************//**
 | |
| Free up merge buffers when merge sort is done */
 | |
| void
 | |
| row_fts_free_pll_merge_buf(
 | |
| /*=======================*/
 | |
| 	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
 | |
| {
 | |
| 	ulint	j;
 | |
| 	ulint	i;
 | |
| 
 | |
| 	if (!psort_info) {
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	for (j = 0; j < fts_sort_pll_degree; j++) {
 | |
| 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 			row_merge_buf_free(psort_info[j].merge_buf[i]);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return;
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| FTS plugin parser 'myql_add_word' callback function for row merge.
 | |
| Refer to 'st_mysql_ftparser_param' for more detail.
 | |
| @return always returns 0 */
 | |
| static
 | |
| int
 | |
| row_merge_fts_doc_add_word_for_parser(
 | |
| /*==================================*/
 | |
| 	MYSQL_FTPARSER_PARAM	*param,		/* in: parser paramter */
 | |
| 	const char		*word,		/* in: token word */
 | |
| 	int			word_len,	/* in: word len */
 | |
| 	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
 | |
| {
 | |
| 	fts_string_t		str;
 | |
| 	fts_tokenize_ctx_t*	t_ctx;
 | |
| 	row_fts_token_t*	fts_token;
 | |
| 	byte*			ptr;
 | |
| 
 | |
| 	ut_ad(param);
 | |
| 	ut_ad(param->mysql_ftparam);
 | |
| 	ut_ad(word);
 | |
| 	ut_ad(boolean_info);
 | |
| 
 | |
| 	t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam);
 | |
| 	ut_ad(t_ctx);
 | |
| 
 | |
| 	str.f_str = (byte*)(word);
 | |
| 	str.f_len = ulint(word_len);
 | |
| 	str.f_n_char = fts_get_token_size(
 | |
| 		(CHARSET_INFO*)param->cs, word, ulint(word_len));
 | |
| 
 | |
| 	/* JAN: TODO: MySQL 5.7 FTS
 | |
| 	ut_ad(boolean_info->position >= 0);
 | |
| 	*/
 | |
| 
 | |
| 	ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t)
 | |
| 			+ sizeof(fts_string_t) + str.f_len));
 | |
| 	fts_token = reinterpret_cast<row_fts_token_t*>(ptr);
 | |
| 	fts_token->text = reinterpret_cast<fts_string_t*>(
 | |
| 			ptr + sizeof(row_fts_token_t));
 | |
| 	fts_token->text->f_str = static_cast<byte*>(
 | |
| 			ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t));
 | |
| 
 | |
| 	fts_token->text->f_len = str.f_len;
 | |
| 	fts_token->text->f_n_char = str.f_n_char;
 | |
| 	memcpy(fts_token->text->f_str, str.f_str, str.f_len);
 | |
| 
 | |
| 	/* JAN: TODO: MySQL 5.7 FTS
 | |
| 	fts_token->position = boolean_info->position;
 | |
| 	*/
 | |
| 
 | |
| 	/* Add token to list */
 | |
| 	UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token);
 | |
| 
 | |
| 	return(0);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Tokenize by fts plugin parser */
 | |
| static
 | |
| void
 | |
| row_merge_fts_doc_tokenize_by_parser(
 | |
| /*=================================*/
 | |
| 	fts_doc_t*		doc,	/* in: doc to tokenize */
 | |
| 	st_mysql_ftparser*	parser,	/* in: plugin parser instance */
 | |
| 	fts_tokenize_ctx_t*	t_ctx)	/* in/out: tokenize ctx instance */
 | |
| {
 | |
| 	MYSQL_FTPARSER_PARAM	param;
 | |
| 
 | |
| 	ut_a(parser);
 | |
| 
 | |
| 	/* Set paramters for param */
 | |
| 	param.mysql_parse = fts_tokenize_document_internal;
 | |
| 	param.mysql_add_word = row_merge_fts_doc_add_word_for_parser;
 | |
| 	param.mysql_ftparam = t_ctx;
 | |
| 	param.cs = doc->charset;
 | |
| 	param.doc = reinterpret_cast<char*>(doc->text.f_str);
 | |
| 	param.length = static_cast<int>(doc->text.f_len);
 | |
| 	param.mode= MYSQL_FTPARSER_SIMPLE_MODE;
 | |
| 
 | |
| 	PARSER_INIT(parser, ¶m);
 | |
| 	/* We assume parse returns successfully here. */
 | |
| 	parser->parse(¶m);
 | |
| 	PARSER_DEINIT(parser, ¶m);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Tokenize incoming text data and add to the sort buffer.
 | |
| @see row_merge_buf_encode()
 | |
| @return	TRUE if the record passed, FALSE if out of space */
 | |
| static
 | |
| ibool
 | |
| row_merge_fts_doc_tokenize(
 | |
| /*=======================*/
 | |
| 	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
 | |
| 	doc_id_t		doc_id,		/*!< in: Doc ID */
 | |
| 	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
 | |
| 	merge_file_t**		merge_file,	/*!< in/out: merge file */
 | |
| 	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
 | |
| 						instead of 8 bytes integer to
 | |
| 						store Doc ID during sort*/
 | |
| 	fts_tokenize_ctx_t*	t_ctx)          /*!< in/out: tokenize context */
 | |
| {
 | |
| 	ulint		inc = 0;
 | |
| 	fts_string_t	str;
 | |
| 	ulint		len;
 | |
| 	row_merge_buf_t* buf;
 | |
| 	dfield_t*	field;
 | |
| 	fts_string_t	t_str;
 | |
| 	ibool		buf_full = FALSE;
 | |
| 	byte		str_buf[FTS_MAX_WORD_LEN + 1];
 | |
| 	ulint		data_size[FTS_NUM_AUX_INDEX];
 | |
| 	ulint		n_tuple[FTS_NUM_AUX_INDEX];
 | |
| 	st_mysql_ftparser*	parser;
 | |
| 
 | |
| 	t_str.f_n_char = 0;
 | |
| 	t_ctx->buf_used = 0;
 | |
| 
 | |
| 	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
 | |
| 	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
 | |
| 
 | |
| 	parser = sort_buf[0]->index->parser;
 | |
| 
 | |
| 	/* Tokenize the data and add each word string, its corresponding
 | |
| 	doc id and position to sort buffer */
 | |
| 	while (parser
 | |
|                ? (!t_ctx->processed_len
 | |
|                   || UT_LIST_GET_LEN(t_ctx->fts_token_list))
 | |
|                : t_ctx->processed_len < doc->text.f_len) {
 | |
| 		ulint		idx = 0;
 | |
| 		ulint		cur_len;
 | |
| 		doc_id_t	write_doc_id;
 | |
| 		row_fts_token_t* fts_token = NULL;
 | |
| 
 | |
| 		if (parser != NULL) {
 | |
| 			if (t_ctx->processed_len == 0) {
 | |
| 				UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list);
 | |
| 
 | |
| 				/* Parse the whole doc and cache tokens */
 | |
| 				row_merge_fts_doc_tokenize_by_parser(doc,
 | |
| 					parser, t_ctx);
 | |
| 
 | |
| 				/* Just indictate we have parsed all the word */
 | |
| 				t_ctx->processed_len += 1;
 | |
| 			}
 | |
| 
 | |
| 			/* Then get a token */
 | |
| 			fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list);
 | |
| 			if (fts_token) {
 | |
| 				str.f_len = fts_token->text->f_len;
 | |
| 				str.f_n_char = fts_token->text->f_n_char;
 | |
| 				str.f_str = fts_token->text->f_str;
 | |
| 			} else {
 | |
| 				ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0);
 | |
| 				/* Reach the end of the list */
 | |
| 				t_ctx->processed_len = doc->text.f_len;
 | |
| 				break;
 | |
| 			}
 | |
| 		} else {
 | |
| 			inc = innobase_mysql_fts_get_token(
 | |
| 				doc->charset,
 | |
| 				doc->text.f_str + t_ctx->processed_len,
 | |
| 				doc->text.f_str + doc->text.f_len, &str);
 | |
| 
 | |
| 			ut_a(inc > 0);
 | |
| 		}
 | |
| 
 | |
| 		/* Ignore string whose character number is less than
 | |
| 		"fts_min_token_size" or more than "fts_max_token_size" */
 | |
| 		if (!fts_check_token(&str, NULL, NULL)) {
 | |
| 			if (parser != NULL) {
 | |
| 				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
 | |
| 				ut_free(fts_token);
 | |
| 			} else {
 | |
| 				t_ctx->processed_len += inc;
 | |
| 			}
 | |
| 
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		t_str.f_len = innobase_fts_casedn_str(
 | |
| 			doc->charset, (char*) str.f_str, str.f_len,
 | |
| 			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);
 | |
| 
 | |
| 		t_str.f_str = (byte*) &str_buf;
 | |
| 
 | |
| 		/* if "cached_stopword" is defined, ignore words in the
 | |
| 		stopword list */
 | |
| 		if (!fts_check_token(&str, t_ctx->cached_stopword,
 | |
| 				     doc->charset)) {
 | |
| 			if (parser != NULL) {
 | |
| 				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
 | |
| 				ut_free(fts_token);
 | |
| 			} else {
 | |
| 				t_ctx->processed_len += inc;
 | |
| 			}
 | |
| 
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		/* There are FTS_NUM_AUX_INDEX auxiliary tables, find
 | |
| 		out which sort buffer to put this word record in */
 | |
| 		t_ctx->buf_used = fts_select_index(
 | |
| 			doc->charset, t_str.f_str, t_str.f_len);
 | |
| 
 | |
| 		buf = sort_buf[t_ctx->buf_used];
 | |
| 
 | |
| 		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
 | |
| 		idx = t_ctx->buf_used;
 | |
| 
 | |
| 		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
 | |
| 
 | |
| 		field = mtuple->fields = static_cast<dfield_t*>(
 | |
| 			mem_heap_alloc(buf->heap,
 | |
| 				       FTS_NUM_FIELDS_SORT * sizeof *field));
 | |
| 
 | |
| 		/* The first field is the tokenized word */
 | |
| 		dfield_set_data(field, t_str.f_str, t_str.f_len);
 | |
| 		len = dfield_get_len(field);
 | |
| 
 | |
| 		dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type);
 | |
| 		field->type.prtype |= DATA_NOT_NULL;
 | |
| 		ut_ad(len <= field->type.len);
 | |
| 
 | |
| 		/* For the temporary file, row_merge_buf_encode() uses
 | |
| 		1 byte for representing the number of extra_size bytes.
 | |
| 		This number will always be 1, because for this 3-field index
 | |
| 		consisting of one variable-size column, extra_size will always
 | |
| 		be 1 or 2, which can be encoded in one byte.
 | |
| 
 | |
| 		The extra_size is 1 byte if the length of the
 | |
| 		variable-length column is less than 128 bytes or the
 | |
| 		maximum length is less than 256 bytes. */
 | |
| 
 | |
| 		/* One variable length column, word with its lenght less than
 | |
| 		fts_max_token_size, add one extra size and one extra byte.
 | |
| 
 | |
| 		Since the max length for FTS token now is larger than 255,
 | |
| 		so we will need to signify length byte itself, so only 1 to 128
 | |
| 		bytes can be used for 1 bytes, larger than that 2 bytes. */
 | |
| 		if (len < 128 || field->type.len < 256) {
 | |
| 			/* Extra size is one byte. */
 | |
| 			cur_len = 2 + len;
 | |
| 		} else {
 | |
| 			/* Extra size is two bytes. */
 | |
| 			cur_len = 3 + len;
 | |
| 		}
 | |
| 
 | |
| 		dfield_dup(field, buf->heap);
 | |
| 		field++;
 | |
| 
 | |
| 		/* The second field is the Doc ID */
 | |
| 
 | |
| 		ib_uint32_t	doc_id_32_bit;
 | |
| 
 | |
| 		if (!opt_doc_id_size) {
 | |
| 			fts_write_doc_id((byte*) &write_doc_id, doc_id);
 | |
| 
 | |
| 			dfield_set_data(
 | |
| 				field, &write_doc_id, sizeof(write_doc_id));
 | |
| 		} else {
 | |
| 			mach_write_to_4(
 | |
| 				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
 | |
| 
 | |
| 			dfield_set_data(
 | |
| 				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
 | |
| 		}
 | |
| 
 | |
| 		len = field->len;
 | |
| 		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
 | |
| 
 | |
| 		field->type.mtype = DATA_INT;
 | |
| 		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
 | |
| 		field->type.len = static_cast<uint16_t>(field->len);
 | |
| 		field->type.mbminlen = 0;
 | |
| 		field->type.mbmaxlen = 0;
 | |
| 
 | |
| 		cur_len += len;
 | |
| 		dfield_dup(field, buf->heap);
 | |
| 
 | |
| 		++field;
 | |
| 
 | |
| 		/* The third field is the position.
 | |
| 		MySQL 5.7 changed the fulltext parser plugin interface
 | |
| 		by adding MYSQL_FTPARSER_BOOLEAN_INFO::position.
 | |
| 		Below we assume that the field is always 0. */
 | |
| 		ulint	pos = t_ctx->init_pos;
 | |
| 		byte		position[4];
 | |
| 		if (parser == NULL) {
 | |
| 			pos += t_ctx->processed_len + inc - str.f_len;
 | |
| 		}
 | |
| 		len = 4;
 | |
| 		mach_write_to_4(position, pos);
 | |
| 		dfield_set_data(field, &position, len);
 | |
| 
 | |
| 		field->type.mtype = DATA_INT;
 | |
| 		field->type.prtype = DATA_NOT_NULL;
 | |
| 		field->type.len = 4;
 | |
| 		field->type.mbminlen = 0;
 | |
| 		field->type.mbmaxlen = 0;
 | |
| 		cur_len += len;
 | |
| 		dfield_dup(field, buf->heap);
 | |
| 
 | |
| 		/* Reserve one byte for the end marker of row_merge_block_t */
 | |
| 		if (buf->total_size + data_size[idx] + cur_len
 | |
| 		    >= srv_sort_buf_size - 1) {
 | |
| 
 | |
| 			buf_full = TRUE;
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		/* Increment the number of tuples */
 | |
| 		n_tuple[idx]++;
 | |
| 		if (parser != NULL) {
 | |
| 			UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
 | |
| 			ut_free(fts_token);
 | |
| 		} else {
 | |
| 			t_ctx->processed_len += inc;
 | |
| 		}
 | |
| 		data_size[idx] += cur_len;
 | |
| 	}
 | |
| 
 | |
| 	/* Update the data length and the number of new word tuples
 | |
| 	added in this round of tokenization */
 | |
| 	for (ulint i = 0; i <  FTS_NUM_AUX_INDEX; i++) {
 | |
| 		/* The computation of total_size below assumes that no
 | |
| 		delete-mark flags will be stored and that all fields
 | |
| 		are NOT NULL and fixed-length. */
 | |
| 
 | |
| 		sort_buf[i]->total_size += data_size[i];
 | |
| 
 | |
| 		sort_buf[i]->n_tuples += n_tuple[i];
 | |
| 
 | |
| 		merge_file[i]->n_rec += n_tuple[i];
 | |
| 		t_ctx->rows_added[i] += n_tuple[i];
 | |
| 	}
 | |
| 
 | |
| 	if (!buf_full) {
 | |
| 		/* we pad one byte between text accross two fields */
 | |
| 		t_ctx->init_pos += doc->text.f_len + 1;
 | |
| 	}
 | |
| 
 | |
| 	return(!buf_full);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Get next doc item from fts_doc_list */
 | |
| UNIV_INLINE
 | |
| void
 | |
| row_merge_fts_get_next_doc_item(
 | |
| /*============================*/
 | |
| 	fts_psort_t*		psort_info,	/*!< in: psort_info */
 | |
| 	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
 | |
| {
 | |
| 	if (*doc_item != NULL) {
 | |
| 		ut_free(*doc_item);
 | |
| 	}
 | |
| 
 | |
| 	mysql_mutex_lock(&psort_info->mutex);
 | |
| 
 | |
| 	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
 | |
| 	if (*doc_item != NULL) {
 | |
| 		UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item);
 | |
| 
 | |
| 		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
 | |
| 		      + (*doc_item)->field->len);
 | |
| 		psort_info->memory_used -= sizeof(fts_doc_item_t)
 | |
| 			+ (*doc_item)->field->len;
 | |
| 	}
 | |
| 
 | |
| 	mysql_mutex_unlock(&psort_info->mutex);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Function performs parallel tokenization of the incoming doc strings.
 | |
| It also performs the initial in memory sort of the parsed records.
 | |
| */
 | |
| static
 | |
| void fts_parallel_tokenization(
 | |
| /*======================*/
 | |
| 	void*		arg)	/*!< in: psort_info for the thread */
 | |
| {
 | |
| 	fts_psort_t*		psort_info = (fts_psort_t*) arg;
 | |
| 	ulint			i;
 | |
| 	fts_doc_item_t*		doc_item = NULL;
 | |
| 	row_merge_buf_t**	buf;
 | |
| 	ibool			processed = FALSE;
 | |
| 	merge_file_t**		merge_file;
 | |
| 	row_merge_block_t**	block;
 | |
| 	row_merge_block_t**	crypt_block;
 | |
| 	pfs_os_file_t		tmpfd[FTS_NUM_AUX_INDEX];
 | |
| 	ulint			mycount[FTS_NUM_AUX_INDEX];
 | |
| 	ulint			num_doc_processed = 0;
 | |
| 	doc_id_t		last_doc_id = 0;
 | |
| 	mem_heap_t*		blob_heap = NULL;
 | |
| 	fts_doc_t		doc;
 | |
| 	dict_table_t*		table = psort_info->psort_common->new_table;
 | |
| 	fts_tokenize_ctx_t	t_ctx;
 | |
| 	ulint			retried = 0;
 | |
| 	dberr_t			error = DB_SUCCESS;
 | |
| 
 | |
| 	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
 | |
| 
 | |
| 	/* const char*		path = thd_innodb_tmpdir(
 | |
| 		psort_info->psort_common->trx->mysql_thd);
 | |
| 	*/
 | |
| 
 | |
| 	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
 | |
| 
 | |
| 	const char*		path = thd_innodb_tmpdir(
 | |
| 		psort_info->psort_common->trx->mysql_thd);
 | |
| 
 | |
| 	ut_ad(psort_info);
 | |
| 
 | |
| 	buf = psort_info->merge_buf;
 | |
| 	merge_file = psort_info->merge_file;
 | |
| 	blob_heap = mem_heap_create(512);
 | |
| 	memset(&doc, 0, sizeof(doc));
 | |
| 	memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
 | |
| 
 | |
| 	doc.charset = fts_index_get_charset(
 | |
| 		psort_info->psort_common->dup->index);
 | |
| 
 | |
| 	block = psort_info->merge_block;
 | |
| 	crypt_block = psort_info->crypt_block;
 | |
| 
 | |
| 	const ulint zip_size = psort_info->psort_common->old_zip_size;
 | |
| 
 | |
| 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
 | |
| 
 | |
| 	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
 | |
| 	processed = TRUE;
 | |
| loop:
 | |
| 	while (doc_item) {
 | |
| 		dfield_t*	dfield = doc_item->field;
 | |
| 
 | |
| 		last_doc_id = doc_item->doc_id;
 | |
| 
 | |
| 		ut_ad (dfield->data != NULL
 | |
| 		       && dfield_get_len(dfield) != UNIV_SQL_NULL);
 | |
| 
 | |
| 		/* If finish processing the last item, update "doc" with
 | |
| 		strings in the doc_item, otherwise continue processing last
 | |
| 		item */
 | |
| 		if (processed) {
 | |
| 			byte*		data;
 | |
| 			ulint		data_len;
 | |
| 
 | |
| 			dfield = doc_item->field;
 | |
| 			data = static_cast<byte*>(dfield_get_data(dfield));
 | |
| 			data_len = dfield_get_len(dfield);
 | |
| 
 | |
| 			if (dfield_is_ext(dfield)) {
 | |
| 				doc.text.f_str =
 | |
| 					btr_copy_externally_stored_field(
 | |
| 						&doc.text.f_len, data,
 | |
| 						zip_size, data_len, blob_heap);
 | |
| 			} else {
 | |
| 				doc.text.f_str = data;
 | |
| 				doc.text.f_len = data_len;
 | |
| 			}
 | |
| 
 | |
| 			doc.tokens = 0;
 | |
| 			t_ctx.processed_len = 0;
 | |
| 		} else {
 | |
| 			/* Not yet finish processing the "doc" on hand,
 | |
| 			continue processing it */
 | |
| 			ut_ad(doc.text.f_str);
 | |
| 			ut_ad(buf[0]->index->parser
 | |
| 			      || t_ctx.processed_len < doc.text.f_len);
 | |
| 		}
 | |
| 
 | |
| 		processed = row_merge_fts_doc_tokenize(
 | |
| 			buf, doc_item->doc_id, &doc,
 | |
| 			merge_file, psort_info->psort_common->opt_doc_id_size,
 | |
| 			&t_ctx);
 | |
| 
 | |
| 		/* Current sort buffer full, need to recycle */
 | |
| 		if (!processed) {
 | |
| 			ut_ad(buf[0]->index->parser
 | |
| 			      || t_ctx.processed_len < doc.text.f_len);
 | |
| 			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		num_doc_processed++;
 | |
| 
 | |
| 		if (UNIV_UNLIKELY(fts_enable_diag_print)
 | |
| 		    && num_doc_processed % 10000 == 1) {
 | |
| 			ib::info() << "Number of documents processed: "
 | |
| 				<< num_doc_processed;
 | |
| #ifdef FTS_INTERNAL_DIAG_PRINT
 | |
| 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 				ib::info() << "ID " << psort_info->psort_id
 | |
| 					<< ", partition " << i << ", word "
 | |
| 					<< mycount[i];
 | |
| 			}
 | |
| #endif
 | |
| 		}
 | |
| 
 | |
| 		mem_heap_empty(blob_heap);
 | |
| 
 | |
| 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
 | |
| 
 | |
| 		if (doc_item && last_doc_id != doc_item->doc_id) {
 | |
| 			t_ctx.init_pos = 0;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	/* If we run out of current sort buffer, need to sort
 | |
| 	and flush the sort buffer to disk */
 | |
| 	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
 | |
| 		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
 | |
| 		row_merge_buf_write(buf[t_ctx.buf_used],
 | |
| 				    merge_file[t_ctx.buf_used],
 | |
| 				    block[t_ctx.buf_used]);
 | |
| 
 | |
| 		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
 | |
| 				     merge_file[t_ctx.buf_used]->offset++,
 | |
| 				     block[t_ctx.buf_used],
 | |
| 				     crypt_block[t_ctx.buf_used],
 | |
| 				     table->space_id)) {
 | |
| 			error = DB_TEMP_FILE_WRITE_FAIL;
 | |
| 			goto func_exit;
 | |
| 		}
 | |
| 
 | |
| 		MEM_UNDEFINED(block[t_ctx.buf_used], srv_sort_buf_size);
 | |
| 		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
 | |
| 		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
 | |
| 		t_ctx.rows_added[t_ctx.buf_used] = 0;
 | |
| 
 | |
| 		ut_a(doc_item);
 | |
| 		goto loop;
 | |
| 	}
 | |
| 
 | |
| 	/* Parent done scanning, and if finish processing all the docs, exit */
 | |
| 	if (psort_info->state == FTS_PARENT_COMPLETE) {
 | |
| 		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
 | |
| 			goto exit;
 | |
| 		} else if (retried > 10000) {
 | |
| 			ut_ad(!doc_item);
 | |
| 			/* retried too many times and cannot get new record */
 | |
| 			ib::error() << "FTS parallel sort processed "
 | |
| 				<< num_doc_processed
 | |
| 				<< " records, the sort queue has "
 | |
| 				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
 | |
| 				<< " records. But sort cannot get the next"
 | |
| 				" records during alter table " << table->name;
 | |
| 			goto exit;
 | |
| 		}
 | |
| 	} else if (psort_info->state == FTS_PARENT_EXITING) {
 | |
| 		/* Parent abort */
 | |
| 		goto func_exit;
 | |
| 	}
 | |
| 
 | |
| 	if (doc_item == NULL) {
 | |
| 		std::this_thread::yield();
 | |
| 	}
 | |
| 
 | |
| 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
 | |
| 
 | |
| 	if (doc_item != NULL) {
 | |
| 		if (last_doc_id != doc_item->doc_id) {
 | |
| 			t_ctx.init_pos = 0;
 | |
| 		}
 | |
| 
 | |
| 		retried = 0;
 | |
| 	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
 | |
| 		retried++;
 | |
| 	}
 | |
| 
 | |
| 	goto loop;
 | |
| 
 | |
| exit:
 | |
| 	/* Do a final sort of the last (or latest) batch of records
 | |
| 	in block memory. Flush them to temp file if records cannot
 | |
| 	be hold in one block memory */
 | |
| 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 		if (t_ctx.rows_added[i]) {
 | |
| 			row_merge_buf_sort(buf[i], NULL);
 | |
| 			row_merge_buf_write(
 | |
| 				buf[i], merge_file[i], block[i]);
 | |
| 
 | |
| 			/* Write to temp file, only if records have
 | |
| 			been flushed to temp file before (offset > 0):
 | |
| 			The pseudo code for sort is following:
 | |
| 
 | |
| 				while (there are rows) {
 | |
| 					tokenize rows, put result in block[]
 | |
| 					if (block[] runs out) {
 | |
| 						sort rows;
 | |
| 						write to temp file with
 | |
| 						row_merge_write();
 | |
| 						offset++;
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				# write out the last batch
 | |
| 				if (offset > 0) {
 | |
| 					row_merge_write();
 | |
| 					offset++;
 | |
| 				} else {
 | |
| 					# no need to write anything
 | |
| 					offset stay as 0
 | |
| 				}
 | |
| 
 | |
| 			so if merge_file[i]->offset is 0 when we come to
 | |
| 			here as the last batch, this means rows have
 | |
| 			never flush to temp file, it can be held all in
 | |
| 			memory */
 | |
| 			if (merge_file[i]->offset != 0) {
 | |
| 				if (!row_merge_write(merge_file[i]->fd,
 | |
| 						merge_file[i]->offset++,
 | |
| 						block[i],
 | |
| 						crypt_block[i],
 | |
| 						table->space_id)) {
 | |
| 					error = DB_TEMP_FILE_WRITE_FAIL;
 | |
| 					goto func_exit;
 | |
| 				}
 | |
| 
 | |
| #ifdef HAVE_valgrind
 | |
| 				MEM_UNDEFINED(block[i], srv_sort_buf_size);
 | |
| 
 | |
| 				if (crypt_block[i]) {
 | |
| 					MEM_UNDEFINED(crypt_block[i],
 | |
| 						      srv_sort_buf_size);
 | |
| 				}
 | |
| #endif /* HAVE_valgrind */
 | |
| 			}
 | |
| 
 | |
| 			buf[i] = row_merge_buf_empty(buf[i]);
 | |
| 			t_ctx.rows_added[i] = 0;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
 | |
| 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: start merge sort\n");
 | |
| 	}
 | |
| 
 | |
| 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 | |
| 		if (!merge_file[i]->offset) {
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		tmpfd[i] = row_merge_file_create_low(path);
 | |
| 		if (tmpfd[i] == OS_FILE_CLOSED) {
 | |
| 			error = DB_OUT_OF_MEMORY;
 | |
| 			goto func_exit;
 | |
| 		}
 | |
| 
 | |
| 		error = row_merge_sort(psort_info->psort_common->trx,
 | |
| 				       psort_info->psort_common->dup,
 | |
| 				       merge_file[i], block[i], &tmpfd[i],
 | |
| 				       false, 0.0/* pct_progress */, 0.0/* pct_cost */,
 | |
| 				       crypt_block[i], table->space_id);
 | |
| 
 | |
| 		if (error != DB_SUCCESS) {
 | |
| 			row_merge_file_destroy_low(tmpfd[i]);
 | |
| 			goto func_exit;
 | |
| 		}
 | |
| 
 | |
| 		row_merge_file_destroy_low(tmpfd[i]);
 | |
| 	}
 | |
| 
 | |
| func_exit:
 | |
| 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
 | |
| 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: complete merge sort\n");
 | |
| 	}
 | |
| 
 | |
| 	mem_heap_free(blob_heap);
 | |
| 
 | |
| 	mysql_mutex_lock(&psort_info->mutex);
 | |
| 	psort_info->error = error;
 | |
| 	mysql_mutex_unlock(&psort_info->mutex);
 | |
| 
 | |
| 	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
 | |
| 		/* child can exit either with error or told by parent. */
 | |
| 		ut_ad(error != DB_SUCCESS
 | |
| 		      || psort_info->state == FTS_PARENT_EXITING);
 | |
| 	}
 | |
| 
 | |
| 	/* Free fts doc list in case of error. */
 | |
| 	do {
 | |
| 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
 | |
| 	} while (doc_item != NULL);
 | |
| 
 | |
| 	mysql_mutex_lock(&psort_info->mutex);
 | |
| 	psort_info->child_status = FTS_CHILD_COMPLETE;
 | |
| 	pthread_cond_signal(&psort_info->psort_common->sort_cond);
 | |
| 	mysql_mutex_unlock(&psort_info->mutex);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Start the parallel tokenization and parallel merge sort */
 | |
| void
 | |
| row_fts_start_psort(
 | |
| /*================*/
 | |
| 	fts_psort_t*	psort_info)	/*!< parallel sort structure */
 | |
| {
 | |
| 	ulint		i = 0;
 | |
| 
 | |
| 	for (i = 0; i < fts_sort_pll_degree; i++) {
 | |
| 		psort_info[i].psort_id = i;
 | |
| 		psort_info[i].task =
 | |
| 			new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]);
 | |
| 		srv_thread_pool->submit_task(psort_info[i].task);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Function performs the merge and insertion of the sorted records. */
 | |
| static
 | |
| void
 | |
| fts_parallel_merge(
 | |
| /*===============*/
 | |
| 	void*		arg)		/*!< in: parallel merge info */
 | |
| {
 | |
| 	fts_psort_t*	psort_info = (fts_psort_t*) arg;
 | |
| 	ulint		id;
 | |
| 
 | |
| 	ut_ad(psort_info);
 | |
| 
 | |
| 	id = psort_info->psort_id;
 | |
| 
 | |
| 	row_fts_merge_insert(psort_info->psort_common->dup->index,
 | |
| 			     psort_info->psort_common->new_table,
 | |
| 			     psort_info->psort_common->all_info, id);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Kick off the parallel merge and insert thread */
 | |
| void
 | |
| row_fts_start_parallel_merge(
 | |
| /*=========================*/
 | |
| 	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
 | |
| {
 | |
| 	ulint		i = 0;
 | |
| 
 | |
| 	/* Kick off merge/insert tasks */
 | |
| 	for (i = 0; i <  FTS_NUM_AUX_INDEX; i++) {
 | |
| 		merge_info[i].psort_id = i;
 | |
| 		merge_info[i].child_status = 0;
 | |
| 
 | |
| 		merge_info[i].task = new tpool::waitable_task(
 | |
| 			fts_parallel_merge,
 | |
| 			(void*) &merge_info[i]);
 | |
| 		srv_thread_pool->submit_task(merge_info[i].task);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
| Write out a single word's data as new entry/entries in the INDEX table.
 | |
| @param[in]	ins_ctx	insert context
 | |
| @param[in]	word	word string
 | |
| @param[in]	node	node colmns
 | |
| @return	DB_SUCCUESS if insertion runs fine, otherwise error code */
 | |
| static
 | |
| dberr_t
 | |
| row_merge_write_fts_node(
 | |
| 	const	fts_psort_insert_t*	ins_ctx,
 | |
| 	const	fts_string_t*		word,
 | |
| 	const	fts_node_t*		node)
 | |
| {
 | |
| 	dtuple_t*	tuple;
 | |
| 	dfield_t*	field;
 | |
| 	dberr_t		ret = DB_SUCCESS;
 | |
| 	doc_id_t	write_first_doc_id[8];
 | |
| 	doc_id_t	write_last_doc_id[8];
 | |
| 	ib_uint32_t	write_doc_count;
 | |
| 
 | |
| 	tuple = ins_ctx->tuple;
 | |
| 
 | |
| 	/* The first field is the tokenized word */
 | |
| 	field = dtuple_get_nth_field(tuple, 0);
 | |
| 	dfield_set_data(field, word->f_str, word->f_len);
 | |
| 
 | |
| 	/* The second field is first_doc_id */
 | |
| 	field = dtuple_get_nth_field(tuple, 1);
 | |
| 	fts_write_doc_id((byte*)&write_first_doc_id, node->first_doc_id);
 | |
| 	dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t));
 | |
| 
 | |
| 	/* The third and fourth fileds(TRX_ID, ROLL_PTR) are filled already.*/
 | |
| 	/* The fifth field is last_doc_id */
 | |
| 	field = dtuple_get_nth_field(tuple, 4);
 | |
| 	fts_write_doc_id((byte*)&write_last_doc_id, node->last_doc_id);
 | |
| 	dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t));
 | |
| 
 | |
| 	/* The sixth field is doc_count */
 | |
| 	field = dtuple_get_nth_field(tuple, 5);
 | |
| 	mach_write_to_4((byte*)&write_doc_count, (ib_uint32_t)node->doc_count);
 | |
| 	dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t));
 | |
| 
 | |
| 	/* The seventh field is ilist */
 | |
| 	field = dtuple_get_nth_field(tuple, 6);
 | |
| 	dfield_set_data(field, node->ilist, node->ilist_size);
 | |
| 
 | |
| 	ret = ins_ctx->btr_bulk->insert(tuple);
 | |
| 
 | |
| 	return(ret);
 | |
| }
 | |
| 
 | |
| /********************************************************************//**
 | |
| Insert processed FTS data to auxillary index tables.
 | |
| @return DB_SUCCESS if insertion runs fine */
 | |
| static MY_ATTRIBUTE((nonnull))
 | |
| dberr_t
 | |
| row_merge_write_fts_word(
 | |
| /*=====================*/
 | |
| 	fts_psort_insert_t*	ins_ctx,	/*!< in: insert context */
 | |
| 	fts_tokenizer_word_t*	word)		/*!< in: sorted and tokenized
 | |
| 						word */
 | |
| {
 | |
| 	dberr_t	ret = DB_SUCCESS;
 | |
| 
 | |
| 	ut_ad(ins_ctx->aux_index_id == fts_select_index(
 | |
| 		ins_ctx->charset, word->text.f_str, word->text.f_len));
 | |
| 
 | |
| 	/* Pop out each fts_node in word->nodes write them to auxiliary table */
 | |
| 	for (ulint i = 0; i < ib_vector_size(word->nodes); i++) {
 | |
| 		dberr_t		error;
 | |
| 		fts_node_t*	fts_node;
 | |
| 
 | |
| 		fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i));
 | |
| 
 | |
| 		error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node);
 | |
| 
 | |
| 		if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
 | |
| 			ib::error() << "Failed to write word to FTS auxiliary"
 | |
| 				" index table "
 | |
| 				<< ins_ctx->btr_bulk->table_name()
 | |
| 				<< ", error " << error;
 | |
| 			ret = error;
 | |
| 		}
 | |
| 
 | |
| 		ut_free(fts_node->ilist);
 | |
| 		fts_node->ilist = NULL;
 | |
| 	}
 | |
| 
 | |
| 	ib_vector_reset(word->nodes);
 | |
| 
 | |
| 	return(ret);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Read sorted FTS data files and insert data tuples to auxillary tables.
 | |
| @return DB_SUCCESS or error number */
 | |
| static
 | |
| void
 | |
| row_fts_insert_tuple(
 | |
| /*=================*/
 | |
| 	fts_psort_insert_t*
 | |
| 			ins_ctx,	/*!< in: insert context */
 | |
| 	fts_tokenizer_word_t* word,	/*!< in: last processed
 | |
| 					tokenized word */
 | |
| 	ib_vector_t*	positions,	/*!< in: word position */
 | |
| 	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
 | |
| 	dtuple_t*	dtuple)		/*!< in: entry to insert */
 | |
| {
 | |
| 	fts_node_t*	fts_node = NULL;
 | |
| 	dfield_t*	dfield;
 | |
| 	doc_id_t	doc_id;
 | |
| 	ulint		position;
 | |
| 	fts_string_t	token_word;
 | |
| 	ulint		i;
 | |
| 
 | |
| 	/* Get fts_node for the FTS auxillary INDEX table */
 | |
| 	if (ib_vector_size(word->nodes) > 0) {
 | |
| 		fts_node = static_cast<fts_node_t*>(
 | |
| 			ib_vector_last(word->nodes));
 | |
| 	}
 | |
| 
 | |
| 	if (fts_node == NULL
 | |
| 	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {
 | |
| 
 | |
| 		fts_node = static_cast<fts_node_t*>(
 | |
| 			ib_vector_push(word->nodes, NULL));
 | |
| 
 | |
| 		memset(fts_node, 0x0, sizeof(*fts_node));
 | |
| 	}
 | |
| 
 | |
| 	/* If dtuple == NULL, this is the last word to be processed */
 | |
| 	if (!dtuple) {
 | |
| 		if (fts_node && ib_vector_size(positions) > 0) {
 | |
| 			fts_cache_node_add_positions(
 | |
| 				NULL, fts_node, *in_doc_id,
 | |
| 				positions);
 | |
| 
 | |
| 			/* Write out the current word */
 | |
| 			row_merge_write_fts_word(ins_ctx, word);
 | |
| 		}
 | |
| 
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	/* Get the first field for the tokenized word */
 | |
| 	dfield = dtuple_get_nth_field(dtuple, 0);
 | |
| 
 | |
| 	token_word.f_n_char = 0;
 | |
| 	token_word.f_len = dfield->len;
 | |
| 	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));
 | |
| 
 | |
| 	if (!word->text.f_str) {
 | |
| 		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
 | |
| 	}
 | |
| 
 | |
| 	/* compare to the last word, to see if they are the same
 | |
| 	word */
 | |
| 	if (innobase_fts_text_cmp(ins_ctx->charset,
 | |
| 				  &word->text, &token_word) != 0) {
 | |
| 		ulint	num_item;
 | |
| 
 | |
| 		/* Getting a new word, flush the last position info
 | |
| 		for the currnt word in fts_node */
 | |
| 		if (ib_vector_size(positions) > 0) {
 | |
| 			fts_cache_node_add_positions(
 | |
| 				NULL, fts_node, *in_doc_id, positions);
 | |
| 		}
 | |
| 
 | |
| 		/* Write out the current word */
 | |
| 		row_merge_write_fts_word(ins_ctx, word);
 | |
| 
 | |
| 		/* Copy the new word */
 | |
| 		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
 | |
| 
 | |
| 		num_item = ib_vector_size(positions);
 | |
| 
 | |
| 		/* Clean up position queue */
 | |
| 		for (i = 0; i < num_item; i++) {
 | |
| 			ib_vector_pop(positions);
 | |
| 		}
 | |
| 
 | |
| 		/* Reset Doc ID */
 | |
| 		*in_doc_id = 0;
 | |
| 		memset(fts_node, 0x0, sizeof(*fts_node));
 | |
| 	}
 | |
| 
 | |
| 	/* Get the word's Doc ID */
 | |
| 	dfield = dtuple_get_nth_field(dtuple, 1);
 | |
| 
 | |
| 	if (!ins_ctx->opt_doc_id_size) {
 | |
| 		doc_id = fts_read_doc_id(
 | |
| 			static_cast<byte*>(dfield_get_data(dfield)));
 | |
| 	} else {
 | |
| 		doc_id = (doc_id_t) mach_read_from_4(
 | |
| 			static_cast<byte*>(dfield_get_data(dfield)));
 | |
| 	}
 | |
| 
 | |
| 	/* Get the word's position info */
 | |
| 	dfield = dtuple_get_nth_field(dtuple, 2);
 | |
| 	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));
 | |
| 
 | |
| 	/* If this is the same word as the last word, and they
 | |
| 	have the same Doc ID, we just need to add its position
 | |
| 	info. Otherwise, we will flush position info to the
 | |
| 	fts_node and initiate a new position vector  */
 | |
| 	if (!(*in_doc_id) || *in_doc_id == doc_id) {
 | |
| 		ib_vector_push(positions, &position);
 | |
| 	} else {
 | |
| 		ulint	num_pos = ib_vector_size(positions);
 | |
| 
 | |
| 		fts_cache_node_add_positions(NULL, fts_node,
 | |
| 					     *in_doc_id, positions);
 | |
| 		for (i = 0; i < num_pos; i++) {
 | |
| 			ib_vector_pop(positions);
 | |
| 		}
 | |
| 		ib_vector_push(positions, &position);
 | |
| 	}
 | |
| 
 | |
| 	/* record the current Doc ID */
 | |
| 	*in_doc_id = doc_id;
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Propagate a newly added record up one level in the selection tree
 | |
| @return parent where this value propagated to */
 | |
| static
 | |
| ulint
 | |
| row_fts_sel_tree_propagate(
 | |
| /*=======================*/
 | |
| 	ulint		propogated,	/*<! in: tree node propagated */
 | |
| 	int*		sel_tree,	/*<! in: selection tree */
 | |
| 	const mrec_t**	mrec,		/*<! in: sort record */
 | |
| 	rec_offs**	offsets,	/*<! in: record offsets */
 | |
| 	dict_index_t*	index)		/*<! in/out: FTS index */
 | |
| {
 | |
| 	ulint	parent;
 | |
| 	int	child_left;
 | |
| 	int	child_right;
 | |
| 	int	selected;
 | |
| 
 | |
| 	/* Find which parent this value will be propagated to */
 | |
| 	parent = (propogated - 1) / 2;
 | |
| 
 | |
| 	/* Find out which value is smaller, and to propagate */
 | |
| 	child_left = sel_tree[parent * 2 + 1];
 | |
| 	child_right = sel_tree[parent * 2 + 2];
 | |
| 
 | |
| 	if (child_left == -1 || mrec[child_left] == NULL) {
 | |
| 		if (child_right == -1
 | |
| 		    || mrec[child_right] == NULL) {
 | |
| 			selected = -1;
 | |
| 		} else {
 | |
| 			selected = child_right ;
 | |
| 		}
 | |
| 	} else if (child_right == -1
 | |
| 		   || mrec[child_right] == NULL) {
 | |
| 		selected = child_left;
 | |
| 	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
 | |
| 				      offsets[child_left],
 | |
| 				      offsets[child_right],
 | |
| 				      index, NULL) < 0) {
 | |
| 		selected = child_left;
 | |
| 	} else {
 | |
| 		selected = child_right;
 | |
| 	}
 | |
| 
 | |
| 	sel_tree[parent] = selected;
 | |
| 
 | |
| 	return parent;
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Readjust selection tree after popping the root and read a new value
 | |
| @return the new root */
 | |
| static
 | |
| int
 | |
| row_fts_sel_tree_update(
 | |
| /*====================*/
 | |
| 	int*		sel_tree,	/*<! in/out: selection tree */
 | |
| 	ulint		propagated,	/*<! in: node to propagate up */
 | |
| 	ulint		height,		/*<! in: tree height */
 | |
| 	const mrec_t**	mrec,		/*<! in: sort record */
 | |
| 	rec_offs**	offsets,	/*<! in: record offsets */
 | |
| 	dict_index_t*	index)		/*<! in: index dictionary */
 | |
| {
 | |
| 	ulint	i;
 | |
| 
 | |
| 	for (i = 1; i <= height; i++) {
 | |
| 		propagated = row_fts_sel_tree_propagate(
 | |
| 			propagated, sel_tree, mrec, offsets, index);
 | |
| 	}
 | |
| 
 | |
| 	return(sel_tree[0]);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Build selection tree at a specified level */
 | |
| static
 | |
| void
 | |
| row_fts_build_sel_tree_level(
 | |
| /*=========================*/
 | |
| 	int*		sel_tree,	/*<! in/out: selection tree */
 | |
| 	ulint		level,		/*<! in: selection tree level */
 | |
| 	const mrec_t**	mrec,		/*<! in: sort record */
 | |
| 	rec_offs**	offsets,	/*<! in: record offsets */
 | |
| 	dict_index_t*	index)		/*<! in: index dictionary */
 | |
| {
 | |
| 	ulint	start;
 | |
| 	int	child_left;
 | |
| 	int	child_right;
 | |
| 	ulint	i;
 | |
| 	ulint	num_item	= ulint(1) << level;
 | |
| 
 | |
| 	start = num_item - 1;
 | |
| 
 | |
| 	for (i = 0; i < num_item;  i++) {
 | |
| 		child_left = sel_tree[(start + i) * 2 + 1];
 | |
| 		child_right = sel_tree[(start + i) * 2 + 2];
 | |
| 
 | |
| 		if (child_left == -1) {
 | |
| 			if (child_right == -1) {
 | |
| 				sel_tree[start + i] = -1;
 | |
| 			} else {
 | |
| 				sel_tree[start + i] =  child_right;
 | |
| 			}
 | |
| 			continue;
 | |
| 		} else if (child_right == -1) {
 | |
| 			sel_tree[start + i] = child_left;
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		/* Deal with NULL child conditions */
 | |
| 		if (!mrec[child_left]) {
 | |
| 			if (!mrec[child_right]) {
 | |
| 				sel_tree[start + i] = -1;
 | |
| 			} else {
 | |
| 				sel_tree[start + i] = child_right;
 | |
| 			}
 | |
| 			continue;
 | |
| 		} else if (!mrec[child_right]) {
 | |
| 			sel_tree[start + i] = child_left;
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		/* Select the smaller one to set parent pointer */
 | |
| 		int cmp = cmp_rec_rec_simple(
 | |
| 			mrec[child_left], mrec[child_right],
 | |
| 			offsets[child_left], offsets[child_right],
 | |
| 			index, NULL);
 | |
| 
 | |
| 		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Build a selection tree for merge. The selection tree is a binary tree
 | |
| and should have fts_sort_pll_degree / 2 levels. With root as level 0
 | |
| @return number of tree levels */
 | |
| static
 | |
| ulint
 | |
| row_fts_build_sel_tree(
 | |
| /*===================*/
 | |
| 	int*		sel_tree,	/*<! in/out: selection tree */
 | |
| 	const mrec_t**	mrec,		/*<! in: sort record */
 | |
| 	rec_offs**	offsets,	/*<! in: record offsets */
 | |
| 	dict_index_t*	index)		/*<! in: index dictionary */
 | |
| {
 | |
| 	ulint	treelevel = 1;
 | |
| 	ulint	num = 2;
 | |
| 	ulint	i = 0;
 | |
| 	ulint	start;
 | |
| 
 | |
| 	/* No need to build selection tree if we only have two merge threads */
 | |
| 	if (fts_sort_pll_degree <= 2) {
 | |
| 		return(0);
 | |
| 	}
 | |
| 
 | |
| 	while (num < fts_sort_pll_degree) {
 | |
| 		num = num << 1;
 | |
| 		treelevel++;
 | |
| 	}
 | |
| 
 | |
| 	start = (ulint(1) << treelevel) - 1;
 | |
| 
 | |
| 	for (i = 0; i < fts_sort_pll_degree; i++) {
 | |
| 		sel_tree[i + start] = int(i);
 | |
| 	}
 | |
| 
 | |
| 	i = treelevel;
 | |
| 	do {
 | |
| 		row_fts_build_sel_tree_level(
 | |
| 			sel_tree, --i, mrec, offsets, index);
 | |
| 	} while (i > 0);
 | |
| 
 | |
| 	return(treelevel);
 | |
| }
 | |
| 
 | |
| /*********************************************************************//**
 | |
| Read sorted file containing index data tuples and insert these data
 | |
| tuples to the index
 | |
| @return DB_SUCCESS or error number */
 | |
| dberr_t
 | |
| row_fts_merge_insert(
 | |
| /*=================*/
 | |
| 	dict_index_t*		index,	/*!< in: index */
 | |
| 	dict_table_t*		table,	/*!< in: new table */
 | |
| 	fts_psort_t*		psort_info, /*!< parallel sort info */
 | |
| 	ulint			id)	/* !< in: which auxiliary table's data
 | |
| 					to insert to */
 | |
| {
 | |
| 	const byte**		b;
 | |
| 	mem_heap_t*		tuple_heap;
 | |
| 	mem_heap_t*		heap;
 | |
| 	dberr_t			error = DB_SUCCESS;
 | |
| 	ulint*			foffs;
 | |
| 	rec_offs**		offsets;
 | |
| 	fts_tokenizer_word_t	new_word;
 | |
| 	ib_vector_t*		positions;
 | |
| 	doc_id_t		last_doc_id;
 | |
| 	ib_alloc_t*		heap_alloc;
 | |
| 	ulint			i;
 | |
| 	mrec_buf_t**		buf;
 | |
| 	pfs_os_file_t*			fd;
 | |
| 	byte**			block;
 | |
| 	byte**			crypt_block;
 | |
| 	const mrec_t**		mrec;
 | |
| 	ulint			count = 0;
 | |
| 	int*			sel_tree;
 | |
| 	ulint			height;
 | |
| 	ulint			start;
 | |
| 	fts_psort_insert_t	ins_ctx;
 | |
| 	uint64_t		count_diag = 0;
 | |
| 	fts_table_t		fts_table;
 | |
| 	char			aux_table_name[MAX_FULL_NAME_LEN];
 | |
| 	dict_table_t*		aux_table;
 | |
| 	dict_index_t*		aux_index;
 | |
| 	trx_t*			trx;
 | |
| 
 | |
| 	/* We use the insert query graph as the dummy graph
 | |
| 	needed in the row module call */
 | |
| 
 | |
| 	trx = trx_create();
 | |
| 	trx_start_if_not_started(trx, true);
 | |
| 
 | |
| 	trx->op_info = "inserting index entries";
 | |
| 
 | |
| 	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;
 | |
| 
 | |
| 	heap = mem_heap_create(500 + sizeof(mrec_buf_t));
 | |
| 
 | |
| 	b = (const byte**) mem_heap_alloc(
 | |
| 		heap, sizeof (*b) * fts_sort_pll_degree);
 | |
| 	foffs = (ulint*) mem_heap_alloc(
 | |
| 		heap, sizeof(*foffs) * fts_sort_pll_degree);
 | |
| 	offsets = (rec_offs**) mem_heap_alloc(
 | |
| 		heap, sizeof(*offsets) * fts_sort_pll_degree);
 | |
| 	buf = (mrec_buf_t**) mem_heap_alloc(
 | |
| 		heap, sizeof(*buf) * fts_sort_pll_degree);
 | |
| 	fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
 | |
| 	block = (byte**) mem_heap_alloc(
 | |
| 		heap, sizeof(*block) * fts_sort_pll_degree);
 | |
| 	crypt_block = (byte**) mem_heap_alloc(
 | |
| 		heap, sizeof(*block) * fts_sort_pll_degree);
 | |
| 	mrec = (const mrec_t**) mem_heap_alloc(
 | |
| 		heap, sizeof(*mrec) * fts_sort_pll_degree);
 | |
| 	sel_tree = (int*) mem_heap_alloc(
 | |
| 		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));
 | |
| 
 | |
| 	tuple_heap = mem_heap_create(1000);
 | |
| 
 | |
| 	ins_ctx.charset = fts_index_get_charset(index);
 | |
| 	ins_ctx.heap = heap;
 | |
| 
 | |
| 	for (i = 0; i < fts_sort_pll_degree; i++) {
 | |
| 		ulint	num;
 | |
| 
 | |
| 		num = 1 + REC_OFFS_HEADER_SIZE
 | |
| 			+ dict_index_get_n_fields(index);
 | |
| 		offsets[i] = static_cast<rec_offs*>(mem_heap_zalloc(
 | |
| 			heap, num * sizeof *offsets[i]));
 | |
| 		rec_offs_set_n_alloc(offsets[i], num);
 | |
| 		rec_offs_set_n_fields(offsets[i], dict_index_get_n_fields(index));
 | |
| 		block[i] = psort_info[i].merge_block[id];
 | |
| 		crypt_block[i] = psort_info[i].crypt_block[id];
 | |
| 		b[i] = psort_info[i].merge_block[id];
 | |
| 		fd[i] = psort_info[i].merge_file[id]->fd;
 | |
| 		foffs[i] = 0;
 | |
| 
 | |
| 		buf[i] = static_cast<mrec_buf_t*>(
 | |
| 			mem_heap_alloc(heap, sizeof *buf[i]));
 | |
| 
 | |
| 		count_diag += psort_info[i].merge_file[id]->n_rec;
 | |
| 	}
 | |
| 
 | |
| 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
 | |
| 		ib::info() << "InnoDB_FTS: to insert " << count_diag
 | |
| 			<< " records";
 | |
| 	}
 | |
| 
 | |
| 	/* Initialize related variables if creating FTS indexes */
 | |
| 	heap_alloc = ib_heap_allocator_create(heap);
 | |
| 
 | |
| 	memset(&new_word, 0, sizeof(new_word));
 | |
| 
 | |
| 	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
 | |
| 	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
 | |
| 	last_doc_id = 0;
 | |
| 
 | |
| 	/* We should set the flags2 with aux_table_name here,
 | |
| 	in order to get the correct aux table names. */
 | |
| 	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
 | |
| 	fts_table.type = FTS_INDEX_TABLE;
 | |
| 	fts_table.index_id = index->id;
 | |
| 	fts_table.table_id = table->id;
 | |
| 	fts_table.table = index->table;
 | |
| 	fts_table.suffix = fts_get_suffix(id);
 | |
| 
 | |
| 	/* Get aux index */
 | |
| 	fts_get_table_name(&fts_table, aux_table_name);
 | |
| 	aux_table = dict_table_open_on_name(aux_table_name, false,
 | |
| 					    DICT_ERR_IGNORE_NONE);
 | |
| 	ut_ad(aux_table != NULL);
 | |
| 	aux_index = dict_table_get_first_index(aux_table);
 | |
| 
 | |
| 	ut_ad(!aux_index->is_instant());
 | |
| 	/* row_merge_write_fts_node() depends on the correct value */
 | |
| 	ut_ad(aux_index->n_core_null_bytes
 | |
| 	      == UT_BITS_IN_BYTES(aux_index->n_nullable));
 | |
| 
 | |
| 	/* Create bulk load instance */
 | |
| 	ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx));
 | |
| 
 | |
| 	/* Create tuple for insert */
 | |
| 	ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
 | |
| 	dict_index_copy_types(ins_ctx.tuple, aux_index,
 | |
| 			      dict_index_get_n_fields(aux_index));
 | |
| 
 | |
| 	/* Set TRX_ID and ROLL_PTR */
 | |
| 	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 2),
 | |
| 			&reset_trx_id, DATA_TRX_ID_LEN);
 | |
| 	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 3),
 | |
| 			&reset_trx_id[DATA_TRX_ID_LEN], DATA_ROLL_PTR_LEN);
 | |
| 
 | |
| 	ut_d(ins_ctx.aux_index_id = id);
 | |
| 
 | |
| 	const ulint space = table->space_id;
 | |
| 
 | |
| 	for (i = 0; i < fts_sort_pll_degree; i++) {
 | |
| 		if (psort_info[i].merge_file[id]->n_rec == 0) {
 | |
| 			/* No Rows to read */
 | |
| 			mrec[i] = b[i] = NULL;
 | |
| 		} else {
 | |
| 			/* Read from temp file only if it has been
 | |
| 			written to. Otherwise, block memory holds
 | |
| 			all the sorted records */
 | |
| 			if (psort_info[i].merge_file[id]->offset > 0
 | |
| 			    && (!row_merge_read(
 | |
| 					fd[i], foffs[i],
 | |
| 					(row_merge_block_t*) block[i],
 | |
| 					(row_merge_block_t*) crypt_block[i],
 | |
| 					space))) {
 | |
| 				error = DB_CORRUPTION;
 | |
| 				goto exit;
 | |
| 			}
 | |
| 
 | |
| 			ROW_MERGE_READ_GET_NEXT(i);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
 | |
| 					offsets, index);
 | |
| 
 | |
| 	start = (1U << height) - 1;
 | |
| 
 | |
| 	/* Fetch sorted records from sort buffer and insert them into
 | |
| 	corresponding FTS index auxiliary tables */
 | |
| 	for (;;) {
 | |
| 		dtuple_t*	dtuple;
 | |
| 		int		min_rec = 0;
 | |
| 
 | |
| 		if (fts_sort_pll_degree <= 2) {
 | |
| 			while (!mrec[min_rec]) {
 | |
| 				min_rec++;
 | |
| 
 | |
| 				if (min_rec >= (int) fts_sort_pll_degree) {
 | |
| 					row_fts_insert_tuple(
 | |
| 						&ins_ctx, &new_word,
 | |
| 						positions, &last_doc_id,
 | |
| 						NULL);
 | |
| 
 | |
| 					goto exit;
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
 | |
| 				if (!mrec[i]) {
 | |
| 					continue;
 | |
| 				}
 | |
| 
 | |
| 				if (cmp_rec_rec_simple(
 | |
| 					    mrec[i], mrec[min_rec],
 | |
| 					    offsets[i], offsets[min_rec],
 | |
| 					    index, NULL) < 0) {
 | |
| 					min_rec = static_cast<int>(i);
 | |
| 				}
 | |
| 			}
 | |
| 		} else {
 | |
| 			min_rec = sel_tree[0];
 | |
| 
 | |
| 			if (min_rec ==  -1) {
 | |
| 				row_fts_insert_tuple(
 | |
| 					&ins_ctx, &new_word,
 | |
| 					positions, &last_doc_id,
 | |
| 					NULL);
 | |
| 
 | |
| 				goto exit;
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		dtuple = row_rec_to_index_entry_low(
 | |
| 			mrec[min_rec], index, offsets[min_rec],
 | |
| 			tuple_heap);
 | |
| 
 | |
| 		row_fts_insert_tuple(
 | |
| 			&ins_ctx, &new_word, positions,
 | |
| 			&last_doc_id, dtuple);
 | |
| 
 | |
| 
 | |
| 		ROW_MERGE_READ_GET_NEXT(min_rec);
 | |
| 
 | |
| 		if (fts_sort_pll_degree > 2) {
 | |
| 			if (!mrec[min_rec]) {
 | |
| 				sel_tree[start + min_rec] = -1;
 | |
| 			}
 | |
| 
 | |
| 			row_fts_sel_tree_update(sel_tree, start + min_rec,
 | |
| 						height, mrec,
 | |
| 						offsets, index);
 | |
| 		}
 | |
| 
 | |
| 		count++;
 | |
| 
 | |
| 		mem_heap_empty(tuple_heap);
 | |
| 	}
 | |
| 
 | |
| exit:
 | |
| 	fts_sql_commit(trx);
 | |
| 
 | |
| 	trx->op_info = "";
 | |
| 
 | |
| 	mem_heap_free(tuple_heap);
 | |
| 
 | |
| 	error = ins_ctx.btr_bulk->finish(error);
 | |
| 	UT_DELETE(ins_ctx.btr_bulk);
 | |
| 
 | |
| 	aux_table->release();
 | |
| 
 | |
| 	trx->free();
 | |
| 
 | |
| 	mem_heap_free(heap);
 | |
| 
 | |
| 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
 | |
| 		ib::info() << "InnoDB_FTS: inserted " << count << " records";
 | |
| 	}
 | |
| 
 | |
| 	return(error);
 | |
| }
 | 
