/*****************************************************************************

Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2015, 2022, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file row/row0ftsort.cc
Create Full Text Index with (parallel) merge sort

Created 10/13/2010 Jimmy Yang
*******************************************************/

#include "row0ftsort.h"
#include "dict0dict.h"
#include "row0merge.h"
#include "row0row.h"
#include "btr0cur.h"
#include "fts0plugin.h"
#include "log0crypt.h"

/** Read the next record to buffer N.
@param N index into array of merge info structure */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N],		\
			crypt_block[N], space);				\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)
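
/* Illustrative note (not part of the original code): ROW_MERGE_READ_GET_NEXT
is used by the merge phase (see row_fts_merge_insert() later in this file).
Assuming two open sorted runs, a call site looks roughly like

	ROW_MERGE_READ_GET_NEXT(0);
	ROW_MERGE_READ_GET_NEXT(1);

after which mrec[0] and mrec[1] point at the current record of each run, or
are NULL when that run is exhausted, and the merge picks the smaller one. */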

/** Parallel sort degree */
ulong	fts_sort_pll_degree	= 2;

/*********************************************************************//**
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":

1) Tokenized word,
2) Doc ID (depending on the number of records to sort, this can be a
4-byte or 8-byte integer value)
3) Word's position in original doc.

@see fts_create_one_index_table()

@return dict_index_t structure for the fts sort index */
dict_index_t*
row_merge_create_fts_sort_index(
/*============================*/
	dict_index_t*	index,	/*!< in: Original FTS index
				based on which this sort index
				is created */
	dict_table_t*	table,	/*!< in,out: table that FTS index
				is being created on */
	ibool*		opt_doc_id_size)
				/*!< out: whether to use 4 bytes
				instead of 8 bytes integer to
				store Doc ID during sort */
{
	dict_index_t*   new_index;
	dict_field_t*   field;
	dict_field_t*   idx_field;
	CHARSET_INFO*	charset;

	// FIXME: This name shouldn't be hard coded here.
	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);

	new_index->id = index->id;
	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
	new_index->n_def = FTS_NUM_FIELDS_SORT;
	new_index->cached = TRUE;
	new_index->parser = index->parser;

	idx_field = dict_index_get_nth_field(index, 0);
	charset = fts_index_get_charset(index);

	/* The first field is on the Tokenized Word */
	field = dict_index_get_nth_field(new_index, 0);
	field->name = NULL;
	field->prefix_len = 0;
	field->descending = false;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
	field->col->mtype = charset == &my_charset_latin1
		? DATA_VARCHAR : DATA_VARMYSQL;
	field->col->mbminlen = idx_field->col->mbminlen;
	field->col->mbmaxlen = idx_field->col->mbmaxlen;
	field->col->len = static_cast<uint16_t>(
		HA_FT_MAXCHARLEN * field->col->mbmaxlen);

	field->fixed_len = 0;

	/* Doc ID */
	field = dict_index_get_nth_field(new_index, 1);
	field->name = NULL;
	field->prefix_len = 0;
	field->descending = false;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	*opt_doc_id_size = FALSE;

	/* Check whether we can use a 4-byte instead of an 8-byte integer
	field to hold the Doc ID, thus reducing the overall sort size */
	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
		/* If the Doc ID column is being added by this create
		index, then just check the number of rows in the table */
		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	} else {
		doc_id_t	max_doc_id;

		/* If the Doc ID column is supplied by the user, then
		check the maximum Doc ID in the table */
		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);

		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	}

	if (*opt_doc_id_size) {
		field->col->len = sizeof(ib_uint32_t);
		field->fixed_len = sizeof(ib_uint32_t);
	} else {
		field->col->len = FTS_DOC_ID_LEN;
		field->fixed_len = FTS_DOC_ID_LEN;
	}

	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;

	/* The third field is on the word's position in the original doc */
	field = dict_index_get_nth_field(new_index, 2);
	field->name = NULL;
	field->prefix_len = 0;
	field->descending = false;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	field->col->len = 4;
	field->fixed_len = 4;
	field->col->prtype = DATA_NOT_NULL;

	return(new_index);
}
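
/* Illustrative note (not part of the original code): with the layout above,
each sort tuple is (word, doc_id, position). For example, assuming a utf8mb3
FTS index (mbmaxlen = 3), the word column can hold up to
HA_FT_MAXCHARLEN * 3 bytes, the Doc ID takes sizeof(ib_uint32_t) = 4 bytes
when *opt_doc_id_size was set (all Doc IDs fit below MAX_DOC_ID_OPT_VAL) or
FTS_DOC_ID_LEN = 8 bytes otherwise, and the position is always a 4-byte
integer; the 4-byte Doc ID option thus saves 4 bytes per tokenized word
during the sort. */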

/** Initialize FTS parallel sort structures.
@param[in]	trx		transaction
@param[in,out]	dup		descriptor of FTS index being created
@param[in,out]	new_table	table where indexes are created
@param[in]	opt_doc_id_size	whether to use 4 bytes instead of 8 bytes
				integer to store Doc ID during sort
@param[in]	old_zip_size	page size of the old table during alter
@param[out]	psort		parallel sort info to be instantiated
@param[out]	merge		parallel merge info to be instantiated
@return true if all successful */
bool
row_fts_psort_info_init(
	trx_t*		trx,
	row_merge_dup_t*dup,
	dict_table_t*	new_table,
	bool		opt_doc_id_size,
	ulint		old_zip_size,
	fts_psort_t**	psort,
	fts_psort_t**	merge)
{
	ulint			i;
	ulint			j;
	fts_psort_common_t*	common_info = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	ulint			block_size;
	ibool			ret = TRUE;
	ut_ad(ut_is_2pow(old_zip_size));

	block_size = 3 * srv_sort_buf_size;

	*psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey(
		 fts_sort_pll_degree * sizeof *psort_info));

	if (!psort_info) {
		ut_free(dup);
		return(FALSE);
	}

	/* Common Info for all sort threads */
	common_info = static_cast<fts_psort_common_t*>(
		ut_malloc_nokey(sizeof *common_info));

	if (!common_info) {
		ut_free(dup);
		ut_free(psort_info);
		return(FALSE);
	}

	common_info->dup = dup;
	common_info->new_table = new_table;
	common_info->old_zip_size = old_zip_size;
	common_info->trx = trx;
	common_info->all_info = psort_info;
	pthread_cond_init(&common_info->sort_cond, nullptr);
	common_info->opt_doc_id_size = opt_doc_id_size;

	ut_ad(trx->mysql_thd != NULL);
	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
	/* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
	each parallel sort thread. Each "sort bucket" holds records for
	a particular "FTS index partition" */
	for (j = 0; j < fts_sort_pll_degree; j++) {

		UT_LIST_INIT(
			psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list);

		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {

			psort_info[j].merge_file[i] =
				 static_cast<merge_file_t*>(
					ut_zalloc_nokey(sizeof(merge_file_t)));

			if (!psort_info[j].merge_file[i]) {
				ret = FALSE;
				goto func_exit;
			}

			psort_info[j].merge_buf[i] = row_merge_buf_create(
				dup->index);

			if (row_merge_file_create(psort_info[j].merge_file[i],
						  path) == OS_FILE_CLOSED) {
				ret = FALSE;
				goto func_exit;
			}

			/* Need to align memory for O_DIRECT write */
			psort_info[j].merge_block[i] =
				static_cast<row_merge_block_t*>(
					aligned_malloc(block_size, 1024));

			if (!psort_info[j].merge_block[i]) {
				ret = FALSE;
				goto func_exit;
			}

			/* If tablespace is encrypted, allocate additional buffer for
			encryption/decryption. */
			if (srv_encrypt_log) {
				/* Need to align memory for O_DIRECT write */
				psort_info[j].crypt_block[i] =
					static_cast<row_merge_block_t*>(
						aligned_malloc(block_size,
							       1024));

				if (!psort_info[j].crypt_block[i]) {
					ret = FALSE;
					goto func_exit;
				}
			} else {
				psort_info[j].crypt_block[i] = NULL;
			}
		}

		psort_info[j].child_status = 0;
		psort_info[j].state = 0;
		psort_info[j].psort_common = common_info;
		psort_info[j].error = DB_SUCCESS;
		psort_info[j].memory_used = 0;
		mysql_mutex_init(0, &psort_info[j].mutex, nullptr);
	}

	/* Initialize merge_info structures for the parallel merge and
	insert into auxiliary FTS tables (FTS_INDEX_TABLE) */
	*merge = merge_info = static_cast<fts_psort_t*>(
		ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info));

	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {

		merge_info[j].child_status = 0;
		merge_info[j].state = 0;
		merge_info[j].psort_common = common_info;
	}

func_exit:
	if (!ret) {
		row_fts_psort_info_destroy(psort_info, merge_info);
	}

	return(ret);
}
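
/* Illustrative note (not part of the original code): a rough memory budget
for the structures above, assuming the default srv_sort_buf_size of 1 MiB,
fts_sort_pll_degree = 2 and FTS_NUM_AUX_INDEX = 6: each of the 2 * 6 = 12
sort buckets allocates block_size = 3 * srv_sort_buf_size = 3 MiB of merge
blocks, 36 MiB in total, plus the same amount again for the crypt_block[]
buffers when srv_encrypt_log is set. */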
/*********************************************************************//**
Clean up and deallocate FTS parallel sort structures, and close the
merge sort files */
void
row_fts_psort_info_destroy(
/*=======================*/
	fts_psort_t*	psort_info,	/*!< parallel sort info */
	fts_psort_t*	merge_info)	/*!< parallel merge info */
{
	ulint	i;
	ulint	j;

	if (psort_info) {
		for (j = 0; j < fts_sort_pll_degree; j++) {
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				if (psort_info[j].merge_file[i]) {
					row_merge_file_destroy(
						psort_info[j].merge_file[i]);
				}

				aligned_free(psort_info[j].merge_block[i]);
				ut_free(psort_info[j].merge_file[i]);
				aligned_free(psort_info[j].crypt_block[i]);
			}

			mysql_mutex_destroy(&psort_info[j].mutex);
		}

		pthread_cond_destroy(&merge_info[0].psort_common->sort_cond);
		ut_free(merge_info[0].psort_common->dup);
		ut_free(merge_info[0].psort_common);
		ut_free(psort_info);
	}

	ut_free(merge_info);
}
/*********************************************************************//**
Free up merge buffers when merge sort is done */
void
row_fts_free_pll_merge_buf(
/*=======================*/
	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
{
	ulint	j;
	ulint	i;

	if (!psort_info) {
		return;
	}

	for (j = 0; j < fts_sort_pll_degree; j++) {
		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
			row_merge_buf_free(psort_info[j].merge_buf[i]);
		}
	}
}

/*********************************************************************//**
FTS plugin parser 'mysql_add_word' callback function for row merge.
Refer to 'st_mysql_ftparser_param' for more detail.
@return always returns 0 */
static
int
row_merge_fts_doc_add_word_for_parser(
/*==================================*/
	MYSQL_FTPARSER_PARAM	*param,		/* in: parser parameter */
	const char		*word,		/* in: token word */
	int			word_len,	/* in: word len */
	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
{
	fts_string_t		str;
	fts_tokenize_ctx_t*	t_ctx;
	row_fts_token_t*	fts_token;
	byte*			ptr;

	ut_ad(param);
	ut_ad(param->mysql_ftparam);
	ut_ad(word);
	ut_ad(boolean_info);

	t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam);
	ut_ad(t_ctx);

	str.f_str = (byte*)(word);
	str.f_len = ulint(word_len);
	str.f_n_char = fts_get_token_size(
		(CHARSET_INFO*)param->cs, word, ulint(word_len));

	/* JAN: TODO: MySQL 5.7 FTS
	ut_ad(boolean_info->position >= 0);
	*/

	ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t)
			+ sizeof(fts_string_t) + str.f_len));
	fts_token = reinterpret_cast<row_fts_token_t*>(ptr);
	fts_token->text = reinterpret_cast<fts_string_t*>(
			ptr + sizeof(row_fts_token_t));
	fts_token->text->f_str = static_cast<byte*>(
			ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t));

	fts_token->text->f_len = str.f_len;
	fts_token->text->f_n_char = str.f_n_char;
	memcpy(fts_token->text->f_str, str.f_str, str.f_len);

	/* JAN: TODO: MySQL 5.7 FTS
	fts_token->position = boolean_info->position;
	*/

	/* Add token to list */
	UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token);

	return(0);
}
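
/* Illustrative note (not part of the original code): the single
ut_malloc_nokey() call above carves one contiguous block into three parts,
so one ut_free() later releases the token, its string descriptor and the
copied text together:

	ptr -> [ row_fts_token_t | fts_string_t | f_len bytes of text ]
	        ^ fts_token       ^ fts_token->text  ^ fts_token->text->f_str
*/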

/*********************************************************************//**
Tokenize by fts plugin parser */
static
void
row_merge_fts_doc_tokenize_by_parser(
/*=================================*/
	fts_doc_t*		doc,	/* in: doc to tokenize */
	st_mysql_ftparser*	parser,	/* in: plugin parser instance */
	fts_tokenize_ctx_t*	t_ctx)	/* in/out: tokenize ctx instance */
{
	MYSQL_FTPARSER_PARAM	param;

	ut_a(parser);

	/* Set parameters for param */
	param.mysql_parse = fts_tokenize_document_internal;
	param.mysql_add_word = row_merge_fts_doc_add_word_for_parser;
	param.mysql_ftparam = t_ctx;
	param.cs = doc->charset;
	param.doc = reinterpret_cast<char*>(doc->text.f_str);
	param.length = static_cast<int>(doc->text.f_len);
	param.mode = MYSQL_FTPARSER_SIMPLE_MODE;

	PARSER_INIT(parser, &param);
	/* We assume parse returns successfully here. */
	parser->parse(&param);
	PARSER_DEINIT(parser, &param);
}

/*********************************************************************//**
Tokenize incoming text data and add to the sort buffer.
@see row_merge_buf_encode()
@return	TRUE if the record passed, FALSE if out of space */
static
ibool
row_merge_fts_doc_tokenize(
/*=======================*/
	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
	doc_id_t		doc_id,		/*!< in: Doc ID */
	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
	merge_file_t**		merge_file,	/*!< in/out: merge file */
	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
						instead of 8 bytes integer to
						store Doc ID during sort*/
	fts_tokenize_ctx_t*	t_ctx)          /*!< in/out: tokenize context */
{
	ulint		inc = 0;
	fts_string_t	str;
	ulint		len;
	row_merge_buf_t* buf;
	dfield_t*	field;
	fts_string_t	t_str;
	ibool		buf_full = FALSE;
	byte		str_buf[FTS_MAX_WORD_LEN + 1];
	ulint		data_size[FTS_NUM_AUX_INDEX];
	ulint		n_tuple[FTS_NUM_AUX_INDEX];
	st_mysql_ftparser*	parser;

	t_str.f_n_char = 0;
	t_ctx->buf_used = 0;

	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	parser = sort_buf[0]->index->parser;

	/* Tokenize the data and add each word string, its corresponding
	doc id and position to sort buffer */
	while (parser
               ? (!t_ctx->processed_len
                  || UT_LIST_GET_LEN(t_ctx->fts_token_list))
               : t_ctx->processed_len < doc->text.f_len) {
		ulint		idx = 0;
		ulint		cur_len;
		doc_id_t	write_doc_id;
		row_fts_token_t* fts_token = NULL;

		if (parser != NULL) {
			if (t_ctx->processed_len == 0) {
				UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list);

				/* Parse the whole doc and cache tokens */
				row_merge_fts_doc_tokenize_by_parser(doc,
					parser, t_ctx);

				/* Just indicate that we have parsed all
				the words */
				t_ctx->processed_len += 1;
			}

			/* Then get a token */
			fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list);
			if (fts_token) {
				str.f_len = fts_token->text->f_len;
				str.f_n_char = fts_token->text->f_n_char;
				str.f_str = fts_token->text->f_str;
			} else {
				ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0);
				/* Reached the end of the list */
				t_ctx->processed_len = doc->text.f_len;
				break;
			}
		} else {
			inc = innobase_mysql_fts_get_token(
				doc->charset,
				doc->text.f_str + t_ctx->processed_len,
				doc->text.f_str + doc->text.f_len, &str);

			ut_a(inc > 0);
		}

		/* Ignore strings whose character count is less than
		"fts_min_token_size" or more than "fts_max_token_size" */
		if (!fts_check_token(&str, NULL, NULL)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		t_str.f_len = innobase_fts_casedn_str(
			doc->charset, (char*) str.f_str, str.f_len,
			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);

		t_str.f_str = (byte*) &str_buf;

		/* if "cached_stopword" is defined, ignore words in the
		stopword list */
		if (!fts_check_token(&str, t_ctx->cached_stopword,
				     doc->charset)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		/* There are FTS_NUM_AUX_INDEX auxiliary tables, find
		out which sort buffer to put this word record in */
		t_ctx->buf_used = fts_select_index(
			doc->charset, t_str.f_str, t_str.f_len);

		buf = sort_buf[t_ctx->buf_used];

		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
		idx = t_ctx->buf_used;

		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];

		field = mtuple->fields = static_cast<dfield_t*>(
			mem_heap_alloc(buf->heap,
				       FTS_NUM_FIELDS_SORT * sizeof *field));

		/* The first field is the tokenized word */
		dfield_set_data(field, t_str.f_str, t_str.f_len);
		len = dfield_get_len(field);

		dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type);
		field->type.prtype |= DATA_NOT_NULL;
		ut_ad(len <= field->type.len);

		/* For the temporary file, row_merge_buf_encode() uses
		1 byte for representing the number of extra_size bytes.
		This number will always be 1, because for this 3-field index
		consisting of one variable-size column, extra_size will always
		be 1 or 2, which can be encoded in one byte.

		The extra_size is 1 byte if the length of the
		variable-length column is less than 128 bytes or the
		maximum length is less than 256 bytes. */

		/* There is one variable-length column, the word, whose
		length is capped by fts_max_token_size. Because the maximum
		length of an FTS token can now exceed 255 bytes, the length
		bytes themselves must be accounted for: a length of 1 to 128
		bytes needs one length byte, anything larger needs two. */
		if (len < 128 || field->type.len < 256) {
			/* Extra size is one byte. */
			cur_len = 2 + len;
		} else {
			/* Extra size is two bytes. */
			cur_len = 3 + len;
		}
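
		/* Illustrative note (not part of the original code): for a
		10-byte token, len = 10 < 128, so a single length byte
		suffices and cur_len = 2 + 10 = 12: one byte encoding
		extra_size, one length byte and ten data bytes. The Doc ID
		field then adds 4 or 8 more bytes and the position field
		another 4. */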

		dfield_dup(field, buf->heap);
		field++;

		/* The second field is the Doc ID */

		ib_uint32_t	doc_id_32_bit;

		if (!opt_doc_id_size) {
			fts_write_doc_id((byte*) &write_doc_id, doc_id);

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));
		} else {
			mach_write_to_4(
				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);

			dfield_set_data(
				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
		}

		len = field->len;
		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
		field->type.len = static_cast<uint16_t>(field->len);
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;

		cur_len += len;
		dfield_dup(field, buf->heap);

		++field;

		/* The third field is the position.
		MySQL 5.7 changed the fulltext parser plugin interface
		by adding MYSQL_FTPARSER_BOOLEAN_INFO::position.
		Below we assume that the field is always 0. */
		ulint	pos = t_ctx->init_pos;
		byte		position[4];
		if (parser == NULL) {
			pos += t_ctx->processed_len + inc - str.f_len;
		}
		len = 4;
		mach_write_to_4(position, pos);
		dfield_set_data(field, &position, len);

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL;
		field->type.len = 4;
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;

		cur_len += len;
		dfield_dup(field, buf->heap);

		/* Reserve one byte for the end marker of row_merge_block_t */
		if (buf->total_size + data_size[idx] + cur_len
		    >= srv_sort_buf_size - 1) {

			buf_full = TRUE;
			break;
		}

		/* Increment the number of tuples */
		n_tuple[idx]++;
		if (parser != NULL) {
			UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
			ut_free(fts_token);
		} else {
			t_ctx->processed_len += inc;
		}
		data_size[idx] += cur_len;
	}

	/* Update the data length and the number of new word tuples
	added in this round of tokenization */
	for (ulint i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		/* The computation of total_size below assumes that no
		delete-mark flags will be stored and that all fields
		are NOT NULL and fixed-length. */

		sort_buf[i]->total_size += data_size[i];

		sort_buf[i]->n_tuples += n_tuple[i];

		merge_file[i]->n_rec += n_tuple[i];
		t_ctx->rows_added[i] += n_tuple[i];
	}

	if (!buf_full) {
		/* we pad one byte between text across two fields */
		t_ctx->init_pos += doc->text.f_len + 1;
	}

	return(!buf_full);
}
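
/* Illustrative note (not part of the original code): a FALSE return means
"sort buffer full: sort and flush buf[t_ctx->buf_used], then call again with
the same doc". fts_parallel_tokenization() below does exactly that:

	row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
	row_merge_buf_write(...);	// then flush with row_merge_write()
	buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);

t_ctx->processed_len preserves the resume position inside the document
across the retry. */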

/*********************************************************************//**
Get next doc item from fts_doc_list */
UNIV_INLINE
void
row_merge_fts_get_next_doc_item(
/*============================*/
	fts_psort_t*		psort_info,	/*!< in: psort_info */
	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
{
	if (*doc_item != NULL) {
		ut_free(*doc_item);
	}

	mysql_mutex_lock(&psort_info->mutex);

	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
	if (*doc_item != NULL) {
		UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item);

		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
		      + (*doc_item)->field->len);
		psort_info->memory_used -= sizeof(fts_doc_item_t)
			+ (*doc_item)->field->len;
	}

	mysql_mutex_unlock(&psort_info->mutex);
}
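
/* Illustrative note (not part of the original code): fts_doc_list acts as a
producer/consumer queue. The ALTER TABLE scan thread (the "parent", in
row0merge.cc) appends fts_doc_item_t entries and increases
psort_info->memory_used under psort_info->mutex; the tokenization task pops
them here and reverses the accounting. Freeing the previous *doc_item on
entry lets callers keep a one-item cursor across calls. */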

/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in-memory sort of the parsed records. */
static
void fts_parallel_tokenization(
/*======================*/
	void*		arg)	/*!< in: psort_info for the thread */
{
	fts_psort_t*		psort_info = (fts_psort_t*) arg;
	ulint			i;
	fts_doc_item_t*		doc_item = NULL;
	row_merge_buf_t**	buf;
	ibool			processed = FALSE;
	merge_file_t**		merge_file;
	row_merge_block_t**	block;
	row_merge_block_t**	crypt_block;
	pfs_os_file_t		tmpfd[FTS_NUM_AUX_INDEX];
	ulint			mycount[FTS_NUM_AUX_INDEX];
	ulint			num_doc_processed = 0;
	doc_id_t		last_doc_id = 0;
	mem_heap_t*		blob_heap = NULL;
	fts_doc_t		doc;
	dict_table_t*		table = psort_info->psort_common->new_table;
	fts_tokenize_ctx_t	t_ctx;
	ulint			retried = 0;
	dberr_t			error = DB_SUCCESS;

	ut_ad(psort_info);
	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);

	const char*		path = thd_innodb_tmpdir(
		psort_info->psort_common->trx->mysql_thd);

	buf = psort_info->merge_buf;
	merge_file = psort_info->merge_file;
	blob_heap = mem_heap_create(512);
	memset(&doc, 0, sizeof(doc));
	memset(mycount, 0, sizeof(mycount));

	doc.charset = fts_index_get_charset(
		psort_info->psort_common->dup->index);

	block = psort_info->merge_block;
	crypt_block = psort_info->crypt_block;

	const ulint zip_size = psort_info->psort_common->old_zip_size;

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
	processed = TRUE;
loop:
	while (doc_item) {
		dfield_t*	dfield = doc_item->field;

		last_doc_id = doc_item->doc_id;

		ut_ad(dfield->data != NULL
		      && dfield_get_len(dfield) != UNIV_SQL_NULL);

		/* If we have finished processing the last item, update
		"doc" with the strings in doc_item; otherwise continue
		processing the item on hand */
		if (processed) {
			byte*		data;
			ulint		data_len;

			dfield = doc_item->field;
			data = static_cast<byte*>(dfield_get_data(dfield));
			data_len = dfield_get_len(dfield);

			if (dfield_is_ext(dfield)) {
				doc.text.f_str =
					btr_copy_externally_stored_field(
						&doc.text.f_len, data,
						zip_size, data_len, blob_heap);
			} else {
				doc.text.f_str = data;
				doc.text.f_len = data_len;
			}

			doc.tokens = 0;
			t_ctx.processed_len = 0;
		} else {
			/* Not yet finished processing the "doc" on hand,
			continue processing it */
			ut_ad(doc.text.f_str);
			ut_ad(buf[0]->index->parser
			      || t_ctx.processed_len < doc.text.f_len);
		}

		processed = row_merge_fts_doc_tokenize(
			buf, doc_item->doc_id, &doc,
			merge_file, psort_info->psort_common->opt_doc_id_size,
			&t_ctx);

		/* Current sort buffer full, need to recycle */
		if (!processed) {
			ut_ad(buf[0]->index->parser
			      || t_ctx.processed_len < doc.text.f_len);
			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
			break;
		}

		num_doc_processed++;

		if (UNIV_UNLIKELY(fts_enable_diag_print)
		    && num_doc_processed % 10000 == 1) {
			ib::info() << "Number of documents processed: "
				<< num_doc_processed;
#ifdef FTS_INTERNAL_DIAG_PRINT
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				ib::info() << "ID " << psort_info->psort_id
					<< ", partition " << i << ", word "
					<< mycount[i];
			}
#endif
		}

		mem_heap_empty(blob_heap);

		row_merge_fts_get_next_doc_item(psort_info, &doc_item);

		if (doc_item && last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}
	}

	/* If we run out of current sort buffer, need to sort
	and flush the sort buffer to disk */
	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
		row_merge_buf_write(buf[t_ctx.buf_used],
#ifndef DBUG_OFF
				    merge_file[t_ctx.buf_used],
#endif
				    block[t_ctx.buf_used]);

		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
				     merge_file[t_ctx.buf_used]->offset++,
				     block[t_ctx.buf_used],
				     crypt_block[t_ctx.buf_used],
				     table->space_id)) {
			error = DB_TEMP_FILE_WRITE_FAIL;
			goto func_exit;
		}

		MEM_UNDEFINED(block[t_ctx.buf_used], srv_sort_buf_size);
		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
		t_ctx.rows_added[t_ctx.buf_used] = 0;

		ut_a(doc_item);
		goto loop;
	}

	/* The parent is done scanning; if we have finished processing
	all the docs, exit */
	if (psort_info->state == FTS_PARENT_COMPLETE) {
		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
			goto exit;
		} else if (retried > 10000) {
			ut_ad(!doc_item);
			/* retried too many times and cannot get new record */
			ib::error() << "FTS parallel sort processed "
				<< num_doc_processed
				<< " records, the sort queue has "
				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
				<< " records. But sort cannot get the next"
				" records during alter table " << table->name;
			goto exit;
		}
	} else if (psort_info->state == FTS_PARENT_EXITING) {
		/* Parent abort */
		goto func_exit;
	}

	if (doc_item == NULL) {
		std::this_thread::yield();
	}

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	if (doc_item != NULL) {
		if (last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}

		retried = 0;
	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
		retried++;
	}

	goto loop;

exit:
	/* Do a final sort of the last (or latest) batch of records
	in block memory. Flush them to the temp file if the records
	cannot all be held in one block of memory */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (t_ctx.rows_added[i]) {
			row_merge_buf_sort(buf[i], NULL);
			row_merge_buf_write(buf[i],
#ifndef DBUG_OFF
					    merge_file[i],
#endif
					    block[i]);

			/* Write to the temp file only if records have
			been flushed to the temp file before (offset > 0).
			The pseudo code for the sort is as follows:

				while (there are rows) {
					tokenize rows, put result in block[]
					if (block[] runs out) {
						sort rows;
						write to temp file with
						row_merge_write();
						offset++;
					}
				}

				# write out the last batch
				if (offset > 0) {
					row_merge_write();
					offset++;
				} else {
					# no need to write anything
					offset stays 0
				}

			So if merge_file[i]->offset is 0 when we get here
			with the last batch, the rows have never been
			flushed to the temp file and can all be held in
			memory */
			if (merge_file[i]->offset != 0) {
				if (!row_merge_write(merge_file[i]->fd,
						merge_file[i]->offset++,
						block[i],
						crypt_block[i],
						table->space_id)) {
					error = DB_TEMP_FILE_WRITE_FAIL;
					goto func_exit;
				}

#ifdef HAVE_valgrind
				MEM_UNDEFINED(block[i], srv_sort_buf_size);

				if (crypt_block[i]) {
					MEM_UNDEFINED(crypt_block[i],
						      srv_sort_buf_size);
				}
#endif /* HAVE_valgrind */
			}

			buf[i] = row_merge_buf_empty(buf[i]);
			t_ctx.rows_added[i] = 0;
		}
	}

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: start merge sort\n");
	}

	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (!merge_file[i]->offset) {
			continue;
		}

		tmpfd[i] = row_merge_file_create_low(path);
		if (tmpfd[i] == OS_FILE_CLOSED) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		error = row_merge_sort(psort_info->psort_common->trx,
				       psort_info->psort_common->dup,
				       merge_file[i], block[i], &tmpfd[i],
				       false, 0.0/* pct_progress */, 0.0/* pct_cost */,
				       crypt_block[i], table->space_id);

		if (error != DB_SUCCESS) {
			row_merge_file_destroy_low(tmpfd[i]);
			goto func_exit;
		}

		row_merge_file_destroy_low(tmpfd[i]);
	}

func_exit:
	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: complete merge sort\n");
	}

	mem_heap_free(blob_heap);

	mysql_mutex_lock(&psort_info->mutex);
	psort_info->error = error;
	mysql_mutex_unlock(&psort_info->mutex);

	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
		/* The child thread can exit either on error or when
		told to by the parent. */
		ut_ad(error != DB_SUCCESS
		      || psort_info->state == FTS_PARENT_EXITING);
	}

	/* Free the fts doc list in case of error. */
	do {
		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
	} while (doc_item != NULL);

	mysql_mutex_lock(&psort_info->mutex);
	psort_info->child_status = FTS_CHILD_COMPLETE;
	pthread_cond_signal(&psort_info->psort_common->sort_cond);
	mysql_mutex_unlock(&psort_info->mutex);
}

/*********************************************************************//**
Start the parallel tokenization and parallel merge sort */
void
row_fts_start_psort(
/*================*/
	fts_psort_t*	psort_info)	/*!< parallel sort structure */
{
	ulint		i = 0;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		psort_info[i].psort_id = i;
		psort_info[i].task =
			new tpool::waitable_task(fts_parallel_tokenization,
						 &psort_info[i]);
		srv_thread_pool->submit_task(psort_info[i].task);
	}
}
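
/* Illustrative note (not part of the original code): the tasks submitted
above are tpool::waitable_task objects, so the coordinator can rendezvous
with them. A minimal sketch of the assumed shutdown sequence is

	for (ulint i = 0; i < fts_sort_pll_degree; i++) {
		psort_info[i].task->wait();
		delete psort_info[i].task;
	}

run after the parent has set psort_info[i].state to FTS_PARENT_COMPLETE so
that the loop in fts_parallel_tokenization() can terminate. */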

/*********************************************************************//**
Function performs the merge and insertion of the sorted records. */
static
void
fts_parallel_merge(
/*===============*/
	void*		arg)		/*!< in: parallel merge info */
{
	fts_psort_t*	psort_info = (fts_psort_t*) arg;
	ulint		id;

	ut_ad(psort_info);

	id = psort_info->psort_id;

	row_fts_merge_insert(psort_info->psort_common->dup->index,
			     psort_info->psort_common->new_table,
			     psort_info->psort_common->all_info, id);
}

/*********************************************************************//**
Kick off the parallel merge and insert threads */
void
row_fts_start_parallel_merge(
/*=========================*/
	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
{
	ulint		i = 0;

	/* Kick off merge/insert tasks */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		merge_info[i].psort_id = i;
		merge_info[i].child_status = 0;

		merge_info[i].task = new tpool::waitable_task(
			fts_parallel_merge,
			(void*) &merge_info[i]);
		srv_thread_pool->submit_task(merge_info[i].task);
	}
}

/**
Write out a single word's data as new entry/entries in the INDEX table.
@param[in]	ins_ctx	insert context
@param[in]	word	word string
@param[in]	node	node columns
@return	DB_SUCCESS if insertion runs fine, otherwise error code */
static
dberr_t
row_merge_write_fts_node(
	const	fts_psort_insert_t*	ins_ctx,
	const	fts_string_t*		word,
	const	fts_node_t*		node)
{
	dtuple_t*	tuple;
	dfield_t*	field;
	dberr_t		ret = DB_SUCCESS;
	doc_id_t	write_first_doc_id[8];
	doc_id_t	write_last_doc_id[8];
	ib_uint32_t	write_doc_count;

	tuple = ins_ctx->tuple;

	/* The first field is the tokenized word */
	field = dtuple_get_nth_field(tuple, 0);
	dfield_set_data(field, word->f_str, word->f_len);

	/* The second field is first_doc_id */
	field = dtuple_get_nth_field(tuple, 1);
	fts_write_doc_id((byte*)&write_first_doc_id, node->first_doc_id);
	dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t));

	/* The third and fourth fields (TRX_ID, ROLL_PTR) are filled already. */
	/* The fifth field is last_doc_id */
	field = dtuple_get_nth_field(tuple, 4);
	fts_write_doc_id((byte*)&write_last_doc_id, node->last_doc_id);
	dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t));

	/* The sixth field is doc_count */
	field = dtuple_get_nth_field(tuple, 5);
	mach_write_to_4((byte*)&write_doc_count, (ib_uint32_t)node->doc_count);
	dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t));

	/* The seventh field is ilist */
	field = dtuple_get_nth_field(tuple, 6);
	dfield_set_data(field, node->ilist, node->ilist_size);

	ret = ins_ctx->btr_bulk->insert(tuple);

	return(ret);
}
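
/* Illustrative note (not part of the original code): the tuple above mirrors
the row format of the FTS auxiliary index tables, i.e.
(word, first_doc_id, trx_id, roll_ptr, last_doc_id, doc_count, ilist),
where ilist is the encoded list of (doc_id, positions) entries that
fts_cache_node_add_positions() builds up. */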

/********************************************************************//**
Insert processed FTS data to auxiliary index tables.
@return DB_SUCCESS if insertion runs fine */
static MY_ATTRIBUTE((nonnull))
dberr_t
row_merge_write_fts_word(
/*=====================*/
	fts_psort_insert_t*	ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t*	word)		/*!< in: sorted and tokenized
						word */
{
	dberr_t	ret = DB_SUCCESS;

	ut_ad(ins_ctx->aux_index_id == fts_select_index(
		ins_ctx->charset, word->text.f_str, word->text.f_len));

	/* Pop out each fts_node in word->nodes and write it to the
	auxiliary table */
	for (ulint i = 0; i < ib_vector_size(word->nodes); i++) {
		dberr_t		error;
		fts_node_t*	fts_node;

		fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i));

		error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node);

		if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
			ib::error() << "Failed to write word to FTS auxiliary"
				" index table "
				<< ins_ctx->btr_bulk->table_name()
				<< ", error " << error;
			ret = error;
		}

		ut_free(fts_node->ilist);
		fts_node->ilist = NULL;
	}

	ib_vector_reset(word->nodes);

	return(ret);
}

/*********************************************************************//**
Read sorted FTS data files and insert data tuples to auxiliary tables. */
static
void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t* word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get the fts_node for the FTS auxiliary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		if (fts_node && ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx, word);
		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	if (!word->text.f_str) {
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* Compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* Getting a new word, flush the last position info
		for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx, word);

		/* Copy the new word */
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush the position info to the
	fts_node and initiate a new position vector */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* Record the current Doc ID */
	*in_doc_id = doc_id;
}
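
/* Illustrative note (not part of the original code): because the merged
stream arrives ordered by (word, doc_id, position), this function only ever
buffers positions for one (word, doc_id) pair at a time. For a stream such
as

	("apple", 10, 3), ("apple", 10, 7), ("apple", 12, 1), ("berry", 9, 2)

it pushes positions 3 and 7, flushes them into the "apple" node when doc 12
begins, and writes out the whole "apple" word when "berry" first appears. */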

/*********************************************************************//**
Propagate a newly added record up one level in the selection tree
@return parent to which this value was propagated */
static
ulint
row_fts_sel_tree_propagate(
/*=======================*/
	ulint		propagated,	/*<! in: tree node propagated */
	int*		sel_tree,	/*<! in: selection tree */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in/out: FTS index */
{
	ulint	parent;
	int	child_left;
	int	child_right;
	int	selected;

	/* Find which parent this value will be propagated to */
	parent = (propagated - 1) / 2;

	/* Find out which value is smaller and propagate it */
	child_left = sel_tree[parent * 2 + 1];
	child_right = sel_tree[parent * 2 + 2];

	if (child_left == -1 || mrec[child_left] == NULL) {
		if (child_right == -1
		    || mrec[child_right] == NULL) {
			selected = -1;
		} else {
			selected = child_right;
		}
	} else if (child_right == -1
		   || mrec[child_right] == NULL) {
		selected = child_left;
	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
				      offsets[child_left],
				      offsets[child_right],
				      index, NULL) < 0) {
		selected = child_left;
	} else {
		selected = child_right;
	}

	sel_tree[parent] = selected;

	return parent;
}

/*********************************************************************//**
Readjust the selection tree after popping the root and reading a new value
@return the new root */
static
int
row_fts_sel_tree_update(
/*====================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	ulint		propagated,	/*<! in: node to propagate up */
	ulint		height,		/*<! in: tree height */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	i;

	for (i = 1; i <= height; i++) {
		propagated = row_fts_sel_tree_propagate(
			propagated, sel_tree, mrec, offsets, index);
	}

	return(sel_tree[0]);
}

/*********************************************************************//**
Build the selection tree at a specified level */
static
void
row_fts_build_sel_tree_level(
/*=========================*/
	int*		sel_tree,	/*!< in/out: selection tree */
	ulint		level,		/*!< in: selection tree level */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
{
	ulint	start;
	int	child_left;
	int	child_right;
	ulint	i;
	ulint	num_item	= ulint(1) << level;

	start = num_item - 1;

	for (i = 0; i < num_item; i++) {
		child_left = sel_tree[(start + i) * 2 + 1];
		child_right = sel_tree[(start + i) * 2 + 2];

		if (child_left == -1) {
			if (child_right == -1) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (child_right == -1) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Deal with NULL child conditions */
		if (!mrec[child_left]) {
			if (!mrec[child_right]) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (!mrec[child_right]) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Select the smaller one to set the parent pointer */
		int cmp = cmp_rec_rec_simple(
			mrec[child_left], mrec[child_right],
			offsets[child_left], offsets[child_right],
			index, NULL);

		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
	}
}
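
/* A parent slot may end up as -1 either because a child slot itself is
-1 (that subtree was already exhausted) or because the child's stream
has no current record (mrec[child] == NULL); both cases are treated as
"nothing to offer" when picking the smaller child. */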

/*********************************************************************//**
Build a selection tree for the merge. The selection tree is a complete
binary tree; with the root as level 0, it has
ceil(log2(fts_sort_pll_degree)) levels.
@return number of tree levels */
static
ulint
row_fts_build_sel_tree(
/*===================*/
	int*		sel_tree,	/*!< in/out: selection tree */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
{
	ulint	treelevel = 1;
	ulint	num = 2;
	ulint	i = 0;
	ulint	start;

	/* No need to build a selection tree if we only have two merge threads */
	if (fts_sort_pll_degree <= 2) {
		return(0);
	}

	while (num < fts_sort_pll_degree) {
		num = num << 1;
		treelevel++;
	}

	start = (ulint(1) << treelevel) - 1;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		sel_tree[i + start] = int(i);
	}

	i = treelevel;
	do {
		row_fts_build_sel_tree_level(
			sel_tree, --i, mrec, offsets, index);
	} while (i > 0);

	return(treelevel);
}
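
/* Illustrative trace for fts_sort_pll_degree == 4 (an assumed value):
treelevel becomes 2 and start = (1 << 2) - 1 = 3, so the leaves
sel_tree[3..6] are seeded with stream numbers 0..3; the do/while loop
then fills level 1 (sel_tree[1..2]) and finally the root sel_tree[0]
bottom-up, leaving the stream with the smallest record at the root. */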

/*********************************************************************//**
Read the sorted files containing index data tuples and insert these data
tuples into the index
@return DB_SUCCESS or error number */
dberr_t
row_fts_merge_insert(
/*=================*/
	dict_index_t*		index,	/*!< in: index */
	dict_table_t*		table,	/*!< in: new table */
	fts_psort_t*		psort_info, /*!< in: parallel sort info */
	ulint			id)	/*!< in: which auxiliary table's data
					to insert */
{
	const byte**		b;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		heap;
	dberr_t			error = DB_SUCCESS;
	ulint*			foffs;
	rec_offs**		offsets;
	fts_tokenizer_word_t	new_word;
	ib_vector_t*		positions;
	doc_id_t		last_doc_id;
	ib_alloc_t*		heap_alloc;
	ulint			i;
	mrec_buf_t**		buf;
	pfs_os_file_t*		fd;
	byte**			block;
	byte**			crypt_block;
	const mrec_t**		mrec;
	ulint			count = 0;
	int*			sel_tree;
	ulint			height;
	ulint			start;
	fts_psort_insert_t	ins_ctx;
	uint64_t		count_diag = 0;
	fts_table_t		fts_table;
	char			aux_table_name[MAX_FULL_NAME_LEN];
	dict_table_t*		aux_table;
	dict_index_t*		aux_index;
	trx_t*			trx;

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx = trx_create();
	trx_start_if_not_started(trx, true);

	trx->op_info = "inserting index entries";

	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;

	heap = mem_heap_create(500 + sizeof(mrec_buf_t));

	b = (const byte**) mem_heap_alloc(
		heap, sizeof(*b) * fts_sort_pll_degree);
	foffs = (ulint*) mem_heap_alloc(
		heap, sizeof(*foffs) * fts_sort_pll_degree);
	offsets = (rec_offs**) mem_heap_alloc(
		heap, sizeof(*offsets) * fts_sort_pll_degree);
	buf = (mrec_buf_t**) mem_heap_alloc(
		heap, sizeof(*buf) * fts_sort_pll_degree);
	fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
	block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	crypt_block = (byte**) mem_heap_alloc(
		heap, sizeof(*crypt_block) * fts_sort_pll_degree);
	mrec = (const mrec_t**) mem_heap_alloc(
		heap, sizeof(*mrec) * fts_sort_pll_degree);
	sel_tree = (int*) mem_heap_alloc(
		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));

	tuple_heap = mem_heap_create(1000);

	ins_ctx.charset = fts_index_get_charset(index);
	ins_ctx.heap = heap;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		ulint	num;

		num = 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets[i] = static_cast<rec_offs*>(mem_heap_zalloc(
			heap, num * sizeof *offsets[i]));
		rec_offs_set_n_alloc(offsets[i], num);
		rec_offs_set_n_fields(offsets[i], dict_index_get_n_fields(index));
		block[i] = psort_info[i].merge_block[id];
		crypt_block[i] = psort_info[i].crypt_block[id];
		b[i] = psort_info[i].merge_block[id];
		fd[i] = psort_info[i].merge_file[id]->fd;
		foffs[i] = 0;

		buf[i] = static_cast<mrec_buf_t*>(
			mem_heap_alloc(heap, sizeof *buf[i]));

		count_diag += psort_info[i].merge_file[id]->n_rec;
	}
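
	/* At this point every one of the fts_sort_pll_degree input
	streams has its own cursor state: block[i] and crypt_block[i]
	point at the stream's I/O buffer, b[i] is the read position
	within it, fd[i] and foffs[i] address the stream's temporary
	file, and mrec[i]/offsets[i] will track the stream's current
	record and its field offsets. */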

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: to insert " << count_diag
			<< " records";
	}

	/* Initialize related variables if creating FTS indexes */
	heap_alloc = ib_heap_allocator_create(heap);

	memset(&new_word, 0, sizeof(new_word));

	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
	last_doc_id = 0;

	/* We must set DICT_TF2_FTS_AUX_HEX_NAME in flags2 here,
	in order to get the correct aux table names. */
	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
	fts_table.type = FTS_INDEX_TABLE;
	fts_table.index_id = index->id;
	fts_table.table_id = table->id;
	fts_table.table = index->table;
	fts_table.suffix = fts_get_suffix(id);

	/* Get the aux index */
	fts_get_table_name(&fts_table, aux_table_name);
	aux_table = dict_table_open_on_name(aux_table_name, false,
					    DICT_ERR_IGNORE_NONE);
	ut_ad(aux_table != NULL);
	aux_index = dict_table_get_first_index(aux_table);

	ut_ad(!aux_index->is_instant());
	/* row_merge_write_fts_node() depends on the correct value */
	ut_ad(aux_index->n_core_null_bytes
	      == UT_BITS_IN_BYTES(aux_index->n_nullable));

	/* Create the bulk load instance */
	ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx));

	/* Create the tuple for insert */
	ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
	dict_index_copy_types(ins_ctx.tuple, aux_index,
			      dict_index_get_n_fields(aux_index));

	/* Set TRX_ID and ROLL_PTR */
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 2),
			&reset_trx_id, DATA_TRX_ID_LEN);
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 3),
			&reset_trx_id[DATA_TRX_ID_LEN], DATA_ROLL_PTR_LEN);

	ut_d(ins_ctx.aux_index_id = id);

	const ulint space = table->space_id;
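
	/* The TRX_ID and ROLL_PTR fields set above never change: each
	row loaded through the BtrBulk instance reuses the same shared
	reset pattern, so only the remaining user fields of
	ins_ctx.tuple vary between inserts. */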

	for (i = 0; i < fts_sort_pll_degree; i++) {
		if (psort_info[i].merge_file[id]->n_rec == 0) {
			/* No rows to read */
			mrec[i] = b[i] = NULL;
		} else {
			/* Read from the temp file only if it has been
			written to. Otherwise, the block memory holds
			all the sorted records */
			if (psort_info[i].merge_file[id]->offset > 0
			    && (!row_merge_read(
					fd[i], foffs[i],
					(row_merge_block_t*) block[i],
					(row_merge_block_t*) crypt_block[i],
					space))) {
				error = DB_CORRUPTION;
				goto exit;
			}

			ROW_MERGE_READ_GET_NEXT(i);
		}
	}

	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
					offsets, index);

	start = (1U << height) - 1;
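
	/* When a selection tree is in use (fts_sort_pll_degree > 2),
	start is the array index of the leftmost leaf, so stream i's
	leaf lives at sel_tree[start + i]; the merge loop below updates
	that slot after each record it consumes.  For two streams,
	height is 0 and the tree is bypassed entirely. */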

	/* Fetch sorted records from the sort buffer and insert them into
	the corresponding FTS index auxiliary tables */
	for (;;) {
		dtuple_t*	dtuple;
		int		min_rec = 0;

		if (fts_sort_pll_degree <= 2) {
			while (!mrec[min_rec]) {
				min_rec++;

				if (min_rec >= (int) fts_sort_pll_degree) {
					row_fts_insert_tuple(
						&ins_ctx, &new_word,
						positions, &last_doc_id,
						NULL);

					goto exit;
				}
			}

			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
				if (!mrec[i]) {
					continue;
				}

				if (cmp_rec_rec_simple(
					    mrec[i], mrec[min_rec],
					    offsets[i], offsets[min_rec],
					    index, NULL) < 0) {
					min_rec = static_cast<int>(i);
				}
			}
		} else {
			min_rec = sel_tree[0];

			if (min_rec == -1) {
				row_fts_insert_tuple(
					&ins_ctx, &new_word,
					positions, &last_doc_id,
					NULL);

				goto exit;
			}
		}

		dtuple = row_rec_to_index_entry_low(
			mrec[min_rec], index, offsets[min_rec],
			tuple_heap);

		row_fts_insert_tuple(
			&ins_ctx, &new_word, positions,
			&last_doc_id, dtuple);

		ROW_MERGE_READ_GET_NEXT(min_rec);

		if (fts_sort_pll_degree > 2) {
			if (!mrec[min_rec]) {
				sel_tree[start + min_rec] = -1;
			}

			row_fts_sel_tree_update(sel_tree, start + min_rec,
						height, mrec,
						offsets, index);
		}

		count++;

		mem_heap_empty(tuple_heap);
	}
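
	/* The loop above exits only through the goto: once every stream
	is exhausted, row_fts_insert_tuple() is invoked one last time
	with a NULL tuple so that the word and position data still
	buffered in new_word and positions are flushed before commit. */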

exit:
	fts_sql_commit(trx);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	error = ins_ctx.btr_bulk->finish(error);
	UT_DELETE(ins_ctx.btr_bulk);

	aux_table->release();

	trx->free();

	mem_heap_free(heap);

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: inserted " << count << " records";
	}

	return(error);
}