mirror of
https://github.com/MariaDB/server.git
synced 2025-04-02 05:15:33 +02:00
MDEV-20621 FULLTEXT INDEX activity causes InnoDB hang
- fts_optimize_thread() uses dict_table_t object instead of table id. So that it doesn't acquire dict_sys->mutex. It leads to remove the hang of dict_sys->mutex between fts_optimize_thread() and other threads. - in_queue to indicate whether the table is in fts_optimize_queue. It is protected by fts_optimize_wq->mutex to avoid any race condition. - fts_optimize_init() adds the fts table to the fts_optimize_wq
This commit is contained in:
parent
bd22650bbc
commit
a41d429765
12 changed files with 246 additions and 264 deletions
storage
innobase
xtradb
|
@ -46,6 +46,7 @@ Created 4/24/1996 Heikki Tuuri
|
|||
#include "dict0priv.h"
|
||||
#include "ha_prototypes.h" /* innobase_casedn_str() */
|
||||
#include "fts0priv.h"
|
||||
#include "fts0opt.h"
|
||||
|
||||
/** Following are the InnoDB system tables. The positions in
|
||||
this array are referenced by enum dict_system_table_id. */
|
||||
|
@ -2548,8 +2549,12 @@ func_exit:
|
|||
FTS */
|
||||
fts_optimize_remove_table(table);
|
||||
fts_free(table);
|
||||
} else {
|
||||
} else if (fts_optimize_wq) {
|
||||
fts_optimize_add_table(table);
|
||||
} else {
|
||||
/* fts_optimize_thread is not started yet.
|
||||
So make the table as non-evictable from cache. */
|
||||
dict_table_move_from_lru_to_non_lru(table);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
|
|||
#include "ut0wqueue.h"
|
||||
#include "srv0start.h"
|
||||
#include "zlib.h"
|
||||
#include "fts0opt.h"
|
||||
|
||||
#ifndef UNIV_NONINL
|
||||
#include "fts0types.ic"
|
||||
|
@ -41,7 +42,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
|
|||
#endif
|
||||
|
||||
/** The FTS optimize thread's work queue. */
|
||||
static ib_wqueue_t* fts_optimize_wq;
|
||||
ib_wqueue_t* fts_optimize_wq;
|
||||
|
||||
/** The FTS vector to store fts_slot_t */
|
||||
static ib_vector_t* fts_slots;
|
||||
|
@ -169,8 +170,8 @@ struct fts_encode_t {
|
|||
/** We use this information to determine when to start the optimize
|
||||
cycle for a table. */
|
||||
struct fts_slot_t {
|
||||
/** table identifier, or 0 if the slot is empty */
|
||||
table_id_t table_id;
|
||||
/** table, or NULL if the slot is unused */
|
||||
dict_table_t* table;
|
||||
|
||||
/** whether this slot is being processed */
|
||||
bool running;
|
||||
|
@ -2456,14 +2457,7 @@ fts_optimize_table_bk(
|
|||
return(DB_SUCCESS);
|
||||
}
|
||||
|
||||
dict_table_t* table = dict_table_open_on_id(
|
||||
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
|
||||
|
||||
if (!table) {
|
||||
slot->last_run = now;
|
||||
return DB_SUCCESS;
|
||||
}
|
||||
|
||||
dict_table_t* table = slot->table;
|
||||
dberr_t error;
|
||||
|
||||
if (fil_table_accessible(table)
|
||||
|
@ -2483,8 +2477,6 @@ fts_optimize_table_bk(
|
|||
error = DB_SUCCESS;
|
||||
}
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
|
||||
return(error);
|
||||
}
|
||||
/*********************************************************************//**
|
||||
|
@ -2627,11 +2619,13 @@ UNIV_INTERN void fts_optimize_add_table(dict_table_t* table)
|
|||
|
||||
msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_enter(&table->fts->bg_threads_mutex);
|
||||
table->fts->in_queue = true;
|
||||
mutex_exit(&table->fts->bg_threads_mutex);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2648,7 +2642,7 @@ fts_optimize_remove_table(
|
|||
fts_msg_del_t* remove;
|
||||
|
||||
/* if the optimize system not yet initialized, return */
|
||||
if (!fts_optimize_is_init()) {
|
||||
if (!fts_optimize_wq) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2660,12 +2654,10 @@ fts_optimize_remove_table(
|
|||
return;
|
||||
}
|
||||
|
||||
fts_t* fts = table->fts;
|
||||
mutex_enter(&fts->bg_threads_mutex);
|
||||
bool is_in_optimize_queue = fts->in_queue;
|
||||
mutex_exit(&fts->bg_threads_mutex);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
if (!is_in_optimize_queue) {
|
||||
if (!table->fts->in_queue) {
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2681,15 +2673,17 @@ fts_optimize_remove_table(
|
|||
remove->event = event;
|
||||
msg->ptr = remove;
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
|
||||
os_event_wait(event);
|
||||
|
||||
os_event_free(event);
|
||||
|
||||
mutex_enter(&fts->bg_threads_mutex);
|
||||
fts->in_queue = false;
|
||||
mutex_exit(&fts->bg_threads_mutex);
|
||||
ut_d(mutex_enter(&fts_optimize_wq->mutex));
|
||||
ut_ad(!table->fts->in_queue);
|
||||
ut_d(mutex_exit(&fts_optimize_wq->mutex));
|
||||
}
|
||||
|
||||
/** Send sync fts cache for the table.
|
||||
|
@ -2700,10 +2694,9 @@ fts_optimize_request_sync_table(
|
|||
dict_table_t* table)
|
||||
{
|
||||
fts_msg_t* msg;
|
||||
table_id_t* table_id;
|
||||
|
||||
/* if the optimize system not yet initialized, return */
|
||||
if (!fts_optimize_is_init()) {
|
||||
if (!fts_optimize_wq) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2715,39 +2708,36 @@ fts_optimize_request_sync_table(
|
|||
return;
|
||||
}
|
||||
|
||||
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
|
||||
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
|
||||
|
||||
table_id = static_cast<table_id_t*>(
|
||||
mem_heap_alloc(msg->heap, sizeof(table_id_t)));
|
||||
*table_id = table->id;
|
||||
msg->ptr = table_id;
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_enter(&table->fts->bg_threads_mutex);
|
||||
table->fts->in_queue = true;
|
||||
mutex_exit(&table->fts->bg_threads_mutex);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
}
|
||||
|
||||
/** Add a table to fts_slots if it doesn't already exist. */
|
||||
static bool fts_optimize_new_table(dict_table_t* table)
|
||||
{
|
||||
ut_ad(table);
|
||||
|
||||
ulint i;
|
||||
fts_slot_t* slot;
|
||||
fts_slot_t* empty = NULL;
|
||||
const table_id_t table_id = table->id;
|
||||
ut_ad(table_id);
|
||||
|
||||
/* Search for duplicates, also find a free slot if one exists. */
|
||||
for (i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
|
||||
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
|
||||
|
||||
if (!slot->table_id) {
|
||||
if (!slot->table) {
|
||||
empty = slot;
|
||||
} else if (slot->table_id == table_id) {
|
||||
} else if (slot->table == table) {
|
||||
/* Already exists in our optimize queue. */
|
||||
return(FALSE);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2756,37 +2746,36 @@ static bool fts_optimize_new_table(dict_table_t* table)
|
|||
|
||||
memset(slot, 0x0, sizeof(*slot));
|
||||
|
||||
slot->table_id = table->id;
|
||||
slot->running = false;
|
||||
|
||||
return(TRUE);
|
||||
slot->table = table;
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Remove a table from fts_slots if it exists.
|
||||
@param[in,out] table table to be removed from fts_slots */
|
||||
static bool fts_optimize_del_table(const dict_table_t* table)
|
||||
{
|
||||
const table_id_t table_id = table->id;
|
||||
ut_ad(table_id);
|
||||
|
||||
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
fts_slot_t* slot;
|
||||
|
||||
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
|
||||
|
||||
if (slot->table_id == table_id) {
|
||||
if (slot->table == table) {
|
||||
if (fts_enable_diag_print) {
|
||||
ib_logf(IB_LOG_LEVEL_INFO,
|
||||
"FTS Optimize Removing table %s",
|
||||
table->name);
|
||||
}
|
||||
|
||||
slot->table_id = 0;
|
||||
return(TRUE);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
slot->table->fts->in_queue = false;
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
|
||||
slot->table = NULL;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return(FALSE);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2800,7 +2789,7 @@ static ulint fts_optimize_how_many()
|
|||
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
|
||||
ib_vector_get_const(fts_slots, i));
|
||||
if (slot->table_id == 0) {
|
||||
if (!slot->table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2836,22 +2825,14 @@ static bool fts_is_sync_needed()
|
|||
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
|
||||
ib_vector_get_const(fts_slots, i));
|
||||
|
||||
if (slot->table_id == 0) {
|
||||
if (!slot->table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dict_table_t* table = dict_table_open_on_id(
|
||||
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
|
||||
if (!table) {
|
||||
continue;
|
||||
if (slot->table->fts && slot->table->fts->cache) {
|
||||
total_memory += slot->table->fts->cache->total_size;
|
||||
}
|
||||
|
||||
if (table->fts && table->fts->cache) {
|
||||
total_memory += table->fts->cache->total_size;
|
||||
}
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
|
||||
if (total_memory > fts_max_total_cache_size) {
|
||||
return(true);
|
||||
}
|
||||
|
@ -2861,22 +2842,16 @@ static bool fts_is_sync_needed()
|
|||
}
|
||||
|
||||
/** Sync fts cache of a table
|
||||
@param[in] table_id table id */
|
||||
static void fts_optimize_sync_table(table_id_t table_id)
|
||||
@param[in,out] table table to be synced */
|
||||
static void fts_optimize_sync_table(dict_table_t* table)
|
||||
{
|
||||
if (dict_table_t* table = dict_table_open_on_id(
|
||||
table_id, FALSE, DICT_TABLE_OP_NORMAL)) {
|
||||
if (fil_table_accessible(table)
|
||||
&& table->fts && table->fts->cache) {
|
||||
fts_sync_table(table, true, false, false);
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF(
|
||||
"ib_optimize_wq_hang",
|
||||
os_thread_sleep(6000000););
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
if (fil_table_accessible(table)
|
||||
&& table->fts && table->fts->cache) {
|
||||
fts_sync_table(table, true, false, false);
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("ib_optimize_wq_hang",
|
||||
os_thread_sleep(6000000););
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2918,7 +2893,7 @@ fts_optimize_thread(
|
|||
ib_vector_get(fts_slots, current));
|
||||
|
||||
/* Handle the case of empty slots. */
|
||||
if (slot->table_id) {
|
||||
if (slot->table) {
|
||||
slot->running = true;
|
||||
fts_optimize_table_bk(slot);
|
||||
}
|
||||
|
@ -2978,7 +2953,7 @@ fts_optimize_thread(
|
|||
os_thread_sleep(300000););
|
||||
|
||||
fts_optimize_sync_table(
|
||||
*static_cast<table_id_t*>(msg->ptr));
|
||||
static_cast<dict_table_t*>(msg->ptr));
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -2997,8 +2972,8 @@ fts_optimize_thread(
|
|||
fts_slot_t* slot = static_cast<fts_slot_t*>(
|
||||
ib_vector_get(fts_slots, i));
|
||||
|
||||
if (table_id_t table_id = slot->table_id) {
|
||||
fts_optimize_sync_table(table_id);
|
||||
if (slot->table) {
|
||||
fts_optimize_sync_table(slot->table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3028,24 +3003,35 @@ fts_optimize_init(void)
|
|||
ut_ad(!srv_read_only_mode);
|
||||
|
||||
/* For now we only support one optimize thread. */
|
||||
ut_a(!fts_optimize_is_init());
|
||||
ut_a(!fts_optimize_wq);
|
||||
|
||||
fts_optimize_wq = ib_wqueue_create();
|
||||
ut_a(fts_optimize_wq != NULL);
|
||||
last_check_sync_time = time(NULL);
|
||||
|
||||
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
|
||||
}
|
||||
/* Add fts tables to fts slots which could be skipped
|
||||
during dict_load_table() because fts_optimize_thread
|
||||
wasn't even started. */
|
||||
mutex_enter(&dict_sys->mutex);
|
||||
|
||||
/**********************************************************************//**
|
||||
Check whether the work queue is initialized.
|
||||
@return TRUE if optimze queue is initialized. */
|
||||
UNIV_INTERN
|
||||
ibool
|
||||
fts_optimize_is_init(void)
|
||||
/*======================*/
|
||||
{
|
||||
return(fts_optimize_wq != NULL);
|
||||
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
|
||||
table != NULL;
|
||||
table = UT_LIST_GET_NEXT(table_LRU, table)) {
|
||||
|
||||
if (!table->fts || !dict_table_has_fts_index(table)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* fts_optimize_thread is not started yet. So there is no
|
||||
need to acqquire fts_optimize_wq->mutex for adding the fts
|
||||
table to the fts slots. */
|
||||
ut_ad(!table->can_be_evicted);
|
||||
fts_optimize_new_table(table);
|
||||
table->fts->in_queue = true;
|
||||
}
|
||||
|
||||
mutex_exit(&dict_sys->mutex);
|
||||
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
|
|
@ -285,9 +285,6 @@ struct fts_t {
|
|||
fts_add_wq. */
|
||||
ib_mutex_t bg_threads_mutex;
|
||||
|
||||
/* Wheter the table was added to fts_optimize_wq();
|
||||
protected by bg_threads mutex */
|
||||
unsigned in_queue:1;
|
||||
/* Whether the ADDED table record sync-ed after
|
||||
crash recovery; protected by bg_threads mutex */
|
||||
unsigned added_synced:1;
|
||||
|
@ -310,6 +307,11 @@ struct fts_t {
|
|||
|
||||
ib_vector_t* indexes; /*!< Vector of FTS indexes, this is
|
||||
mainly for caching purposes. */
|
||||
|
||||
/* Whether the table was added to fts_optimize_wq();
|
||||
protected by fts_optimize_wq mutex */
|
||||
bool in_queue;
|
||||
|
||||
mem_heap_t* fts_heap; /*!< heap for fts_t allocation */
|
||||
};
|
||||
|
||||
|
@ -631,14 +633,6 @@ void
|
|||
fts_optimize_init(void);
|
||||
/*====================*/
|
||||
|
||||
/**********************************************************************//**
|
||||
Check whether the work queue is initialized.
|
||||
@return TRUE if optimze queue is initialized. */
|
||||
UNIV_INTERN
|
||||
ibool
|
||||
fts_optimize_is_init(void);
|
||||
/*======================*/
|
||||
|
||||
/****************************************************************//**
|
||||
Drops index ancillary tables for a FTS index
|
||||
@return DB_SUCCESS or error code */
|
||||
|
|
|
@ -25,6 +25,9 @@ Created 2011-02-15 Jimmy Yang
|
|||
#ifndef INNODB_FTS0OPT_H
|
||||
#define INNODB_FTS0OPT_H
|
||||
|
||||
/** The FTS optimize thread's work queue. */
|
||||
extern ib_wqueue_t* fts_optimize_wq;
|
||||
|
||||
/********************************************************************
|
||||
Callback function to fetch the rows in an FTS INDEX record. */
|
||||
UNIV_INTERN
|
||||
|
|
|
@ -56,16 +56,15 @@ ib_wqueue_free(
|
|||
/*===========*/
|
||||
ib_wqueue_t* wq); /*!< in: work queue */
|
||||
|
||||
/****************************************************************//**
|
||||
Add a work item to the queue. */
|
||||
/** Add a work item to the queue.
|
||||
@param[in,out] wq work queue
|
||||
@param[in] item work item
|
||||
@param[in,out] heap memory heap to use for allocating list node
|
||||
@param[in] wq_locked work queue mutex locked */
|
||||
UNIV_INTERN
|
||||
void
|
||||
ib_wqueue_add(
|
||||
/*==========*/
|
||||
ib_wqueue_t* wq, /*!< in: work queue */
|
||||
void* item, /*!< in: work item */
|
||||
mem_heap_t* heap); /*!< in: memory heap to use for allocating the
|
||||
list node */
|
||||
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap,
|
||||
bool wq_locked = false);
|
||||
|
||||
/** Check if queue is empty.
|
||||
@param wq wait queue
|
||||
|
@ -107,14 +106,16 @@ ib_wqueue_len(
|
|||
/*==========*/
|
||||
ib_wqueue_t* wq); /*<! in: work queue */
|
||||
|
||||
|
||||
/* Work queue. */
|
||||
struct ib_wqueue_t {
|
||||
ib_mutex_t mutex; /*!< mutex protecting everything */
|
||||
ib_list_t* items; /*!< work item list */
|
||||
os_event_t event; /*!< event we use to signal additions to list;
|
||||
os_event_set() and os_event_reset() are
|
||||
protected by ib_wqueue_t::mutex */
|
||||
/** Work queue */
|
||||
struct ib_wqueue_t
|
||||
{
|
||||
/** Mutex protecting everything */
|
||||
ib_mutex_t mutex;
|
||||
/** Work item list */
|
||||
ib_list_t* items;
|
||||
/** event we use to signal additions to list;
|
||||
os_event_set() and os_event_reset() are protected by the mutex */
|
||||
os_event_t event;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -61,23 +61,25 @@ ib_wqueue_free(
|
|||
mem_free(wq);
|
||||
}
|
||||
|
||||
/****************************************************************//**
|
||||
Add a work item to the queue. */
|
||||
/** Add a work item to the queue.
|
||||
@param[in,out] wq work queue
|
||||
@param[in] item work item
|
||||
@param[in,out] heap memory heap to use for allocating list node
|
||||
@param[in] wq_locked work queue mutex locked */
|
||||
UNIV_INTERN
|
||||
void
|
||||
ib_wqueue_add(
|
||||
/*==========*/
|
||||
ib_wqueue_t* wq, /*!< in: work queue */
|
||||
void* item, /*!< in: work item */
|
||||
mem_heap_t* heap) /*!< in: memory heap to use for allocating the
|
||||
list node */
|
||||
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap, bool wq_locked)
|
||||
{
|
||||
mutex_enter(&wq->mutex);
|
||||
if (!wq_locked) {
|
||||
mutex_enter(&wq->mutex);
|
||||
}
|
||||
|
||||
ib_list_add_last(wq->items, item, heap);
|
||||
os_event_set(wq->event);
|
||||
|
||||
mutex_exit(&wq->mutex);
|
||||
if (!wq_locked) {
|
||||
mutex_exit(&wq->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
/****************************************************************//**
|
||||
|
|
|
@ -46,6 +46,7 @@ Created 4/24/1996 Heikki Tuuri
|
|||
#include "dict0priv.h"
|
||||
#include "ha_prototypes.h" /* innobase_casedn_str() */
|
||||
#include "fts0priv.h"
|
||||
#include "fts0opt.h"
|
||||
|
||||
/** Following are the InnoDB system tables. The positions in
|
||||
this array are referenced by enum dict_system_table_id. */
|
||||
|
@ -2570,8 +2571,12 @@ func_exit:
|
|||
FTS */
|
||||
fts_optimize_remove_table(table);
|
||||
fts_free(table);
|
||||
} else {
|
||||
} else if (fts_optimize_wq) {
|
||||
fts_optimize_add_table(table);
|
||||
} else {
|
||||
/* fts_optimize_thread is not started yet.
|
||||
So make the table as non-evictable from cache. */
|
||||
dict_table_move_from_lru_to_non_lru(table);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
|
|||
#include "ut0wqueue.h"
|
||||
#include "srv0start.h"
|
||||
#include "zlib.h"
|
||||
#include "fts0opt.h"
|
||||
|
||||
#ifndef UNIV_NONINL
|
||||
#include "fts0types.ic"
|
||||
|
@ -41,7 +42,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
|
|||
#endif
|
||||
|
||||
/** The FTS optimize thread's work queue. */
|
||||
static ib_wqueue_t* fts_optimize_wq;
|
||||
ib_wqueue_t* fts_optimize_wq;
|
||||
|
||||
/** The FTS vector to store fts_slot_t */
|
||||
static ib_vector_t* fts_slots;
|
||||
|
@ -169,8 +170,8 @@ struct fts_encode_t {
|
|||
/** We use this information to determine when to start the optimize
|
||||
cycle for a table. */
|
||||
struct fts_slot_t {
|
||||
/** table identifier, or 0 if the slot is empty */
|
||||
table_id_t table_id;
|
||||
/** table, or NULL if the slot is unused */
|
||||
dict_table_t* table;
|
||||
|
||||
/** whether this slot is being processed */
|
||||
bool running;
|
||||
|
@ -2456,14 +2457,7 @@ fts_optimize_table_bk(
|
|||
return(DB_SUCCESS);
|
||||
}
|
||||
|
||||
dict_table_t* table = dict_table_open_on_id(
|
||||
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
|
||||
|
||||
if (!table) {
|
||||
slot->last_run = now;
|
||||
return DB_SUCCESS;
|
||||
}
|
||||
|
||||
dict_table_t* table = slot->table;
|
||||
dberr_t error;
|
||||
|
||||
if (fil_table_accessible(table)
|
||||
|
@ -2483,8 +2477,6 @@ fts_optimize_table_bk(
|
|||
error = DB_SUCCESS;
|
||||
}
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
|
||||
return(error);
|
||||
}
|
||||
/*********************************************************************//**
|
||||
|
@ -2627,11 +2619,13 @@ UNIV_INTERN void fts_optimize_add_table(dict_table_t* table)
|
|||
|
||||
msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_enter(&table->fts->bg_threads_mutex);
|
||||
table->fts->in_queue = true;
|
||||
mutex_exit(&table->fts->bg_threads_mutex);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2648,7 +2642,7 @@ fts_optimize_remove_table(
|
|||
fts_msg_del_t* remove;
|
||||
|
||||
/* if the optimize system not yet initialized, return */
|
||||
if (!fts_optimize_is_init()) {
|
||||
if (!fts_optimize_wq) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2660,12 +2654,10 @@ fts_optimize_remove_table(
|
|||
return;
|
||||
}
|
||||
|
||||
fts_t* fts = table->fts;
|
||||
mutex_enter(&fts->bg_threads_mutex);
|
||||
bool is_in_optimize_queue = fts->in_queue;
|
||||
mutex_exit(&fts->bg_threads_mutex);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
if (!is_in_optimize_queue) {
|
||||
if (!table->fts->in_queue) {
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2681,15 +2673,17 @@ fts_optimize_remove_table(
|
|||
remove->event = event;
|
||||
msg->ptr = remove;
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
|
||||
os_event_wait(event);
|
||||
|
||||
os_event_free(event);
|
||||
|
||||
mutex_enter(&fts->bg_threads_mutex);
|
||||
fts->in_queue = false;
|
||||
mutex_exit(&fts->bg_threads_mutex);
|
||||
ut_d(mutex_enter(&fts_optimize_wq->mutex));
|
||||
ut_ad(!table->fts->in_queue);
|
||||
ut_d(mutex_exit(&fts_optimize_wq->mutex));
|
||||
}
|
||||
|
||||
/** Send sync fts cache for the table.
|
||||
|
@ -2700,10 +2694,9 @@ fts_optimize_request_sync_table(
|
|||
dict_table_t* table)
|
||||
{
|
||||
fts_msg_t* msg;
|
||||
table_id_t* table_id;
|
||||
|
||||
/* if the optimize system not yet initialized, return */
|
||||
if (!fts_optimize_is_init()) {
|
||||
if (!fts_optimize_wq) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2715,39 +2708,36 @@ fts_optimize_request_sync_table(
|
|||
return;
|
||||
}
|
||||
|
||||
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
|
||||
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
|
||||
|
||||
table_id = static_cast<table_id_t*>(
|
||||
mem_heap_alloc(msg->heap, sizeof(table_id_t)));
|
||||
*table_id = table->id;
|
||||
msg->ptr = table_id;
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
|
||||
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
|
||||
|
||||
mutex_enter(&table->fts->bg_threads_mutex);
|
||||
table->fts->in_queue = true;
|
||||
mutex_exit(&table->fts->bg_threads_mutex);
|
||||
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
}
|
||||
|
||||
/** Add a table to fts_slots if it doesn't already exist. */
|
||||
static bool fts_optimize_new_table(dict_table_t* table)
|
||||
{
|
||||
ut_ad(table);
|
||||
|
||||
ulint i;
|
||||
fts_slot_t* slot;
|
||||
fts_slot_t* empty = NULL;
|
||||
const table_id_t table_id = table->id;
|
||||
ut_ad(table_id);
|
||||
|
||||
/* Search for duplicates, also find a free slot if one exists. */
|
||||
for (i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
|
||||
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
|
||||
|
||||
if (!slot->table_id) {
|
||||
if (!slot->table) {
|
||||
empty = slot;
|
||||
} else if (slot->table_id == table_id) {
|
||||
} else if (slot->table == table) {
|
||||
/* Already exists in our optimize queue. */
|
||||
return(FALSE);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2756,37 +2746,36 @@ static bool fts_optimize_new_table(dict_table_t* table)
|
|||
|
||||
memset(slot, 0x0, sizeof(*slot));
|
||||
|
||||
slot->table_id = table->id;
|
||||
slot->running = false;
|
||||
|
||||
return(TRUE);
|
||||
slot->table = table;
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Remove a table from fts_slots if it exists.
|
||||
@param[in,out] table table to be removed from fts_slots */
|
||||
static bool fts_optimize_del_table(const dict_table_t* table)
|
||||
{
|
||||
const table_id_t table_id = table->id;
|
||||
ut_ad(table_id);
|
||||
|
||||
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
fts_slot_t* slot;
|
||||
|
||||
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
|
||||
|
||||
if (slot->table_id == table_id) {
|
||||
if (slot->table == table) {
|
||||
if (fts_enable_diag_print) {
|
||||
ib_logf(IB_LOG_LEVEL_INFO,
|
||||
"FTS Optimize Removing table %s",
|
||||
table->name);
|
||||
}
|
||||
|
||||
slot->table_id = 0;
|
||||
return(TRUE);
|
||||
mutex_enter(&fts_optimize_wq->mutex);
|
||||
slot->table->fts->in_queue = false;
|
||||
mutex_exit(&fts_optimize_wq->mutex);
|
||||
|
||||
slot->table = NULL;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return(FALSE);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2800,7 +2789,7 @@ static ulint fts_optimize_how_many()
|
|||
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
|
||||
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
|
||||
ib_vector_get_const(fts_slots, i));
|
||||
if (slot->table_id == 0) {
|
||||
if (!slot->table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2836,22 +2825,14 @@ static bool fts_is_sync_needed()
|
|||
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
|
||||
ib_vector_get_const(fts_slots, i));
|
||||
|
||||
if (slot->table_id == 0) {
|
||||
if (!slot->table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dict_table_t* table = dict_table_open_on_id(
|
||||
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
|
||||
if (!table) {
|
||||
continue;
|
||||
if (slot->table->fts && slot->table->fts->cache) {
|
||||
total_memory += slot->table->fts->cache->total_size;
|
||||
}
|
||||
|
||||
if (table->fts && table->fts->cache) {
|
||||
total_memory += table->fts->cache->total_size;
|
||||
}
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
|
||||
if (total_memory > fts_max_total_cache_size) {
|
||||
return(true);
|
||||
}
|
||||
|
@ -2861,22 +2842,16 @@ static bool fts_is_sync_needed()
|
|||
}
|
||||
|
||||
/** Sync fts cache of a table
|
||||
@param[in] table_id table id */
|
||||
static void fts_optimize_sync_table(table_id_t table_id)
|
||||
@param[in,out] table table to be synced */
|
||||
static void fts_optimize_sync_table(dict_table_t* table)
|
||||
{
|
||||
if (dict_table_t* table = dict_table_open_on_id(
|
||||
table_id, FALSE, DICT_TABLE_OP_NORMAL)) {
|
||||
if (fil_table_accessible(table)
|
||||
&& table->fts && table->fts->cache) {
|
||||
fts_sync_table(table, true, false, false);
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF(
|
||||
"ib_optimize_wq_hang",
|
||||
os_thread_sleep(6000000););
|
||||
|
||||
dict_table_close(table, FALSE, FALSE);
|
||||
if (fil_table_accessible(table)
|
||||
&& table->fts && table->fts->cache) {
|
||||
fts_sync_table(table, true, false, false);
|
||||
}
|
||||
|
||||
DBUG_EXECUTE_IF("ib_optimize_wq_hang",
|
||||
os_thread_sleep(6000000););
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
@ -2918,7 +2893,7 @@ fts_optimize_thread(
|
|||
ib_vector_get(fts_slots, current));
|
||||
|
||||
/* Handle the case of empty slots. */
|
||||
if (slot->table_id) {
|
||||
if (slot->table) {
|
||||
slot->running = true;
|
||||
fts_optimize_table_bk(slot);
|
||||
}
|
||||
|
@ -2978,7 +2953,7 @@ fts_optimize_thread(
|
|||
os_thread_sleep(300000););
|
||||
|
||||
fts_optimize_sync_table(
|
||||
*static_cast<table_id_t*>(msg->ptr));
|
||||
static_cast<dict_table_t*>(msg->ptr));
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -2997,8 +2972,8 @@ fts_optimize_thread(
|
|||
fts_slot_t* slot = static_cast<fts_slot_t*>(
|
||||
ib_vector_get(fts_slots, i));
|
||||
|
||||
if (table_id_t table_id = slot->table_id) {
|
||||
fts_optimize_sync_table(table_id);
|
||||
if (slot->table) {
|
||||
fts_optimize_sync_table(slot->table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3028,24 +3003,35 @@ fts_optimize_init(void)
|
|||
ut_ad(!srv_read_only_mode);
|
||||
|
||||
/* For now we only support one optimize thread. */
|
||||
ut_a(!fts_optimize_is_init());
|
||||
ut_a(!fts_optimize_wq);
|
||||
|
||||
fts_optimize_wq = ib_wqueue_create();
|
||||
ut_a(fts_optimize_wq != NULL);
|
||||
last_check_sync_time = time(NULL);
|
||||
|
||||
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
|
||||
}
|
||||
/* Add fts tables to fts slots which could be skipped
|
||||
during dict_load_table() because fts_optimize_thread
|
||||
wasn't even started. */
|
||||
mutex_enter(&dict_sys->mutex);
|
||||
|
||||
/**********************************************************************//**
|
||||
Check whether the work queue is initialized.
|
||||
@return TRUE if optimze queue is initialized. */
|
||||
UNIV_INTERN
|
||||
ibool
|
||||
fts_optimize_is_init(void)
|
||||
/*======================*/
|
||||
{
|
||||
return(fts_optimize_wq != NULL);
|
||||
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
|
||||
table != NULL;
|
||||
table = UT_LIST_GET_NEXT(table_LRU, table)) {
|
||||
|
||||
if (!table->fts || !dict_table_has_fts_index(table)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* fts_optimize_thread is not started yet. So there is no
|
||||
need to acqquire fts_optimize_wq->mutex for adding the fts
|
||||
table to the fts slots. */
|
||||
ut_ad(!table->can_be_evicted);
|
||||
fts_optimize_new_table(table);
|
||||
table->fts->in_queue = true;
|
||||
}
|
||||
|
||||
mutex_exit(&dict_sys->mutex);
|
||||
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
|
|
|
@ -285,9 +285,6 @@ struct fts_t {
|
|||
fts_add_wq. */
|
||||
ib_mutex_t bg_threads_mutex;
|
||||
|
||||
/* Whether the table was added to fts_optimize_wq();
|
||||
protected by bg_threads mutex */
|
||||
unsigned in_queue:1;
|
||||
/* Whether the ADDED table record sync-ed afer
|
||||
crash recovery; protected by bg_threads mutex */
|
||||
unsigned added_synced:1;
|
||||
|
@ -310,6 +307,11 @@ struct fts_t {
|
|||
|
||||
ib_vector_t* indexes; /*!< Vector of FTS indexes, this is
|
||||
mainly for caching purposes. */
|
||||
|
||||
/* Whether the table was added to fts_optimize_wq();
|
||||
protected by fts_optimize_wq mutex */
|
||||
bool in_queue;
|
||||
|
||||
mem_heap_t* fts_heap; /*!< heap for fts_t allocation */
|
||||
};
|
||||
|
||||
|
@ -631,14 +633,6 @@ void
|
|||
fts_optimize_init(void);
|
||||
/*====================*/
|
||||
|
||||
/**********************************************************************//**
|
||||
Check whether the work queue is initialized.
|
||||
@return TRUE if optimze queue is initialized. */
|
||||
UNIV_INTERN
|
||||
ibool
|
||||
fts_optimize_is_init(void);
|
||||
/*======================*/
|
||||
|
||||
/****************************************************************//**
|
||||
Drops index ancillary tables for a FTS index
|
||||
@return DB_SUCCESS or error code */
|
||||
|
|
|
@ -25,6 +25,9 @@ Created 2011-02-15 Jimmy Yang
|
|||
#ifndef INNODB_FTS0OPT_H
|
||||
#define INNODB_FTS0OPT_H
|
||||
|
||||
/** The FTS optimize thread's work queue. */
|
||||
extern ib_wqueue_t* fts_optimize_wq;
|
||||
|
||||
/********************************************************************
|
||||
Callback function to fetch the rows in an FTS INDEX record. */
|
||||
UNIV_INTERN
|
||||
|
|
|
@ -56,16 +56,15 @@ ib_wqueue_free(
|
|||
/*===========*/
|
||||
ib_wqueue_t* wq); /*!< in: work queue */
|
||||
|
||||
/****************************************************************//**
|
||||
Add a work item to the queue. */
|
||||
/** Add a work item to the queue.
|
||||
@param[in,out] wq work queue
|
||||
@param[in] item work item
|
||||
@param[in,out] heap memory heap to use for allocating list node
|
||||
@param[in] wq_locked work queue mutex locked */
|
||||
UNIV_INTERN
|
||||
void
|
||||
ib_wqueue_add(
|
||||
/*==========*/
|
||||
ib_wqueue_t* wq, /*!< in: work queue */
|
||||
void* item, /*!< in: work item */
|
||||
mem_heap_t* heap); /*!< in: memory heap to use for allocating the
|
||||
list node */
|
||||
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap,
|
||||
bool wq_locked = false);
|
||||
|
||||
/** Check if queue is empty.
|
||||
@param wq wait queue
|
||||
|
@ -99,7 +98,6 @@ ib_wqueue_nowait(
|
|||
/*=============*/
|
||||
ib_wqueue_t* wq); /*<! in: work queue */
|
||||
|
||||
|
||||
/********************************************************************
|
||||
Get number of items on queue.
|
||||
@return number of items on queue */
|
||||
|
@ -108,13 +106,16 @@ ib_wqueue_len(
|
|||
/*==========*/
|
||||
ib_wqueue_t* wq); /*<! in: work queue */
|
||||
|
||||
/* Work queue. */
|
||||
struct ib_wqueue_t {
|
||||
ib_mutex_t mutex; /*!< mutex protecting everything */
|
||||
ib_list_t* items; /*!< work item list */
|
||||
os_event_t event; /*!< event we use to signal additions to list;
|
||||
os_event_set() and os_event_reset() are
|
||||
protected by ib_wqueue_t::mutex */
|
||||
/** Work queue */
|
||||
struct ib_wqueue_t
|
||||
{
|
||||
/** Mutex protecting everything */
|
||||
ib_mutex_t mutex;
|
||||
/** Work item list */
|
||||
ib_list_t* items;
|
||||
/** event we use to signal additions to list;
|
||||
os_event_set() and os_event_reset() are protected by the mutex */
|
||||
os_event_t event;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -61,23 +61,25 @@ ib_wqueue_free(
|
|||
mem_free(wq);
|
||||
}
|
||||
|
||||
/****************************************************************//**
|
||||
Add a work item to the queue. */
|
||||
/** Add a work item to the queue.
|
||||
@param[in,out] wq work queue
|
||||
@param[in] item work item
|
||||
@param[in,out] heap memory heap to use for allocating list node
|
||||
@param[in] wq_locked work queue mutex locked */
|
||||
UNIV_INTERN
|
||||
void
|
||||
ib_wqueue_add(
|
||||
/*==========*/
|
||||
ib_wqueue_t* wq, /*!< in: work queue */
|
||||
void* item, /*!< in: work item */
|
||||
mem_heap_t* heap) /*!< in: memory heap to use for allocating the
|
||||
list node */
|
||||
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap, bool wq_locked)
|
||||
{
|
||||
mutex_enter(&wq->mutex);
|
||||
if (!wq_locked) {
|
||||
mutex_enter(&wq->mutex);
|
||||
}
|
||||
|
||||
ib_list_add_last(wq->items, item, heap);
|
||||
os_event_set(wq->event);
|
||||
|
||||
mutex_exit(&wq->mutex);
|
||||
if (!wq_locked) {
|
||||
mutex_exit(&wq->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
/****************************************************************//**
|
||||
|
|
Loading…
Add table
Reference in a new issue