MDEV-16264 Use threadpool for Innodb background work.

Almost all threads are gone:
- the "ticking" threads, which sleep for a while and then do some work
(srv_monitor_thread, srv_error_monitor_thread, srv_master_thread),
were replaced with timers. Some timers are periodic,
e.g. the "master" timer.

- The btr_defragment_thread is also replaced by a timer, which
reschedules itself when the current defragment "item" needs throttling.

- The buf_resize_thread and buf_dump_thread are replaced with tasks.
Ditto for the page cleaner workers.

- The purge worker threads are now tasks as well, and the purge
coordinator is a combination of a task and a timer.

- All AIO is outsourced to tpool; InnoDB just calls thread_pool::submit_io()
and provides the callback.

- srv_slot_t was removed; innodb_debug_sync, which is used in purge,
is currently not working and needs to be reimplemented.
This commit is contained in:
Vladislav Vaintroub 2019-10-29 22:37:12 +01:00
parent 00ee8d85c9
commit 5e62b6a5e0
47 changed files with 1152 additions and 5854 deletions

View file

@ -102,6 +102,7 @@ Street, Fifth Floor, Boston, MA 02110-1335 USA
#include <srv0srv.h>
#include <crc_glue.h>
#include <log.h>
#include <thr_timer.h>
int sys_var_init();
@ -4052,7 +4053,7 @@ fail:
especially in 64-bit
computers */
}
srv_thread_pool_init();
sync_check_init();
ut_d(sync_check_enable());
/* Reset the system variables in the recovery module. */
@ -6133,9 +6134,12 @@ int main(int argc, char **argv)
DBUG_SET(dbug_option);
}
#endif
/* Main functions for library */
init_thr_timer(5);
int status = main_low(server_defaults);
end_thr_timer();
backup_cleanup();
if (innobackupex_mode) {

View file

@ -233,48 +233,3 @@ set global debug_dbug= @saved_dbug;
drop table t1;
set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
#
# MDEV-18546 ASAN heap-use-after-free
# in innobase_get_computed_value / row_purge
#
CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
b BIT(15),
v BIT(15) AS (b) VIRTUAL,
PRIMARY KEY(pk),
UNIQUE(v)
) ENGINE=InnoDB;
INSERT IGNORE INTO t1 (b) VALUES
(NULL),(b'011'),(b'000110100'),
(b'01101101010'),(b'01111001001011'),(NULL);
SET GLOBAL innodb_debug_sync = "ib_clust_v_col_before_row_allocated "
"SIGNAL before_row_allocated "
"WAIT_FOR flush_unlock";
SET GLOBAL innodb_debug_sync = "ib_open_after_dict_open "
"SIGNAL purge_open "
"WAIT_FOR select_open";
set @saved_dbug= @@global.debug_dbug;
set global debug_dbug= "+d,ib_purge_virtual_index_callback";
connect purge_waiter,localhost,root;
SET debug_sync= "now WAIT_FOR before_row_allocated";
connection default;
REPLACE INTO t1 (pk, b) SELECT pk, b FROM t1;
connection purge_waiter;
connection default;
disconnect purge_waiter;
FLUSH TABLES;
SET GLOBAL innodb_debug_sync = reset;
SET debug_sync= "now SIGNAL flush_unlock WAIT_FOR purge_open";
SET GLOBAL innodb_debug_sync = reset;
SET debug_sync= "ib_open_after_dict_open SIGNAL select_open";
SELECT * FROM t1;
pk b v
1 NULL NULL
2  
3 4 4
4 j j
5 K K
6 NULL NULL
DROP TABLE t1;
SET debug_sync= reset;
set global debug_dbug= @saved_dbug;

View file

@ -310,67 +310,3 @@ drop table t1;
--source include/wait_until_count_sessions.inc
set debug_sync=reset;
SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency;
--echo #
--echo # MDEV-18546 ASAN heap-use-after-free
--echo # in innobase_get_computed_value / row_purge
--echo #
CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
b BIT(15),
v BIT(15) AS (b) VIRTUAL,
PRIMARY KEY(pk),
UNIQUE(v)
) ENGINE=InnoDB;
INSERT IGNORE INTO t1 (b) VALUES
(NULL),(b'011'),(b'000110100'),
(b'01101101010'),(b'01111001001011'),(NULL);
SET GLOBAL innodb_debug_sync = "ib_clust_v_col_before_row_allocated "
"SIGNAL before_row_allocated "
"WAIT_FOR flush_unlock";
SET GLOBAL innodb_debug_sync = "ib_open_after_dict_open "
"SIGNAL purge_open "
"WAIT_FOR select_open";
# In 10.2 trx_undo_roll_ptr_is_insert(t_roll_ptr) condition never pass in purge,
# so this condition is forced to pass in row_vers_old_has_index_entry
set @saved_dbug= @@global.debug_dbug;
set global debug_dbug= "+d,ib_purge_virtual_index_callback";
# The purge starts from REPLACE command. To avoid possible race, separate
# connection is used.
--connect(purge_waiter,localhost,root)
--send
SET debug_sync= "now WAIT_FOR before_row_allocated";
--connection default
REPLACE INTO t1 (pk, b) SELECT pk, b FROM t1;
--connection purge_waiter
# Now we will definitely catch ib_clust_v_col_before_row_allocated
--reap
--connection default
--disconnect purge_waiter
# purge hangs on the sync point. table is purged, ref_count is set to 0
FLUSH TABLES;
# Avoid hang on repeating purge.
# Reset Will be applied after first record is purged
SET GLOBAL innodb_debug_sync = reset;
SET debug_sync= "now SIGNAL flush_unlock WAIT_FOR purge_open";
# Avoid hang on repeating purge
SET GLOBAL innodb_debug_sync = reset;
# select unblocks purge thread
SET debug_sync= "ib_open_after_dict_open SIGNAL select_open";
SELECT * FROM t1;
# Cleanup
DROP TABLE t1;
SET debug_sync= reset;
set global debug_dbug= @saved_dbug;

View file

@ -6,11 +6,6 @@ select user,state from information_schema.processlist order by 2;
user state
root
root Filling schema table
system user InnoDB purge coordinator
system user InnoDB purge worker
system user InnoDB purge worker
system user InnoDB purge worker
system user InnoDB shutdown handler
set global debug_dbug='+d,only_kill_system_threads';
set global innodb_fast_shutdown=0;
shutdown;
@ -19,11 +14,6 @@ disconnect con1;
select user,state from information_schema.processlist order by 2;
user state
root Filling schema table
system user InnoDB purge coordinator
system user InnoDB purge worker
system user InnoDB purge worker
system user InnoDB purge worker
system user InnoDB slow shutdown wait
set global innodb_fast_shutdown=1;
select user,state from information_schema.processlist order by 2;
user state

View file

@ -5,14 +5,5 @@ FROM performance_schema.threads
WHERE name LIKE 'thread/innodb/%'
GROUP BY name;
name type processlist_user processlist_host processlist_db processlist_command processlist_time processlist_state processlist_info parent_thread_id role instrumented
thread/innodb/io_ibuf_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/io_log_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/io_read_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/io_write_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/page_cleaner_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/srv_error_monitor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/srv_lock_timeout_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/srv_master_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/srv_monitor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/srv_purge_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES
thread/innodb/thd_destructor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL 1 NULL YES
thread/innodb/thread_pool_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES

View file

@ -525,18 +525,6 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST OFF,ON
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_DEBUG_SYNC
SESSION_VALUE NULL
DEFAULT_VALUE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE VARCHAR
VARIABLE_COMMENT debug_sync for innodb purge threads. Use it to set up sync points for all purge threads at once. The commands will be applied sequentially at the beginning of purging the next undo record.
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT NONE
VARIABLE_NAME INNODB_DEFAULT_ENCRYPTION_KEY_ID
SESSION_VALUE 1
DEFAULT_VALUE 1

View file

@ -19,6 +19,7 @@
INCLUDE(innodb.cmake)
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/tpool)
SET(INNOBASE_SOURCES
btr/btr0btr.cc
@ -154,7 +155,8 @@ MYSQL_ADD_PLUGIN(innobase ${INNOBASE_SOURCES} STORAGE_ENGINE
${CRC32_LIBRARY}
${NUMA_LIBRARY}
${LIBSYSTEMD}
${LINKER_SCRIPT})
${LINKER_SCRIPT}
tpool)
IF(NOT TARGET innobase)
RETURN()

View file

@ -3267,10 +3267,6 @@ btr_cur_prefetch_siblings(
page_id_t(block->page.id.space(), right_page_no),
block->zip_size(), false);
}
if (left_page_no != FIL_NULL
|| right_page_no != FIL_NULL) {
os_aio_simulated_wake_handler_threads();
}
}
/*************************************************************//**

View file

@ -71,6 +71,21 @@ The difference between btr_defragment_count and btr_defragment_failures shows
the amount of effort wasted. */
Atomic_counter<ulint> btr_defragment_count;
bool btr_defragment_active;
/** State of the defragmentation work that is kept between
invocations of btr_defragment_chunk(). */
struct defragment_chunk_state_t
{
/** Work item currently being processed; reset to 0 in btr_defragment_init() */
btr_defragment_item_t* m_item;
};
/** The state instance that btr_defragment_chunk() operates on */
static defragment_chunk_state_t defragment_chunk_state;
static void btr_defragment_chunk(void*);
/** Timer created in btr_defragment_init(); its callback submits
btr_defragment_task to the thread pool */
static tpool::timer* btr_defragment_timer;
/** Task group of btr_defragment_task. NOTE(review): the constructor
argument 1 is presumably the maximum concurrency -- confirm against tpool */
static tpool::task_group task_group(1);
/** Thread pool task that runs btr_defragment_chunk() */
static tpool::task btr_defragment_task(btr_defragment_chunk, 0, &task_group);
static void btr_defragment_start();
/******************************************************************//**
Constructor for btr_defragment_item_t. */
btr_defragment_item_t::btr_defragment_item_t(
@ -94,6 +109,11 @@ btr_defragment_item_t::~btr_defragment_item_t() {
}
}
/** Hand btr_defragment_task over to the InnoDB thread pool.
The signature allows the function to be used as the callback of the
timer created in btr_defragment_init().
@param arg	unused */
static
void
submit_defragment_task(void* arg = 0)
{
	srv_thread_pool->submit_task(&btr_defragment_task);
}
/******************************************************************//**
Initialize defragmentation. */
void
@ -101,6 +121,9 @@ btr_defragment_init()
{
srv_defragment_interval = 1000000000ULL / srv_defragment_frequency;
mutex_create(LATCH_ID_BTR_DEFRAGMENT_MUTEX, &btr_defragment_mutex);
defragment_chunk_state.m_item = 0;
btr_defragment_timer = srv_thread_pool->create_timer(submit_defragment_task);
btr_defragment_active = true;
}
/******************************************************************//**
@ -108,6 +131,11 @@ Shutdown defragmentation. Release all resources. */
void
btr_defragment_shutdown()
{
if (!btr_defragment_timer)
return;
delete btr_defragment_timer;
btr_defragment_timer = 0;
task_group.cancel_pending(&btr_defragment_task);
mutex_enter(&btr_defragment_mutex);
std::list< btr_defragment_item_t* >::iterator iter = btr_defragment_wq.begin();
while(iter != btr_defragment_wq.end()) {
@ -117,6 +145,7 @@ btr_defragment_shutdown()
}
mutex_exit(&btr_defragment_mutex);
mutex_free(&btr_defragment_mutex);
btr_defragment_active = false;
}
@ -197,6 +226,10 @@ btr_defragment_add_index(
btr_defragment_item_t* item = new btr_defragment_item_t(pcur, event);
mutex_enter(&btr_defragment_mutex);
btr_defragment_wq.push_back(item);
if(btr_defragment_wq.size() == 1){
/* Kick off defragmentation work */
btr_defragment_start();
}
mutex_exit(&btr_defragment_mutex);
return event;
}
@ -674,14 +707,29 @@ btr_defragment_n_pages(
return current_block;
}
/** Whether btr_defragment_thread is active */
bool btr_defragment_thread_active;
/** Merge consecutive b-tree pages into fewer pages to defragment indexes */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(btr_defragment_thread)(void*)
/** Kick off defragmentation by submitting the defragment task.
Does nothing when srv_defragment is not set. The work queue must not
be empty (asserted in debug builds). */
void btr_defragment_start()
{
	if (srv_defragment) {
		ut_ad(!btr_defragment_wq.empty());
		submit_defragment_task();
	}
}
/**
Callback used by the defragment task.
The throttling "sleep" is implemented by rescheduling the
threadpool timer, which, when fired, resumes the work
where it was left off.
The state (current item) is kept in defragment_chunk_state;
the function parameter is not used.
*/
static void btr_defragment_chunk(void*)
{
defragment_chunk_state_t* state = &defragment_chunk_state;
btr_pcur_t* pcur;
btr_cur_t* cursor;
dict_index_t* index;
@ -690,37 +738,24 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
buf_block_t* last_block;
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
ut_ad(btr_defragment_thread_active);
/* If defragmentation is disabled, sleep before
checking whether it's enabled. */
if (!srv_defragment) {
os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS);
continue;
}
/* The following call won't remove the item from work queue.
We only get a pointer to it to work on. This will make sure
when user issue a kill command, all indices are in the work
queue to be searched. This also means that the user thread
cannot directly remove the item from queue (since we might be
using it). So user thread only marks index as removed. */
btr_defragment_item_t* item = btr_defragment_get_item();
/* If work queue is empty, sleep and check later. */
if (!item) {
os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS);
continue;
if (!state->m_item) {
state->m_item = btr_defragment_get_item();
}
/* If an index is marked as removed, we remove it from the work
queue. No other thread could be using this item at this point so
it's safe to remove now. */
if (item->removed) {
btr_defragment_remove_item(item);
continue;
while (state->m_item && state->m_item->removed) {
btr_defragment_remove_item(state->m_item);
state->m_item = btr_defragment_get_item();
}
if (!state->m_item) {
/* Queue empty */
return;
}
pcur = item->pcur;
pcur = state->m_item->pcur;
ulonglong now = my_interval_timer();
ulonglong elapsed = now - item->last_processed;
ulonglong elapsed = now - state->m_item->last_processed;
if (elapsed < srv_defragment_interval) {
/* If we see an index again before the interval
@ -729,12 +764,12 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
defragmentation of all indices queue up on a single
thread, it's likely other indices that follow this one
don't need to sleep again. */
os_thread_sleep(static_cast<ulint>
((srv_defragment_interval - elapsed)
/ 1000));
int sleep_ms = (int)((srv_defragment_interval - elapsed) / 1000 / 1000);
if (sleep_ms) {
btr_defragment_timer->set_time(sleep_ms, 0);
return;
}
}
now = my_interval_timer();
mtr_start(&mtr);
cursor = btr_pcur_get_btr_cur(pcur);
index = btr_cur_get_index(cursor);
@ -763,7 +798,7 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
btr_pcur_store_position(pcur, &mtr);
mtr_commit(&mtr);
/* Update the last_processed time of this index. */
item->last_processed = now;
state->m_item->last_processed = now;
} else {
dberr_t err = DB_SUCCESS;
mtr_commit(&mtr);
@ -786,11 +821,8 @@ DECLARE_THREAD(btr_defragment_thread)(void*)
}
}
btr_defragment_remove_item(item);
btr_defragment_remove_item(state->m_item);
state->m_item = NULL;
}
}
btr_defragment_thread_active = false;
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}

View file

@ -2021,7 +2021,7 @@ buf_pool_init_instance(
/* Initialize the temporal memory array and slots */
new(&buf_pool->io_buf) buf_pool_t::io_buf_t(
(srv_n_read_io_threads + srv_n_write_io_threads)
* (8 * OS_AIO_N_PENDING_IOS_PER_THREAD));
* OS_AIO_N_PENDING_IOS_PER_THREAD);
buf_pool_mutex_exit(buf_pool);
@ -3186,48 +3186,39 @@ calc_buf_pool_size:
return;
}
/** This is the thread for resizing buffer pool. It waits for an event and
when waked up either performs a resizing and sleeps again.
@return this function does not return, calls os_thread_exit()
*/
extern "C"
os_thread_ret_t
DECLARE_THREAD(buf_resize_thread)(void*)
/* Thread pool task invoked by innodb_buffer_pool_size changes. */
static void buf_resize_callback(void *)
{
my_thread_init();
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
os_event_wait(srv_buf_resize_event);
os_event_reset(srv_buf_resize_event);
if (srv_shutdown_state != SRV_SHUTDOWN_NONE) {
break;
}
buf_pool_mutex_enter_all();
if (srv_buf_pool_old_size == srv_buf_pool_size) {
buf_pool_mutex_exit_all();
std::ostringstream sout;
sout << "Size did not change (old size = new size = "
<< srv_buf_pool_size << ". Nothing to do.";
buf_resize_status(sout.str().c_str());
/* nothing to do */
continue;
}
ut_a(srv_shutdown_state == SRV_SHUTDOWN_NONE);
buf_pool_mutex_enter_all();
if (srv_buf_pool_old_size == srv_buf_pool_size) {
buf_pool_mutex_exit_all();
buf_pool_resize();
std::ostringstream sout;
sout << "Size did not change (old size = new size = "
<< srv_buf_pool_size << ". Nothing to do.";
buf_resize_status(sout.str().c_str());
return;
}
srv_buf_resize_thread_active = false;
my_thread_end();
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
buf_pool_mutex_exit_all();
buf_pool_resize();
}
/* Ensure that the resize task does not run in parallel with itself,
by setting max_concurrency to 1 for its task group. */
static tpool::task_group single_threaded_group(1);
/* Task that runs buf_resize_callback() with a null argument, as a
member of single_threaded_group. */
static tpool::waitable_task buf_resize_task(buf_resize_callback,
nullptr, &single_threaded_group);
/** Schedule a buffer pool resize by submitting buf_resize_task
to the InnoDB thread pool. */
void
buf_resize_start()
{
	srv_thread_pool->submit_task(&buf_resize_task);
}
/** Wait for buf_resize_task to finish. */
void
buf_resize_shutdown()
{
	buf_resize_task.wait();
}
#ifdef BTR_CUR_HASH_ADAPT
/** Clear the adaptive hash index on all pages in the buffer pool. */
void

View file

@ -99,10 +99,6 @@ void
buf_dblwr_sync_datafiles()
/*======================*/
{
/* Wake possible simulated aio thread to actually post the
writes to the operating system */
os_aio_simulated_wake_handler_threads();
/* Wait that all async writes to tablespaces have been posted to
the OS */
os_aio_wait_until_no_pending_writes();
@ -914,11 +910,6 @@ buf_dblwr_write_block_to_datafile(
ut_a(buf_page_in_file(bpage));
ulint type = IORequest::WRITE;
if (sync) {
type |= IORequest::DO_NOT_WAKE;
}
IORequest request(type, const_cast<buf_page_t*>(bpage));
/* We request frame here to get correct buffer in case of
@ -950,9 +941,8 @@ buf_dblwr_write_block_to_datafile(
}
/********************************************************************//**
Flushes possible buffered writes from the doublewrite memory buffer to disk,
and also wakes up the aio thread if simulated aio is used. It is very
important to call this function after a batch of writes has been posted,
Flushes possible buffered writes from the doublewrite memory buffer to disk.
It is very important to call this function after a batch of writes has been posted,
and also when we may have to wait for a page latch! Otherwise a deadlock
of threads can occur. */
void
@ -982,13 +972,6 @@ try_again:
if (buf_dblwr->first_free == 0) {
mutex_exit(&buf_dblwr->mutex);
/* Wake possible simulated aio thread as there could be
system temporary tablespace pages active for flushing.
Note: system temporary tablespace pages are not scheduled
for doublewrite. */
os_aio_simulated_wake_handler_threads();
return;
}
@ -1090,12 +1073,6 @@ flush:
buf_dblwr_write_block_to_datafile(
buf_dblwr->buf_block_arr[i], false);
}
/* Wake possible simulated aio thread to actually post the
writes to the operating system. We don't flush the files
at this point. We leave it to the IO helper thread to flush
datafiles when the whole batch has been processed. */
os_aio_simulated_wake_handler_threads();
}
/********************************************************************//**

View file

@ -45,6 +45,8 @@ Created April 08, 2011 Vasil Dimov
#include "mysql/service_wsrep.h" /* wsrep_recovery */
#include <my_service_manager.h>
static void buf_do_load_dump();
enum status_severity {
STATUS_INFO,
STATUS_ERR
@ -80,7 +82,7 @@ buf_dump_start()
/*============*/
{
buf_dump_should_start = true;
os_event_set(srv_buf_dump_event);
buf_do_load_dump();
}
/*****************************************************************//**
@ -93,7 +95,7 @@ buf_load_start()
/*============*/
{
buf_load_should_start = true;
os_event_set(srv_buf_dump_event);
buf_do_load_dump();
}
/*****************************************************************//**
@ -720,9 +722,6 @@ buf_load()
page_id_t(this_space_id, BUF_DUMP_PAGE(dump[i])),
zip_size, true);
if (i % 64 == 63) {
os_aio_simulated_wake_handler_threads();
}
if (buf_load_abort_flag) {
if (space != NULL) {
@ -802,22 +801,13 @@ buf_load_abort()
}
/*****************************************************************//**
This is the main thread for buffer pool dump/load. It waits for an
event and when waked up either performs a dump or load and sleeps
again.
@return this function does not return, it calls os_thread_exit() */
extern "C"
os_thread_ret_t
DECLARE_THREAD(buf_dump_thread)(void*)
This is the main task for buffer pool dump/load. When scheduled, it performs
a dump or a load, depending on the server state, the state of the variables, etc. */
static void buf_dump_load_func(void *)
{
my_thread_init();
ut_ad(!srv_read_only_mode);
/* JAN: TODO: MySQL 5.7 PSI
#ifdef UNIV_PFS_THREAD
pfs_register_thread(buf_dump_thread_key);
#endif */ /* UNIV_PFS_THREAD */
if (srv_buffer_pool_load_at_startup) {
static bool first_time = true;
if (first_time && srv_buffer_pool_load_at_startup) {
#ifdef WITH_WSREP
if (!get_wsrep_recovery()) {
@ -827,27 +817,24 @@ DECLARE_THREAD(buf_dump_thread)(void*)
}
#endif /* WITH_WSREP */
}
first_time = false;
while (!SHUTTING_DOWN()) {
os_event_wait(srv_buf_dump_event);
if (buf_dump_should_start) {
buf_dump_should_start = false;
buf_dump(TRUE /* quit on shutdown */);
}
if (buf_load_should_start) {
buf_load_should_start = false;
buf_load();
}
if (buf_dump_should_start || buf_load_should_start) {
continue;
if (!buf_dump_should_start && !buf_load_should_start) {
return;
}
os_event_reset(srv_buf_dump_event);
}
/* In shutdown */
if (srv_buffer_pool_dump_at_shutdown && srv_fast_shutdown != 2) {
if (export_vars.innodb_buffer_pool_load_incomplete) {
buf_dump_status(STATUS_INFO,
@ -860,13 +847,33 @@ DECLARE_THREAD(buf_dump_thread)(void*)
buf_dump(FALSE/* do complete dump at shutdown */);
}
}
srv_buf_dump_thread_active = false;
my_thread_end();
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/* Execute the dump/load task with max. concurrency 1, so that it never
runs in parallel with itself. */
tpool::task_group tpool_group(1);
/* The task group must be passed as the third constructor argument; the
second one is the argument handed to the callback (unused by
buf_dump_load_func). Passing &tpool_group in second position would
leave the task without a group, i.e. without the concurrency limit. */
static tpool::waitable_task buf_dump_load_task(buf_dump_load_func, nullptr, &tpool_group);
/* Set by buf_load_at_startup(); until then buf_do_load_dump() does not
submit the task. */
static bool load_dump_enabled;
/** Enable buffer pool dump/load requests and, if
srv_buffer_pool_load_at_startup is set, start an asynchronous
buffer pool load. */
void
buf_load_at_startup()
{
	load_dump_enabled = true;

	if (!srv_buffer_pool_load_at_startup) {
		return;
	}

	buf_do_load_dump();
}
/** Submit buf_dump_load_task to the thread pool, unless dump/load has
not been enabled yet (see load_dump_enabled) or the task is already
running. */
static
void
buf_do_load_dump()
{
	const bool may_submit = load_dump_enabled
		&& !buf_dump_load_task.is_running();

	if (may_submit) {
		srv_thread_pool->submit_task(&buf_dump_load_task);
	}
}
/** Wait until the currently running buffer pool load/dump task, if any,
has finished. Expected to be called during shutdown only (asserted in
debug builds). */
void
buf_load_dump_end()
{
	ut_ad(SHUTTING_DOWN());

	buf_dump_load_task.wait();
}

View file

@ -89,6 +89,11 @@ mysql_pfs_key_t page_cleaner_thread_key;
/** Event to synchronise with the flushing. */
os_event_t buf_flush_event;
/** Thread pool callback of pc_flush_slot_task */
static void pc_flush_slot_func(void *);
/** Task group of the page cleaner worker task, constructed with
argument 1. NOTE(review): presumably the initial task limit, which
buf_flush_set_page_cleaner_thread_cnt() adjusts via set_max_tasks() --
confirm against tpool */
static tpool::task_group page_cleaner_task_group(1);
/** Page cleaner worker task; runs pc_flush_slot_func() with a null
argument, as a member of page_cleaner_task_group */
static tpool::waitable_task pc_flush_slot_task(
pc_flush_slot_func, 0, &page_cleaner_task_group);
/** State for page cleaner array slot */
enum page_cleaner_state_t {
/** Not requested any yet.
@ -146,8 +151,6 @@ struct page_cleaner_t {
ib_mutex_t mutex; /*!< mutex to protect whole of
page_cleaner_t struct and
page_cleaner_slot_t slots. */
os_event_t is_requested; /*!< event to activate worker
threads. */
os_event_t is_finished; /*!< event to signal that all
slots were finished. */
os_event_t is_started; /*!< event to signal that
@ -186,6 +189,9 @@ struct page_cleaner_t {
#endif /* UNIV_DEBUG */
};
static void pc_submit_task();
static void pc_wait_all_tasks();
static page_cleaner_t page_cleaner;
#ifdef UNIV_DEBUG
@ -946,8 +952,8 @@ buf_flush_init_for_writing(
}
/********************************************************************//**
Does an asynchronous write of a buffer page. NOTE: in simulated aio and
also when the doublewrite buffer is used, we must call
Does an asynchronous write of a buffer page. NOTE: when the
doublewrite buffer is used, we must call
buf_dblwr_flush_buffered_writes after we have posted a batch of
writes! */
static
@ -1046,7 +1052,7 @@ buf_flush_write_block_low(
&& space->use_doublewrite();
if (!use_doublewrite) {
ulint type = IORequest::WRITE | IORequest::DO_NOT_WAKE;
ulint type = IORequest::WRITE;
IORequest request(type, bpage);
@ -1100,9 +1106,7 @@ buf_flush_write_block_low(
/********************************************************************//**
Writes a flushable page asynchronously from the buffer pool to a file.
NOTE: in simulated aio we must call
os_aio_simulated_wake_handler_threads after we have posted a batch of
writes! NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be
NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be
held upon entering this function, and they will be released by this
function if it returns true.
@return TRUE if the page was flushed */
@ -1931,8 +1935,6 @@ buf_flush_end(
if (!srv_read_only_mode) {
buf_dblwr_flush_buffered_writes();
} else {
os_aio_simulated_wake_handler_threads();
}
}
@ -2659,7 +2661,6 @@ buf_flush_page_cleaner_init(void)
mutex_create(LATCH_ID_PAGE_CLEANER, &page_cleaner.mutex);
page_cleaner.is_requested = os_event_create("pc_is_requested");
page_cleaner.is_finished = os_event_create("pc_is_finished");
page_cleaner.is_started = os_event_create("pc_is_started");
page_cleaner.n_slots = static_cast<ulint>(srv_buf_pool_instances);
@ -2722,8 +2723,10 @@ pc_request(
page_cleaner.n_slots_flushing = 0;
page_cleaner.n_slots_finished = 0;
os_event_set(page_cleaner.is_requested);
/* Submit slots-1 tasks, coordinator also does the work itself */
for (ulint i = pc_flush_slot_task.get_ref_count(); i < page_cleaner.n_slots - 1; i++) {
pc_submit_task();
}
mutex_exit(&page_cleaner.mutex);
}
@ -2741,9 +2744,7 @@ pc_flush_slot(void)
mutex_enter(&page_cleaner.mutex);
if (!page_cleaner.n_slots_requested) {
os_event_reset(page_cleaner.is_requested);
} else {
if (page_cleaner.n_slots_requested) {
page_cleaner_slot_t* slot = NULL;
ulint i;
@ -2771,10 +2772,6 @@ pc_flush_slot(void)
goto finish_mutex;
}
if (page_cleaner.n_slots_requested == 0) {
os_event_reset(page_cleaner.is_requested);
}
mutex_exit(&page_cleaner.mutex);
lru_tm = ut_time_ms();
@ -2956,18 +2953,6 @@ void buf_flush_page_cleaner_disabled_debug_update(THD*,
}
innodb_page_cleaner_disabled_debug = false;
/* Enable page cleaner threads. */
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
mutex_enter(&page_cleaner.mutex);
const ulint n = page_cleaner.n_disabled_debug;
mutex_exit(&page_cleaner.mutex);
/* Check if all threads have been enabled, to avoid
problem when we decide to re-disable them soon. */
if (n == 0) {
break;
}
}
return;
}
@ -2976,34 +2961,7 @@ void buf_flush_page_cleaner_disabled_debug_update(THD*,
}
innodb_page_cleaner_disabled_debug = true;
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
/* Workers are possibly sleeping on is_requested.
We have to wake them, otherwise they could possibly
have never noticed, that they should be disabled,
and we would wait for them here forever.
That's why we have sleep-loop instead of simply
waiting on some disabled_debug_event. */
os_event_set(page_cleaner.is_requested);
mutex_enter(&page_cleaner.mutex);
ut_ad(page_cleaner.n_disabled_debug
<= srv_n_page_cleaners);
if (page_cleaner.n_disabled_debug
== srv_n_page_cleaners) {
mutex_exit(&page_cleaner.mutex);
break;
}
mutex_exit(&page_cleaner.mutex);
os_thread_sleep(100000);
}
pc_wait_all_tasks();
}
#endif /* UNIV_DEBUG */
@ -3316,7 +3274,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*)
/* At this point all threads including the master and the purge
thread must have been suspended. */
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
ut_a(srv_shutdown_state == SRV_SHUTDOWN_FLUSH_PHASE);
/* We can now make a final sweep on flushing the buffer pool
@ -3348,7 +3306,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*)
} while (!success || n_flushed > 0);
/* Some sanity checks */
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
ut_a(srv_shutdown_state == SRV_SHUTDOWN_FLUSH_PHASE);
for (ulint i = 0; i < srv_buf_pool_instances; i++) {
@ -3359,21 +3317,12 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*)
/* We have lived our life. Time to die. */
thread_exit:
/* All worker threads are waiting for the event here,
and no more access to page_cleaner structure by them.
Wakes worker threads up just to make them exit. */
/* Wait for worker tasks to finish */
page_cleaner.is_running = false;
/* waiting for all worker threads exit */
while (page_cleaner.n_workers) {
os_event_set(page_cleaner.is_requested);
os_thread_sleep(10000);
}
pc_wait_all_tasks();
mutex_destroy(&page_cleaner.mutex);
os_event_destroy(page_cleaner.is_finished);
os_event_destroy(page_cleaner.is_requested);
os_event_destroy(page_cleaner.is_started);
buf_page_cleaner_is_active = false;
@ -3386,113 +3335,39 @@ thread_exit:
OS_THREAD_DUMMY_RETURN;
}
/** Thread pool callback of the page cleaner worker task: keep calling
pc_flush_slot() until it no longer returns a positive value. */
static
void
pc_flush_slot_func(void*)
{
	for (;;) {
		if (!(pc_flush_slot() > 0)) {
			return;
		}
	}
}
/** Adjust thread count for page cleaner workers.
@param[in] new_cnt Number of threads to be used */
void
buf_flush_set_page_cleaner_thread_cnt(ulong new_cnt)
{
mutex_enter(&page_cleaner.mutex);
page_cleaner_task_group.set_max_tasks((uint)new_cnt);
srv_n_page_cleaners = new_cnt;
if (new_cnt > page_cleaner.n_workers) {
/* User has increased the number of page
cleaner threads. */
ulint add = new_cnt - page_cleaner.n_workers;
for (ulint i = 0; i < add; i++) {
os_thread_id_t cleaner_thread_id;
os_thread_create(buf_flush_page_cleaner_worker, NULL, &cleaner_thread_id);
}
}
mutex_exit(&page_cleaner.mutex);
/* Wait until defined number of workers has started. */
while (page_cleaner.is_running &&
page_cleaner.n_workers != (srv_n_page_cleaners - 1)) {
os_event_set(page_cleaner.is_requested);
os_event_reset(page_cleaner.is_started);
os_event_wait_time(page_cleaner.is_started, 1000000);
}
}
/******************************************************************//**
Worker thread of page_cleaner.
@return a dummy parameter */
extern "C"
os_thread_ret_t
DECLARE_THREAD(buf_flush_page_cleaner_worker)(
/*==========================================*/
void* arg MY_ATTRIBUTE((unused)))
/*!< in: a dummy parameter required by
os_thread_create */
void pc_submit_task()
{
my_thread_init();
#ifndef DBUG_OFF
os_thread_id_t cleaner_thread_id = os_thread_get_curr_id();
#ifdef UNIV_DEBUG
if (innodb_page_cleaner_disabled_debug)
return;
#endif
srv_thread_pool->submit_task(&pc_flush_slot_task);
}
mutex_enter(&page_cleaner.mutex);
ulint thread_no = page_cleaner.n_workers++;
DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id
<< " started; n_workers=" << page_cleaner.n_workers);
/* Signal that we have started */
os_event_set(page_cleaner.is_started);
mutex_exit(&page_cleaner.mutex);
#ifdef UNIV_LINUX
/* linux might be able to set different setting for each thread
worth to try to set high priority for page cleaner threads */
if (buf_flush_page_cleaner_set_priority(
buf_flush_page_cleaner_priority)) {
ib::info() << "page_cleaner worker priority: "
<< buf_flush_page_cleaner_priority;
}
#endif /* UNIV_LINUX */
while (true) {
os_event_wait(page_cleaner.is_requested);
ut_d(buf_flush_page_cleaner_disabled_loop());
if (!page_cleaner.is_running) {
break;
}
ut_ad(srv_n_page_cleaners >= 1);
/* If number of page cleaner threads is decreased
exit those that are not anymore needed. */
if (srv_shutdown_state == SRV_SHUTDOWN_NONE &&
thread_no >= (srv_n_page_cleaners - 1)) {
DBUG_LOG("ib_buf", "Exiting "
<< thread_no
<< " page cleaner worker thread_id "
<< os_thread_pf(cleaner_thread_id)
<< " total threads " << srv_n_page_cleaners << ".");
break;
}
pc_flush_slot();
}
mutex_enter(&page_cleaner.mutex);
page_cleaner.n_workers--;
DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id
<< " exiting; n_workers=" << page_cleaner.n_workers);
/* Signal that we have stopped */
os_event_set(page_cleaner.is_started);
mutex_exit(&page_cleaner.mutex);
my_thread_end();
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
/** Wait for the page cleaner worker task (pc_flush_slot_task)
to finish. */
void
pc_wait_all_tasks()
{
	pc_flush_slot_task.wait();
}
/*******************************************************************//**

View file

@ -529,9 +529,6 @@ buf_flush_or_remove_page(
buf_pool, bpage, BUF_FLUSH_SINGLE_PAGE, false);
if (processed) {
/* Wake possible simulated aio thread to actually
post the writes to the operating system */
os_aio_simulated_wake_handler_threads();
buf_pool_mutex_enter(buf_pool);
} else {
mutex_exit(block_mutex);
@ -1038,7 +1035,7 @@ buf_LRU_check_size_of_non_data_objects(
buf_lru_switched_on_innodb_mon = true;
srv_print_innodb_monitor = TRUE;
os_event_set(srv_monitor_event);
srv_monitor_timer_schedule_now();
}
} else if (buf_lru_switched_on_innodb_mon) {

View file

@ -344,7 +344,7 @@ read_ahead:
if (!ibuf_bitmap_page(cur_page_id, zip_size)) {
count += buf_read_page_low(
&err, false,
IORequest::DO_NOT_WAKE,
0,
ibuf_mode,
cur_page_id, zip_size, false);
@ -364,11 +364,6 @@ read_ahead:
}
}
/* In simulated aio we wake the aio handler threads only after
queuing all aio requests, in native aio the following call does
nothing: */
os_aio_simulated_wake_handler_threads();
if (count) {
DBUG_PRINT("ib_buf", ("random read-ahead %u pages, %u:%u",
@ -440,7 +435,7 @@ buf_read_page_background(const page_id_t page_id, ulint zip_size, bool sync)
count = buf_read_page_low(
&err, sync,
IORequest::DO_NOT_WAKE | IORequest::IGNORE_MISSING,
IORequest::IGNORE_MISSING,
BUF_READ_ANY_PAGE,
page_id, zip_size, false);
@ -712,7 +707,7 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf)
if (!ibuf_bitmap_page(cur_page_id, zip_size)) {
count += buf_read_page_low(
&err, false,
IORequest::DO_NOT_WAKE,
0,
ibuf_mode, cur_page_id, zip_size, false);
switch (err) {
@ -732,12 +727,6 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf)
}
}
/* In simulated aio we wake the aio handler threads only after
queuing all aio requests, in native aio the following call does
nothing: */
os_aio_simulated_wake_handler_threads();
if (count) {
DBUG_PRINT("ib_buf", ("linear read-ahead " ULINTPF " pages, "
"%u:%u",
@ -788,7 +777,6 @@ buf_read_recv_pages(
buf_pool = buf_pool_get(cur_page_id);
while (buf_pool->n_pend_reads >= recv_n_pool_free_frames / 2) {
os_aio_simulated_wake_handler_threads();
os_thread_sleep(10000);
count++;
@ -814,7 +802,7 @@ buf_read_recv_pages(
} else {
buf_read_page_low(
&err, false,
IORequest::DO_NOT_WAKE,
0,
BUF_READ_ANY_PAGE,
cur_page_id, zip_size, true);
}
@ -825,8 +813,6 @@ buf_read_recv_pages(
}
}
os_aio_simulated_wake_handler_threads();
DBUG_PRINT("ib_buf", ("recovery read-ahead (%u pages)",
unsigned(n_stored)));
}

View file

@ -1355,7 +1355,7 @@ dict_check_if_system_table_exists(
dict_table_t* sys_table;
dberr_t error = DB_SUCCESS;
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
mutex_enter(&dict_sys.mutex);
@ -1395,7 +1395,7 @@ dict_create_or_check_foreign_constraint_tables(void)
dberr_t sys_foreign_err;
dberr_t sys_foreign_cols_err;
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
/* Note: The master thread has not been started at this point. */
@ -1537,7 +1537,7 @@ dict_create_or_check_sys_virtual()
my_bool srv_file_per_table_backup;
dberr_t err;
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
/* Note: The master thread has not been started at this point. */
err = dict_check_if_system_table_exists(
@ -2064,7 +2064,7 @@ dict_create_or_check_sys_tablespace(void)
dberr_t sys_tablespaces_err;
dberr_t sys_datafiles_err;
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
/* Note: The master thread has not been started at this point. */

View file

@ -44,7 +44,6 @@ typedef defrag_pool_t::iterator defrag_pool_iterator_t;
by background defragmentation. */
defrag_pool_t defrag_pool;
extern bool dict_stats_start_shutdown;
/*****************************************************************//**
Initialize the defrag pool, called once during thread initialization. */
@ -134,10 +133,11 @@ dict_stats_defrag_pool_add(
item.table_id = index->table->id;
item.index_id = index->id;
defrag_pool.push_back(item);
if (defrag_pool.size() == 1) {
/* Kick off dict stats optimizer work */
dict_stats_schedule_now();
}
mutex_exit(&defrag_pool_mutex);
os_event_set(dict_stats_event);
}
/*****************************************************************//**
@ -224,7 +224,7 @@ void
dict_defrag_process_entries_from_defrag_pool()
/*==========================================*/
{
while (defrag_pool.size() && !dict_stats_start_shutdown) {
while (defrag_pool.size()) {
dict_stats_process_entry_from_defrag_pool();
}
}

View file

@ -42,24 +42,11 @@ Created Apr 25, 2012 Vasil Dimov
/** Minimum time interval between stats recalc for a given table */
#define MIN_RECALC_INTERVAL 10 /* seconds */
/** Event to wake up dict_stats_thread on dict_stats_recalc_pool_add()
or shutdown. Not protected by any mutex. */
os_event_t dict_stats_event;
/** Variable to initiate shutdown the dict stats thread. Note we don't
use 'srv_shutdown_state' because we want to shutdown dict stats thread
before purge thread. */
bool dict_stats_start_shutdown;
/** Event to wait for shutdown of the dict stats thread */
os_event_t dict_stats_shutdown_event;
static void dict_stats_schedule(int ms);
#ifdef UNIV_DEBUG
/** Used by SET GLOBAL innodb_dict_stats_disabled_debug = 1; */
my_bool innodb_dict_stats_disabled_debug;
static os_event_t dict_stats_disabled_event;
#endif /* UNIV_DEBUG */
/** This mutex protects the "recalc_pool" variable. */
@ -118,7 +105,9 @@ static
void
dict_stats_recalc_pool_add(
/*=======================*/
const dict_table_t* table) /*!< in: table to add */
const dict_table_t* table, /*!< in: table to add */
bool schedule_dict_stats_task = true /*!< in: schedule dict stats task */
)
{
ut_ad(!srv_read_only_mode);
@ -136,10 +125,11 @@ dict_stats_recalc_pool_add(
}
recalc_pool.push_back(table->id);
if (recalc_pool.size() == 1 && schedule_dict_stats_task) {
dict_stats_schedule_now();
}
mutex_exit(&recalc_pool_mutex);
os_event_set(dict_stats_event);
}
#ifdef WITH_WSREP
@ -295,13 +285,10 @@ dict_stats_wait_bg_to_stop_using_table(
Initialize global variables needed for the operation of the dict_stats task.
Must be called before the dict_stats task is started. */
void
dict_stats_thread_init()
dict_stats_init()
{
ut_a(!srv_read_only_mode);
dict_stats_event = os_event_create(0);
dict_stats_shutdown_event = os_event_create(0);
ut_d(dict_stats_disabled_event = os_event_create(0));
/* The recalc_pool_mutex is acquired from:
1) the background stats gathering thread before any other latch
@ -324,37 +311,34 @@ dict_stats_thread_init()
}
/*****************************************************************//**
Free resources allocated by dict_stats_thread_init(), must be called
after dict_stats_thread() has exited. */
Free resources allocated by dict_stats_init(), must be called
after dict_stats task has exited. */
void
dict_stats_thread_deinit()
dict_stats_deinit()
/*======================*/
{
ut_a(!srv_read_only_mode);
ut_ad(!srv_dict_stats_thread_active);
if (!stats_initialised) {
return;
}
ut_a(!srv_read_only_mode);
stats_initialised = false;
dict_stats_recalc_pool_deinit();
dict_defrag_pool_deinit();
mutex_free(&recalc_pool_mutex);
ut_d(os_event_destroy(dict_stats_disabled_event));
os_event_destroy(dict_stats_event);
os_event_destroy(dict_stats_shutdown_event);
dict_stats_start_shutdown = false;
}
/*****************************************************************//**
Get the first table that has been added for auto recalc and eventually
update its stats. */
update its stats.
@return true if the pool was non-empty and the first entry does
not need to be delayed, false otherwise.
*/
static
void
bool
dict_stats_process_entry_from_recalc_pool()
/*=======================================*/
{
@ -362,10 +346,11 @@ dict_stats_process_entry_from_recalc_pool()
ut_ad(!srv_read_only_mode);
next_table_id:
/* pop the first table from the auto recalc pool */
if (!dict_stats_recalc_pool_get(&table_id)) {
/* no tables for auto recalc */
return;
return false;
}
dict_table_t* table;
@ -378,7 +363,7 @@ dict_stats_process_entry_from_recalc_pool()
/* table does not exist, must have been DROPped
after its id was enqueued */
mutex_exit(&dict_sys.mutex);
return;
goto next_table_id;
}
ut_ad(!table->is_temporary());
@ -386,7 +371,7 @@ dict_stats_process_entry_from_recalc_pool()
if (!fil_table_accessible(table)) {
dict_table_close(table, TRUE, FALSE);
mutex_exit(&dict_sys.mutex);
return;
goto next_table_id;
}
table->stats_bg_flag |= BG_STAT_IN_PROGRESS;
@ -399,7 +384,7 @@ dict_stats_process_entry_from_recalc_pool()
find out that this is a problem, then the check below could eventually
be replaced with something else, though a time interval is the natural
approach. */
int ret;
if (difftime(time(NULL), table->stats_last_recalc)
< MIN_RECALC_INTERVAL) {
@ -407,11 +392,13 @@ dict_stats_process_entry_from_recalc_pool()
too frequent stats updates we put back the table on
the auto recalc list and do nothing. */
dict_stats_recalc_pool_add(table);
dict_stats_recalc_pool_add(table, false);
dict_stats_schedule(MIN_RECALC_INTERVAL*1000);
ret = false;
} else {
dict_stats_update(table, DICT_STATS_RECALC_PERSISTENT);
ret = true;
}
mutex_enter(&dict_sys.mutex);
@ -421,6 +408,7 @@ dict_stats_process_entry_from_recalc_pool()
dict_table_close(table, TRUE, FALSE);
mutex_exit(&dict_sys.mutex);
return ret;
}
#ifdef UNIV_DEBUG
@ -430,89 +418,51 @@ dict_stats_process_entry_from_recalc_pool()
void dict_stats_disabled_debug_update(THD*, st_mysql_sys_var*, void*,
const void* save)
{
/* This method is protected by mutex, as every SET GLOBAL .. */
ut_ad(dict_stats_disabled_event != NULL);
const bool disable = *static_cast<const my_bool*>(save);
const int64_t sig_count = os_event_reset(dict_stats_disabled_event);
innodb_dict_stats_disabled_debug = disable;
if (disable) {
os_event_set(dict_stats_event);
os_event_wait_low(dict_stats_disabled_event, sig_count);
}
if (disable)
dict_stats_shutdown();
else
dict_stats_start();
}
#endif /* UNIV_DEBUG */
static tpool::timer* dict_stats_timer;
std::mutex dict_stats_mutex;
/*****************************************************************//**
This is the thread for background stats gathering. It pops tables, from
the auto recalc list and proceeds them, eventually recalculating their
statistics.
@return this function does not return, it calls os_thread_exit() */
extern "C"
os_thread_ret_t
DECLARE_THREAD(dict_stats_thread)(void*)
static void dict_stats_func(void*)
{
my_thread_init();
ut_a(!srv_read_only_mode);
while (dict_stats_process_entry_from_recalc_pool()) {}
dict_defrag_process_entries_from_defrag_pool();
}
#ifdef UNIV_PFS_THREAD
/* JAN: TODO: MySQL 5.7 PSI
pfs_register_thread(dict_stats_thread_key);
*/
#endif /* UNIV_PFS_THREAD */
while (!dict_stats_start_shutdown) {
/* Wake up periodically even if not signaled. This is
because we may lose an event - if the below call to
dict_stats_process_entry_from_recalc_pool() puts the entry back
in the list, the os_event_set() will be lost by the subsequent
os_event_reset(). */
os_event_wait_time(
dict_stats_event, MIN_RECALC_INTERVAL * 1000000);
#ifdef UNIV_DEBUG
while (innodb_dict_stats_disabled_debug) {
os_event_set(dict_stats_disabled_event);
if (dict_stats_start_shutdown) {
break;
}
os_event_wait_time(
dict_stats_event, 100000);
}
#endif /* UNIV_DEBUG */
if (dict_stats_start_shutdown) {
break;
}
dict_stats_process_entry_from_recalc_pool();
dict_defrag_process_entries_from_defrag_pool();
os_event_reset(dict_stats_event);
/** Create the dict stats timer, unless it already exists.
The check and the creation are done while holding dict_stats_mutex, so
calling this function again while the timer exists changes nothing. */
void dict_stats_start()
{
	std::lock_guard<std::mutex> guard(dict_stats_mutex);

	if (!dict_stats_timer) {
		/* First call (or first call after a shutdown): create the
		timer whose callback is dict_stats_func(). */
		dict_stats_timer = srv_thread_pool->create_timer(
			dict_stats_func);
	}
}
srv_dict_stats_thread_active = false;
os_event_set(dict_stats_shutdown_event);
my_thread_end();
/** Arm the dict stats timer, if it exists.
@param[in]	ms	delay in milliseconds, passed as the first argument
			of set_time() */
static void dict_stats_schedule(int ms)
{
	std::lock_guard<std::mutex> guard(dict_stats_mutex);

	if (!dict_stats_timer) {
		/* No timer object exists right now: nothing to arm. */
		return;
	}

	/* NOTE(review): the second argument (0) is presumably the repeat
	period, i.e. a one-shot timer - confirm against tpool::timer. */
	dict_stats_timer->set_time(ms, 0);
}
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit instead of return(). */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
/** Reschedule the dict stats timer to run now: calls
dict_stats_schedule() with a delay of 0 ms. */
void dict_stats_schedule_now()
{
dict_stats_schedule(0);
}
/** Shut down the dict_stats_thread. */
void
dict_stats_shutdown()
void dict_stats_shutdown()
{
dict_stats_start_shutdown = true;
os_event_set(dict_stats_event);
os_event_wait(dict_stats_shutdown_event);
std::lock_guard<std::mutex> lk(dict_stats_mutex);
delete dict_stats_timer;
dict_stats_timer = 0;
}

View file

@ -954,7 +954,6 @@ fil_mutex_enter_and_prepare_for_io(
break;
} else {
mutex_exit(&fil_system.mutex);
os_aio_simulated_wake_handler_threads();
os_thread_sleep(20000);
/* Flush tablespaces so that we can
close modified files in the LRU list */
@ -4148,13 +4147,7 @@ fil_io(
} else if (req_type.is_read()
&& !recv_no_ibuf_operations
&& ibuf_page(page_id, zip_size, NULL)) {
mode = OS_AIO_IBUF;
/* Reduce probability of deadlock bugs in connection with ibuf:
do not let the ibuf i/o handler sleep */
req_type.clear_do_not_wake();
} else {
mode = OS_AIO_NORMAL;
}
@ -4200,8 +4193,6 @@ fil_io(
return(DB_TABLESPACE_DELETED);
}
ut_ad(mode != OS_AIO_IBUF || fil_type_is_data(space->purpose));
ulint cur_page_no = page_id.page_no();
fil_node_t* node = UT_LIST_GET_FIRST(space->chain);
@ -4337,34 +4328,27 @@ fil_io(
return(err);
}
/**********************************************************************//**
Waits for an aio operation to complete. This function is used to write the
handler for completed requests. The aio array of pending requests is divided
into segments (see os0file.cc for more info). The thread specifies which
segment it wants to wait for. */
#include <tpool.h>
/**********************************************************************/
/* Callback for AIO completion */
void
fil_aio_wait(
/*=========*/
ulint segment) /*!< in: the number of the segment in the aio
array to wait for */
fil_aio_callback(const tpool::aiocb *cb)
{
fil_node_t* node;
IORequest type;
void* message;
os_aio_userdata_t *data=(os_aio_userdata_t *)cb->m_userdata;
fil_node_t* node= data->node;
IORequest type = data->type;
void* message = data->message;
ut_ad(fil_validate_skip());
dberr_t err = os_aio_handler(segment, &node, &message, &type);
ut_a(err == DB_SUCCESS);
ut_a(cb->m_err == DB_SUCCESS);
if (node == NULL) {
ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS);
return;
}
srv_set_io_thread_op_info(segment, "complete io for fil node");
mutex_enter(&fil_system.mutex);
fil_node_complete_io(node, type);
@ -4382,7 +4366,6 @@ fil_aio_wait(
deadlocks in the i/o system. We keep tablespace 0 data files always
open, and use a special i/o thread to serve insert buffer requests. */
srv_set_io_thread_op_info(segment, "complete io for buf page");
/* async single page writes from the dblwr buffer don't have
access to the page */
@ -4396,7 +4379,7 @@ fil_aio_wait(
bpage->init_on_flush = false;
dblwr = false;
}
err = buf_page_io_complete(bpage, dblwr);
dberr_t err = buf_page_io_complete(bpage, dblwr);
if (err == DB_SUCCESS) {
return;
}

View file

@ -157,7 +157,7 @@ void
Datafile::init_file_info()
{
#ifdef _WIN32
GetFileInformationByHandle(m_handle, &m_file_info);
GetFileInformationByHandle((os_file_t)m_handle, &m_file_info);
#else
fstat(m_handle, &m_file_info);
#endif /* WIN32 */

View file

@ -39,6 +39,12 @@ Completed 2011/7/10 Sunny and Jimmy Yang
/** The FTS optimize thread's work queue. */
ib_wqueue_t* fts_optimize_wq;
static void fts_optimize_callback(void *);
static void timer_callback(void*);
static tpool::timer* timer;
static tpool::task_group task_group(1);
static tpool::task task(fts_optimize_callback,0, &task_group);
/** The FTS vector to store fts_slot_t */
static ib_vector_t* fts_slots;
@ -2535,6 +2541,23 @@ fts_optimize_create_msg(
return(msg);
}
/** Add a message to the FTS optimize work queue and signal the thread pool.
Asserts that fts_optimize_wq exists, enqueues the message (using msg->heap)
and then submits the FTS optimize task to srv_thread_pool.
@param[in]	msg		message to enqueue
@param[in]	wq_locked	passed through to ib_wqueue_add(); callers that
				already hold fts_optimize_wq->mutex pass true */
void add_msg(fts_msg_t *msg, bool wq_locked = false)
{
ut_a(fts_optimize_wq);
ib_wqueue_add(fts_optimize_wq,msg, msg->heap,wq_locked);
/* Submit only after the message is queued, so that the task can see it. */
srv_thread_pool->submit_task(&task);
}
/**
Callback of the "idle" timer. Submits the FTS optimize task, which,
in case the work queue is empty, will only recalculate whether a sync
is needed. The argument is unused.
*/
static void timer_callback(void *)
{
srv_thread_pool->submit_task(&task);
}
/** Add the table to add to the OPTIMIZER's list.
@param[in] table table to add */
void fts_optimize_add_table(dict_table_t* table)
@ -2558,7 +2581,7 @@ void fts_optimize_add_table(dict_table_t* table)
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
add_msg(msg,true);
table->fts->in_queue = true;
@ -2608,7 +2631,7 @@ fts_optimize_remove_table(
remove->event = event;
msg->ptr = remove;
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
add_msg(msg, true);
mutex_exit(&fts_optimize_wq->mutex);
@ -2643,7 +2666,7 @@ fts_optimize_request_sync_table(
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
add_msg(msg, true);
table->fts->in_queue = true;
@ -2786,24 +2809,22 @@ static void fts_optimize_sync_table(dict_table_t* table)
Optimize all FTS tables.
@return Dummy return */
static
os_thread_ret_t
DECLARE_THREAD(fts_optimize_thread)(
void fts_optimize_callback(
/*================*/
void* arg) /*!< in: work queue*/
{
ulint current = 0;
ibool done = FALSE;
ulint n_tables = 0;
ulint n_optimize = 0;
ib_wqueue_t* wq = (ib_wqueue_t*) arg;
static ulint current = 0;
static ibool done = FALSE;
static ulint n_tables = ib_vector_size(fts_slots);
static ulint n_optimize = 0;
ib_wqueue_t* wq = fts_optimize_wq;
ut_ad(!srv_read_only_mode);
my_thread_init();
ut_ad(fts_slots);
/* Assign number of tables added in fts_slots_t to n_tables */
n_tables = ib_vector_size(fts_slots);
if (!fts_optimize_wq) {
/* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
return;
}
while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
@ -2831,17 +2852,15 @@ DECLARE_THREAD(fts_optimize_thread)(
} else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
fts_msg_t* msg;
msg = static_cast<fts_msg_t*>(
ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS));
msg = static_cast<fts_msg_t*>(ib_wqueue_nowait(wq));
/* Timeout ? */
if (msg == NULL) {
if (fts_is_sync_needed()) {
fts_need_sync = true;
}
continue;
if (n_tables)
timer->set_time(5000, 0);
return;
}
switch (msg->type) {
@ -2908,13 +2927,6 @@ DECLARE_THREAD(fts_optimize_thread)(
ib::info() << "FTS optimize thread exiting.";
os_event_set(fts_opt_shutdown_event);
my_thread_end();
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/**********************************************************************//**
@ -2933,7 +2945,8 @@ fts_optimize_init(void)
/* Create FTS optimize work queue */
fts_optimize_wq = ib_wqueue_create();
ut_a(fts_optimize_wq != NULL);
ut_a(fts_optimize_wq != NULL);
timer = srv_thread_pool->create_timer(timer_callback);
/* Create FTS vector to store fts_slot_t */
heap = mem_heap_create(sizeof(dict_table_t*) * 64);
@ -2962,8 +2975,6 @@ fts_optimize_init(void)
fts_opt_shutdown_event = os_event_create(0);
last_check_sync_time = time(NULL);
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
}
/** Shutdown fts optimize thread. */
@ -2987,15 +2998,18 @@ fts_optimize_shutdown()
/* We tell the OPTIMIZE thread to switch to state done. We
can't delete the work queue here because the add thread needs
to deregister the FTS tables. */
delete timer;
timer = NULL;
task_group.cancel_pending(&task);
msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
add_msg(msg, false);
os_event_wait(fts_opt_shutdown_event);
os_event_destroy(fts_opt_shutdown_event);
ib_wqueue_free(fts_optimize_wq);
fts_optimize_wq = NULL;
}

View file

@ -55,6 +55,8 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <mysql/service_thd_alloc.h>
#include <mysql/service_thd_wait.h>
#include "field.h"
#include "srv0srv.h"
// MYSQL_PLUGIN_IMPORT extern my_bool lower_case_file_system;
// MYSQL_PLUGIN_IMPORT extern char mysql_unpacked_real_data_home[];
@ -69,6 +71,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include "btr0sea.h"
#include "buf0dblwr.h"
#include "buf0dump.h"
#include "buf0buf.h"
#include "buf0flu.h"
#include "buf0lru.h"
#include "dict0boot.h"
@ -121,8 +124,8 @@ void thd_clear_error(MYSQL_THD thd);
TABLE *find_fk_open_table(THD *thd, const char *db, size_t db_len,
const char *table, size_t table_len);
MYSQL_THD create_thd();
void destroy_thd(MYSQL_THD thd);
MYSQL_THD create_background_thd();
void destroy_background_thd(MYSQL_THD thd);
void reset_thd(MYSQL_THD thd);
TABLE *open_purge_table(THD *thd, const char *db, size_t dblen,
const char *tb, size_t tblen);
@ -250,62 +253,7 @@ is_partition(
return strstr(file_name, table_name_t::part_suffix);
}
/** Signal to shut down InnoDB (NULL if shutdown was signaled, or if
running in innodb_read_only mode, srv_read_only_mode) */
std::atomic <st_my_thread_var *> srv_running;
/** Service thread that waits for the server shutdown and stops purge threads.
Purge workers have THDs that are needed to calculate virtual columns.
This THDs must be destroyed rather early in the server shutdown sequence.
This service thread creates a THD and idly waits for it to get a signal to
die. Then it notifies all purge workers to shutdown.
*/
static pthread_t thd_destructor_thread;
pthread_handler_t
thd_destructor_proxy(void *)
{
mysql_mutex_t thd_destructor_mutex;
mysql_cond_t thd_destructor_cond;
my_thread_init();
mysql_mutex_init(PSI_NOT_INSTRUMENTED, &thd_destructor_mutex, 0);
mysql_cond_init(PSI_NOT_INSTRUMENTED, &thd_destructor_cond, 0);
st_my_thread_var *myvar= _my_thread_var();
myvar->current_mutex = &thd_destructor_mutex;
myvar->current_cond = &thd_destructor_cond;
THD *thd= create_thd();
thd_proc_info(thd, "InnoDB shutdown handler");
mysql_mutex_lock(&thd_destructor_mutex);
srv_running.store(myvar, std::memory_order_relaxed);
/* wait until the server wakes the THD to abort and die */
while (!myvar->abort)
mysql_cond_wait(&thd_destructor_cond, &thd_destructor_mutex);
mysql_mutex_unlock(&thd_destructor_mutex);
srv_running.store(NULL, std::memory_order_relaxed);
while (srv_fast_shutdown == 0 &&
(trx_sys.any_active_transactions() ||
(uint)thread_count > srv_n_purge_threads + 1)) {
thd_proc_info(thd, "InnoDB slow shutdown wait");
os_thread_sleep(1000);
}
/* Some background threads might generate undo pages that will
need to be purged, so they have to be shut down before purge
threads if slow shutdown is requested. */
srv_shutdown_bg_undo_sources();
srv_purge_shutdown();
destroy_thd(thd);
mysql_cond_destroy(&thd_destructor_cond);
mysql_mutex_destroy(&thd_destructor_mutex);
my_thread_end();
return 0;
}
/** Return the InnoDB ROW_FORMAT enum value
@param[in] row_format row_format from "innodb_default_row_format"
@ -547,7 +495,6 @@ performance schema */
static mysql_pfs_key_t commit_cond_mutex_key;
static mysql_pfs_key_t commit_cond_key;
static mysql_pfs_key_t pending_checkpoint_mutex_key;
static mysql_pfs_key_t thd_destructor_thread_key;
static PSI_mutex_info all_pthread_mutexes[] = {
PSI_KEY(commit_cond_mutex),
@ -651,23 +598,10 @@ static PSI_rwlock_info all_innodb_rwlocks[] = {
performance schema instrumented if "UNIV_PFS_THREAD"
is defined */
static PSI_thread_info all_innodb_threads[] = {
PSI_KEY(buf_dump_thread),
PSI_KEY(dict_stats_thread),
PSI_KEY(io_handler_thread),
PSI_KEY(io_ibuf_thread),
PSI_KEY(io_log_thread),
PSI_KEY(io_read_thread),
PSI_KEY(io_write_thread),
PSI_KEY(page_cleaner_thread),
PSI_KEY(recv_writer_thread),
PSI_KEY(srv_error_monitor_thread),
PSI_KEY(srv_lock_timeout_thread),
PSI_KEY(srv_master_thread),
PSI_KEY(srv_monitor_thread),
PSI_KEY(srv_purge_thread),
PSI_KEY(srv_worker_thread),
PSI_KEY(trx_rollback_clean_thread),
PSI_KEY(thd_destructor_thread),
PSI_KEY(thread_pool_thread)
};
# endif /* UNIV_PFS_THREAD */
@ -1586,7 +1520,7 @@ MYSQL_THD
innobase_create_background_thd(const char* name)
/*============================*/
{
MYSQL_THD thd= create_thd();
MYSQL_THD thd= create_background_thd();
thd_proc_info(thd, name);
THDVAR(thd, background_thread) = true;
return thd;
@ -1604,7 +1538,7 @@ innobase_destroy_background_thd(
if innodb is in the PLUGIN_IS_DYING state */
innobase_close_connection(innodb_hton_ptr, thd);
thd_set_ha_data(thd, innodb_hton_ptr, NULL);
destroy_thd(thd);
destroy_background_thd(thd);
}
/** Close opened tables, free memory, delete items for a MYSQL_THD.
@ -4012,6 +3946,7 @@ static int innodb_init(void* p)
innobase_hton->drop_database = innobase_drop_database;
innobase_hton->panic = innobase_end;
innobase_hton->pre_shutdown = innodb_preshutdown;
innobase_hton->start_consistent_snapshot =
innobase_start_trx_and_assign_read_view;
@ -4113,12 +4048,6 @@ static int innodb_init(void* p)
if (err != DB_SUCCESS) {
innodb_shutdown();
DBUG_RETURN(innodb_init_abort());
} else if (!srv_read_only_mode) {
mysql_thread_create(thd_destructor_thread_key,
&thd_destructor_thread,
NULL, thd_destructor_proxy, NULL);
while (!srv_running.load(std::memory_order_relaxed))
os_thread_sleep(20);
}
srv_was_started = true;
@ -4197,17 +4126,6 @@ innobase_end(handlerton*, ha_panic_function)
}
}
if (auto r = srv_running.load(std::memory_order_relaxed)) {
ut_ad(!srv_read_only_mode);
if (!abort_loop) {
// may be UNINSTALL PLUGIN statement
mysql_mutex_lock(r->current_mutex);
r->abort = 1;
mysql_cond_broadcast(r->current_cond);
mysql_mutex_unlock(r->current_mutex);
}
pthread_join(thd_destructor_thread, NULL);
}
innodb_shutdown();
innobase_space_shutdown();
@ -17154,7 +17072,7 @@ fast_shutdown_validate(
uint new_val = *reinterpret_cast<uint*>(save);
if (srv_fast_shutdown && !new_val
&& !srv_running.load(std::memory_order_relaxed)) {
&& !srv_read_only_mode && abort_loop) {
return(1);
}
@ -17203,6 +17121,8 @@ innodb_stopword_table_validate(
return(ret);
}
extern void buf_resize_start();
/** Update the system variable innodb_buffer_pool_size using the "saved"
value. This function is registered as a callback with MySQL.
@param[in] save immediate result from check function */
@ -17216,7 +17136,7 @@ innodb_buffer_pool_size_update(THD*,st_mysql_sys_var*,void*, const void* save)
sizeof(export_vars.innodb_buffer_pool_resize_status),
"Requested to resize buffer pool.");
os_event_set(srv_buf_resize_event);
buf_resize_start();
ib::info() << export_vars.innodb_buffer_pool_resize_status
<< " (new size: " << in_val << " bytes)";
@ -18384,8 +18304,8 @@ innodb_status_output_update(THD*,st_mysql_sys_var*,void*var,const void*save)
{
*static_cast<my_bool*>(var) = *static_cast<const my_bool*>(save);
mysql_mutex_unlock(&LOCK_global_system_variables);
/* Wakeup server monitor thread. */
os_event_set(srv_monitor_event);
/* Wakeup server monitor. */
srv_monitor_timer_schedule_now();
mysql_mutex_lock(&LOCK_global_system_variables);
}
@ -18455,33 +18375,6 @@ innodb_undo_logs_warn(THD* thd, st_mysql_sys_var*, void*, const void*)
innodb_undo_logs_deprecated);
}
#ifdef UNIV_DEBUG
static
void
innobase_debug_sync_callback(srv_slot_t *slot, const void *value)
{
const char *value_str = *static_cast<const char* const*>(value);
size_t len = strlen(value_str) + 1;
// One allocatoin for list node object and value.
void *buf = ut_malloc_nokey(sizeof(srv_slot_t::debug_sync_t) + len);
srv_slot_t::debug_sync_t *sync = new(buf) srv_slot_t::debug_sync_t();
strcpy(reinterpret_cast<char*>(&sync[1]), value_str);
rw_lock_x_lock(&slot->debug_sync_lock);
UT_LIST_ADD_LAST(slot->debug_sync, sync);
rw_lock_x_unlock(&slot->debug_sync_lock);
}
static
void
innobase_debug_sync_set(THD *thd, st_mysql_sys_var*, void *, const void *value)
{
srv_for_each_thread(SRV_WORKER, innobase_debug_sync_callback, value);
srv_for_each_thread(SRV_PURGE, innobase_debug_sync_callback, value);
}
#endif
static SHOW_VAR innodb_status_variables_export[]= {
{"Innodb", (char*) &show_innodb_vars, SHOW_FUNC},
{NullS, NullS, SHOW_LONG}
@ -19812,16 +19705,6 @@ static MYSQL_SYSVAR_BOOL(debug_force_scrubbing,
0,
"Perform extra scrubbing to increase test exposure",
NULL, NULL, FALSE);
char *innobase_debug_sync;
static MYSQL_SYSVAR_STR(debug_sync, innobase_debug_sync,
PLUGIN_VAR_NOCMDARG,
"debug_sync for innodb purge threads. "
"Use it to set up sync points for all purge threads "
"at once. The commands will be applied sequentially at "
"the beginning of purging the next undo record.",
NULL,
innobase_debug_sync_set, NULL);
#endif /* UNIV_DEBUG */
static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tables,
@ -20029,7 +19912,6 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(background_scrub_data_check_interval),
#ifdef UNIV_DEBUG
MYSQL_SYSVAR(debug_force_scrubbing),
MYSQL_SYSVAR(debug_sync),
#endif
MYSQL_SYSVAR(buf_dump_status_frequency),
MYSQL_SYSVAR(background_thread),

View file

@ -86,12 +86,7 @@ void
btr_defragment_save_defrag_stats_if_needed(
dict_index_t* index); /*!< in: index */
/** Merge consecutive b-tree pages into fewer pages to defragment indexes */
extern "C" UNIV_INTERN
os_thread_ret_t
DECLARE_THREAD(btr_defragment_thread)(void*);
/** Whether btr_defragment_thread is active */
extern bool btr_defragment_thread_active;
/* Stop defragmentation.*/
void btr_defragment_end();
extern bool btr_defragment_active;
#endif

View file

@ -261,14 +261,6 @@ buf_frame_will_withdrawn(
buf_pool_t* buf_pool,
const byte* ptr);
/** This is the thread for resizing buffer pool. It waits for an event and
when waked up either performs a resizing and sleeps again.
@return this function does not return, calls os_thread_exit()
*/
extern "C"
os_thread_ret_t
DECLARE_THREAD(buf_resize_thread)(void*);
#ifdef BTR_CUR_HASH_ADAPT
/** Clear the adaptive hash index on all pages in the buffer pool. */
void

View file

@ -100,11 +100,10 @@ void
buf_dblwr_sync_datafiles();
/********************************************************************//**
Flushes possible buffered writes from the doublewrite memory buffer to disk,
and also wakes up the aio thread if simulated aio is used. It is very
important to call this function after a batch of writes has been posted,
and also when we may have to wait for a page latch! Otherwise a deadlock
of threads can occur. */
Flushes possible buffered writes from the doublewrite memory buffer to disk.
It is very important to call this function after a batch of writes
has been posted, and also when we may have to wait for a page latch!
Otherwise a deadlock of threads can occur. */
void
buf_dblwr_flush_buffered_writes();

View file

@ -29,7 +29,7 @@ Created April 08, 2011 Vasil Dimov
#include "univ.i"
/*****************************************************************//**
Wakes up the buffer pool dump/load thread and instructs it to start
Starts the buffer pool dump/load task (if not started) and instructs it to start
a dump. This function is called by MySQL code via buffer_pool_dump_now()
and it should return immediately because the whole MySQL is frozen during
its execution. */
@ -38,7 +38,7 @@ buf_dump_start();
/*============*/
/*****************************************************************//**
Wakes up the buffer pool dump/load thread and instructs it to start
Starts the buffer pool dump/load task (if not started) and instructs it to start
a load. This function is called by MySQL code via buffer_pool_load_now()
and it should return immediately because the whole MySQL is frozen during
its execution. */
@ -54,16 +54,10 @@ void
buf_load_abort();
/*============*/
/*****************************************************************//**
This is the main thread for buffer pool dump/load. It waits for an
event and when waked up either performs a dump or load and sleeps
again.
@return this function does not return, it calls os_thread_exit() */
extern "C"
os_thread_ret_t
DECLARE_THREAD(buf_dump_thread)(
/*============================*/
void* arg); /*!< in: a dummy parameter
required by os_thread_create */
/** Start async buffer pool load, if srv_buffer_pool_load_at_startup was set.*/
void buf_load_at_startup();
/** Wait for currently running load/dumps to finish*/
void buf_load_dump_end();
#endif /* buf0dump_h */

View file

@ -269,9 +269,7 @@ buf_flush_free_flush_rbt(void);
/********************************************************************//**
Writes a flushable page asynchronously from the buffer pool to a file.
NOTE: in simulated aio we must call
os_aio_simulated_wake_handler_threads after we have posted a batch of
writes! NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be
NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be
held upon entering this function, and they will be released by this
function.
@return TRUE if page was flushed */

View file

@ -31,9 +31,6 @@ Created Apr 26, 2012 Vasil Dimov
#include "os0event.h"
#include "os0thread.h"
/** Event to wake up dict_stats_thread on dict_stats_recalc_pool_add()
or shutdown. Not protected by any mutex. */
extern os_event_t dict_stats_event;
#ifdef HAVE_PSI_INTERFACE
extern mysql_pfs_key_t dict_stats_recalc_pool_mutex_key;
@ -99,16 +96,16 @@ dict_stats_wait_bg_to_stop_using_table(
unlocking/locking the data dict */
/*****************************************************************//**
Initialize global variables needed for the operation of dict_stats_thread().
Must be called before dict_stats_thread() is started. */
Must be called before dict_stats task is started. */
void
dict_stats_thread_init();
dict_stats_init();
/*====================*/
/*****************************************************************//**
Free resources allocated by dict_stats_thread_init(), must be called
after dict_stats_thread() has exited. */
after dict_stats task has exited. */
void
dict_stats_thread_deinit();
dict_stats_deinit();
/*======================*/
#ifdef UNIV_DEBUG
@ -119,20 +116,17 @@ void dict_stats_disabled_debug_update(THD*, st_mysql_sys_var*, void*,
const void* save);
#endif /* UNIV_DEBUG */
/*****************************************************************//**
This is the thread for background stats gathering. It pops tables, from
the auto recalc list and proceeds them, eventually recalculating their
statistics.
@return this function does not return, it calls os_thread_exit() */
extern "C"
os_thread_ret_t
DECLARE_THREAD(dict_stats_thread)(
/*==============================*/
void* arg); /*!< in: a dummy parameter
required by os_thread_create */
/** Shut down the dict_stats_thread. */
/** Start the dict stats timer */
void
dict_stats_start();
/** Shut down the dict_stats timer. */
void
dict_stats_shutdown();
/** reschedule dict stats timer to run now. */
void
dict_stats_schedule_now();
#endif /* dict0stats_bg_h */

View file

@ -647,14 +647,8 @@ lock_table_has_locks(
table itself */
/*********************************************************************//**
A thread which wakes up threads whose lock wait may have lasted too long.
@return a dummy parameter */
extern "C"
os_thread_ret_t
DECLARE_THREAD(lock_wait_timeout_thread)(
/*=====================================*/
void* arg); /*!< in: a dummy parameter required by
os_thread_create */
A task which wakes up threads whose lock wait may have lasted too long. */
void lock_wait_timeout_task(void*);
/********************************************************************//**
Releases a user OS thread waiting for a lock to be released, if the
@ -791,14 +785,8 @@ public:
ulint n_lock_max_wait_time; /*!< Max wait time */
os_event_t timeout_event; /*!< An event waited for by
lock_wait_timeout_thread.
Not protected by a mutex,
but the waits are timed.
Signaled on shutdown only. */
bool timeout_thread_active; /*!< True if the timeout thread
is running */
std::unique_ptr<tpool::timer> timeout_timer; /*!< Thread pool timer task */
bool timeout_timer_active;
/**

View file

@ -38,6 +38,7 @@ Created 10/21/1995 Heikki Tuuri
#include "fsp0types.h"
#include "os0api.h"
#include "tpool.h"
#ifndef _WIN32
#include <dirent.h>
@ -66,7 +67,7 @@ the OS actually supports it: Win 95 does not, NT does. */
# define UNIV_NON_BUFFERED_IO
/** File handle */
typedef HANDLE os_file_t;
typedef native_file_handle os_file_t;
#else /* _WIN32 */
@ -102,6 +103,14 @@ struct pfs_os_file_t
/** Assignment operator.
@param[in] file file handle to be assigned */
void operator=(os_file_t file) { m_file = file; }
bool operator==(os_file_t file) const { return m_file == file; }
bool operator!=(os_file_t file) const { return !(*this == file); }
#ifndef DBUG_OFF
friend std::ostream& operator<<(std::ostream& os, pfs_os_file_t f){
os << os_file_t(f);
return os;
}
#endif
};
/** The next value should be smaller or equal to the smallest sector size used
@ -206,14 +215,6 @@ public:
/** Disable partial read warnings */
DISABLE_PARTIAL_IO_WARNINGS = 32,
/** Do not to wake i/o-handler threads, but the caller will do
the waking explicitly later, in this way the caller can post
several requests in a batch; NOTE that the batch must not be
so big that it exhausts the slots in AIO arrays! NOTE that
a simulated batch may introduce hidden chances of deadlocks,
because I/Os are not actually handled until all
have been posted: use with great caution! */
DO_NOT_WAKE = 64,
/** Ignore failed reads of non-existent pages */
IGNORE_MISSING = 128,
@ -296,13 +297,6 @@ public:
return((m_type & LOG) == LOG);
}
/** @return true if the simulated AIO thread should be woken up */
bool is_wake() const
MY_ATTRIBUTE((warn_unused_result))
{
return((m_type & DO_NOT_WAKE) == 0);
}
/** Clear the punch hole flag */
void clear_punch_hole()
{
@ -352,12 +346,6 @@ public:
}
}
/** Clear the do not wake flag */
void clear_do_not_wake()
{
m_type &= ~DO_NOT_WAKE;
}
/** Set the pointer to file node for IO
@param[in] node File node */
inline void set_fil_node(fil_node_t* node);
@ -438,7 +426,7 @@ struct os_file_size_t {
};
/** Win NT does not allow more than 64 */
static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 32;
static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 256;
/** Modes for aio operations @{ */
/** Normal asynchronous i/o not for ibuf pages or ibuf bitmap pages */
@ -450,12 +438,10 @@ static const ulint OS_AIO_IBUF = 22;
/** Asynchronous i/o for the log */
static const ulint OS_AIO_LOG = 23;
/** Asynchronous i/o where the calling thread will itself wait for
the i/o to complete, doing also the job of the i/o-handler thread;
/** The calling thread will wait for the i/o to complete
and perform the I/O completion routine itself;
can be used for any pages, ibuf or non-ibuf. This is used to save
CPU time, as we can do with fewer thread switches. Plain synchronous
I/O is not as good, because it must serialize the file seek and read
or write, causing a bottleneck for parallelism. */
CPU time, as we can do with fewer thread switches. */
static const ulint OS_AIO_SYNC = 24;
/* @} */
@ -1396,6 +1382,12 @@ Frees the asynchronous io system. */
void
os_aio_free();
/** Per-request data bundle for an asynchronous I/O operation.
NOTE(review): the fields appear to mirror the m1 (fil_node_t*), m2 (void*)
and request-type arguments of os_aio_func(); presumably this is what the
I/O completion callback receives -- confirm against os0file.cc. */
struct os_aio_userdata_t
{
/** File node associated with the request */
fil_node_t* node;
/** I/O request type */
IORequest type;
/** Opaque message pointer supplied with the request */
void* message;
};
/**
NOTE! Use the corresponding macro os_aio(), not directly this function!
Requests an asynchronous i/o operation.
@ -1428,47 +1420,12 @@ os_aio_func(
fil_node_t* m1,
void* m2);
/** Wakes up all async i/o threads so that they know to exit themselves in
shutdown. */
void
os_aio_wake_all_threads_at_shutdown();
/** Waits until there are no pending writes in os_aio_write_array. There can
be other, synchronous, pending writes. */
void
os_aio_wait_until_no_pending_writes();
/** Wakes up simulated aio i/o-handler threads if they have something to do. */
void
os_aio_simulated_wake_handler_threads();
/** This is the generic AIO handler interface function.
Waits for an aio operation to complete. This function is used to wait the
for completed requests. The AIO array of pending requests is divided
into segments. The thread specifies which segment or slot it wants to wait
for. NOTE: this function will also take care of freeing the aio slot,
therefore no other thread is allowed to do the freeing!
@param[in] segment the number of the segment in the aio arrays to
wait for; segment 0 is the ibuf I/O thread,
segment 1 the log I/O thread, then follow the
non-ibuf read threads, and as the last are the
non-ibuf write threads; if this is
ULINT_UNDEFINED, then it means that sync AIO
is used, and this parameter is ignored
@param[out] m1 the messages passed with the AIO request;
note that also in the case where the AIO
operation failed, these output parameters
are valid and can be used to restart the
operation, for example
@param[out] m2 callback message
@param[out] type OS_FILE_WRITE or ..._READ
@return DB_SUCCESS or error code */
dberr_t
os_aio_handler(
ulint segment,
fil_node_t** m1,
void** m2,
IORequest* type);
/** Prints info of the aio arrays.
@param[in/out] file file where to print */
@ -1484,14 +1441,6 @@ no pending io operations. */
bool
os_aio_all_slots_free();
#ifdef UNIV_DEBUG
/** Prints all pending IO
@param[in] file file where to print */
void
os_aio_print_pending_io(FILE* file);
#endif /* UNIV_DEBUG */
/** This function returns information about the specified file
@param[in] path pathname of the file

View file

@ -378,9 +378,6 @@ struct que_thr_t{
related delete/updates */
row_prebuilt_t* prebuilt; /*!< prebuilt structure processed by
the query thread */
/** a slot of srv_sys.sys_threads, for DEBUG_SYNC in purge thread */
ut_d(srv_slot_t* thread_slot;)
};
#define QUE_THR_MAGIC_N 8476583

View file

@ -32,6 +32,7 @@ Created 10/13/2010 Jimmy Yang
#include "fts0priv.h"
#include "row0merge.h"
#include "btr0bulk.h"
#include "srv0srv.h"
/** This structure defines information the scan thread will fetch
and put to the linked list for parallel tokenization/sort threads
@ -64,7 +65,6 @@ struct fts_psort_common_t {
trx_t* trx; /*!< transaction */
fts_psort_t* all_info; /*!< all parallel sort info */
os_event_t sort_event; /*!< sort event */
os_event_t merge_event; /*!< merge event */
ibool opt_doc_id_size;/*!< whether to use 4 bytes
instead of 8 bytes integer to
store Doc ID during sort, if
@ -86,11 +86,11 @@ struct fts_psort_t {
/*!< buffer to crypt data */
row_merge_block_t* crypt_alloc[FTS_NUM_AUX_INDEX];
/*!< buffer to allocated */
ulint child_status; /*!< child thread status */
ulint state; /*!< parent thread state */
ulint child_status; /*!< child task status */
ulint state; /*!< parent state */
fts_doc_list_t fts_doc_list; /*!< doc list to process */
fts_psort_common_t* psort_common; /*!< ptr to all psort info */
os_thread_t thread_hdl; /*!< thread handler */
tpool::waitable_task* task; /*!< threadpool task */
dberr_t error; /*!< db error during psort */
ulint memory_used; /*!< memory used by fts_doc_list */
ib_mutex_t mutex; /*!< mutex for fts_doc_list */

View file

@ -51,6 +51,8 @@ Created 10/10/1995 Heikki Tuuri
#include "mysql/psi/mysql_stage.h"
#include "mysql/psi/psi.h"
#include <tpool.h>
#include <memory>
/** Global counters used inside InnoDB. */
struct srv_stats_t
@ -204,21 +206,6 @@ extern const char* srv_main_thread_op_info;
/** Prefix used by MySQL to indicate pre-5.1 table name encoding */
extern const char srv_mysql50_table_name_prefix[10];
/** Event to signal srv_monitor_thread. Not protected by a mutex.
Set after setting srv_print_innodb_monitor. */
extern os_event_t srv_monitor_event;
/** Event to signal the shutdown of srv_error_monitor_thread.
Not protected by a mutex. */
extern os_event_t srv_error_event;
/** Event for waking up buf_dump_thread. Not protected by a mutex.
Set on shutdown or by buf_dump_start() or buf_load_start(). */
extern os_event_t srv_buf_dump_event;
/** The buffer pool resize thread waits on this event. */
extern os_event_t srv_buf_resize_event;
/** The buffer pool dump/load file name */
#define SRV_BUF_DUMP_FILENAME_DEFAULT "ib_buffer_pool"
extern char* srv_buf_dump_filename;
@ -274,7 +261,7 @@ extern unsigned long long srv_online_max_size;
/* If this flag is TRUE, then we will use the native aio of the
OS (provided we compiled Innobase with it in), otherwise we will
use simulated aio we build below with threads.
use simulated aio.
Currently we support native aio on windows and linux */
extern my_bool srv_use_native_aio;
extern my_bool srv_numa_interleave;
@ -505,16 +492,7 @@ extern my_bool srv_print_innodb_lock_monitor;
extern ibool srv_print_verbose_log;
extern bool srv_monitor_active;
extern bool srv_error_monitor_active;
/* TRUE during the lifetime of the buffer pool dump/load thread */
extern bool srv_buf_dump_thread_active;
/* true during the lifetime of the buffer pool resize thread */
extern bool srv_buf_resize_thread_active;
/* TRUE during the lifetime of the stats thread */
extern bool srv_dict_stats_thread_active;
/* TRUE if enable log scrubbing */
extern my_bool srv_scrub_log;
@ -593,23 +571,10 @@ extern ulong srv_buf_dump_status_frequency;
#define srv_max_purge_threads 32
# ifdef UNIV_PFS_THREAD
/* Keys to register InnoDB threads with performance schema */
extern mysql_pfs_key_t buf_dump_thread_key;
extern mysql_pfs_key_t dict_stats_thread_key;
extern mysql_pfs_key_t io_handler_thread_key;
extern mysql_pfs_key_t io_ibuf_thread_key;
extern mysql_pfs_key_t io_log_thread_key;
extern mysql_pfs_key_t io_read_thread_key;
extern mysql_pfs_key_t io_write_thread_key;
extern mysql_pfs_key_t page_cleaner_thread_key;
extern mysql_pfs_key_t recv_writer_thread_key;
extern mysql_pfs_key_t srv_error_monitor_thread_key;
extern mysql_pfs_key_t srv_lock_timeout_thread_key;
extern mysql_pfs_key_t srv_master_thread_key;
extern mysql_pfs_key_t srv_monitor_thread_key;
extern mysql_pfs_key_t srv_purge_thread_key;
extern mysql_pfs_key_t srv_worker_thread_key;
extern mysql_pfs_key_t trx_rollback_clean_thread_key;
extern mysql_pfs_key_t thread_pool_thread_key;
/* This macro register the current thread and its key with performance
schema */
@ -732,17 +697,6 @@ enum srv_stats_method_name_enum {
typedef enum srv_stats_method_name_enum srv_stats_method_name_t;
/** Types of threads existing in the system. */
enum srv_thread_type {
SRV_NONE, /*!< None */
SRV_WORKER, /*!< threads serving parallelized
queries and queries released from
lock wait */
SRV_PURGE, /*!< Purge coordinator thread */
SRV_MASTER /*!< the master thread, (whose type
number must be biggest) */
};
/*********************************************************************//**
Boots Innobase server. */
void
@ -766,10 +720,10 @@ Resets the info describing an i/o thread current state. */
void
srv_reset_io_thread_op_info();
/** Wake up the purge threads if there is work to do. */
/** Wake up the purge if there is work to do. */
void
srv_wake_purge_thread_if_not_active();
/** Wake up the InnoDB master thread if it was suspended (not sleeping). */
/** Wake up the InnoDB master thread */
void
srv_active_wake_master_thread_low();
@ -779,7 +733,7 @@ srv_active_wake_master_thread_low();
srv_active_wake_master_thread_low(); \
} \
} while (0)
/** Wake up the master thread if it is suspended or being suspended. */
/** Wake up the master */
void
srv_wake_master_thread();
@ -832,61 +786,37 @@ srv_que_task_enqueue_low(
que_thr_t* thr); /*!< in: query thread */
/**********************************************************************//**
Check whether any background thread is active. If so, return the thread
type.
@return SRV_NONE if all are are suspended or have exited, thread
type if any are still active. */
enum srv_thread_type
srv_get_active_thread_type(void);
Check whether purge or master is active.
@return false if all are suspended or have exited, true
if any are still active. */
bool srv_any_background_activity();
/*============================*/
extern "C" {
/*********************************************************************//**
A thread which prints the info output by various InnoDB monitors.
@return a dummy parameter */
os_thread_ret_t
DECLARE_THREAD(srv_monitor_thread)(
/*===============================*/
void* arg); /*!< in: a dummy parameter required by
os_thread_create */
/*********************************************************************//**
The master thread controlling the server.
@return a dummy parameter */
os_thread_ret_t
DECLARE_THREAD(srv_master_thread)(
/*==============================*/
void* arg); /*!< in: a dummy parameter required by
os_thread_create */
/** Periodic task which prints the info output by various InnoDB monitors.*/
void srv_monitor_task(void*);
/** The periodic master task controlling the server. */
void srv_master_callback(void *);
/**
Perform shutdown tasks such as background drop,
and optionally ibuf merge.
*/
void srv_shutdown(bool ibuf_merge);
/*************************************************************************
A thread which prints warnings about semaphore waits which have lasted
A task which prints warnings about semaphore waits which have lasted
too long. These can be used to track bugs which cause hangs.
@return a dummy parameter */
os_thread_ret_t
DECLARE_THREAD(srv_error_monitor_thread)(
/*=====================================*/
void* arg); /*!< in: a dummy parameter required by
os_thread_create */
*/
void srv_error_monitor_task(void*);
/*********************************************************************//**
Purge coordinator thread that schedules the purge tasks.
@return a dummy parameter */
os_thread_ret_t
DECLARE_THREAD(srv_purge_coordinator_thread)(
/*=========================================*/
void* arg MY_ATTRIBUTE((unused))); /*!< in: a dummy parameter
required by os_thread_create */
/*********************************************************************//**
Worker thread that reads tasks from the work queue and executes them.
@return a dummy parameter */
os_thread_ret_t
DECLARE_THREAD(srv_worker_thread)(
/*==============================*/
void* arg MY_ATTRIBUTE((unused))); /*!< in: a dummy parameter
required by os_thread_create */
} /* extern "C" */
/**********************************************************************//**
@ -896,12 +826,6 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
/** Ensure that a given number of threads of the type given are running
(or are already terminated).
@param[in] type thread type
@param[in] n number of threads that have to run */
void
srv_release_threads(enum srv_thread_type type, ulint n);
/** Wakeup the purge threads. */
void
@ -910,6 +834,12 @@ srv_purge_wakeup();
/** Shut down the purge threads. */
void srv_purge_shutdown();
/** Init purge tasks*/
void srv_init_purge_tasks(uint n_max);
/** Shut down purge tasks*/
void srv_shutdown_purge_tasks();
#ifdef UNIV_DEBUG
/** Disables master thread. It's used by:
SET GLOBAL innodb_master_thread_disabled_debug = 1 (0).
@ -1074,8 +1004,6 @@ struct export_var_t{
/** Thread slot in the thread table. */
struct srv_slot_t{
srv_thread_type type; /*!< thread type: user,
utility etc. */
ibool in_use; /*!< TRUE if this slot
is in use */
ibool suspended; /*!< TRUE if the thread is
@ -1099,22 +1027,29 @@ struct srv_slot_t{
to do */
que_thr_t* thr; /*!< suspended query thread
(only used for user threads) */
#ifdef UNIV_DEBUG
struct debug_sync_t {
UT_LIST_NODE_T(debug_sync_t) debug_sync_list;
};
UT_LIST_BASE_NODE_T(debug_sync_t) debug_sync;
rw_lock_t debug_sync_lock;
#endif
};
#ifdef UNIV_DEBUG
typedef void srv_slot_callback_t(srv_slot_t*, const void*);
extern tpool::thread_pool *srv_thread_pool;
extern std::unique_ptr<tpool::timer> srv_master_timer;
extern std::unique_ptr<tpool::timer> srv_error_monitor_timer;
extern std::unique_ptr<tpool::timer> srv_monitor_timer;
void srv_for_each_thread(srv_thread_type type,
srv_slot_callback_t callback,
const void *arg);
#endif
#define SRV_MONITOR_TIMER_PERIOD 5000
/** Re-arm srv_monitor_timer by calling set_time(0, SRV_MONITOR_TIMER_PERIOD).
NOTE(review): presumably the first argument is the initial delay and the
second the repeat period, both in milliseconds, so the monitor task runs
right away and then every SRV_MONITOR_TIMER_PERIOD -- confirm against
tpool::timer::set_time().
srv_monitor_timer is dereferenced unconditionally, so the timer must
already have been created when this is called. */
static inline void srv_monitor_timer_schedule_now()
{
srv_monitor_timer->set_time(0, SRV_MONITOR_TIMER_PERIOD);
}
/** Create a timer in srv_thread_pool and arm it.
Any timer previously owned by the given unique_ptr is released by reset().
@param[in,out]	timer	owner that receives the newly created timer
@param[in]	func	callback passed to create_timer()
@param[in]	period	second argument of set_time(); presumably the
			repeat period in milliseconds -- confirm against
			tpool::timer::set_time() */
static inline void srv_start_periodic_timer(
std::unique_ptr<tpool::timer>& timer,
void (*func)(void*),
int period)
{
/* srv_thread_pool is dereferenced here, so it must be initialized first. */
timer.reset(srv_thread_pool->create_timer(func));
/* Arm the timer: first argument 0, second argument the given period. */
timer->set_time(0, period);
}
void srv_thread_pool_init();
void srv_thread_pool_end();
#ifdef WITH_WSREP
UNIV_INTERN

View file

@ -48,6 +48,12 @@ srv_undo_tablespaces_init(bool create_new_db);
@return DB_SUCCESS or error code */
dberr_t srv_start(bool create_new_db);
/**
Shutdown purge to make sure that there is no possibility that we call any
plugin code (e.g audit) inside virtual column computation.
*/
void innodb_preshutdown();
/** Shut down InnoDB. */
void innodb_shutdown();

View file

@ -62,10 +62,6 @@ trx_purge(
ulint n_purge_threads, /*!< in: number of purge tasks to
submit to task queue. */
bool truncate /*!< in: truncate history if true */
#ifdef UNIV_DEBUG
, srv_slot_t *slot /*!< in/out: purge coordinator
thread slot */
#endif
);
/** Rollback segments from a given transaction with trx-no
@ -144,14 +140,11 @@ private:
class purge_sys_t
{
public:
/** signal state changes; os_event_reset() and os_event_set()
are protected by rw_lock_x_lock(latch) */
MY_ALIGNED(CACHE_LINE_SIZE)
os_event_t event;
/** latch protecting view, m_enabled */
MY_ALIGNED(CACHE_LINE_SIZE)
rw_lock_t latch;
private:
bool m_initialized;
/** whether purge is enabled; protected by latch and std::atomic */
std::atomic<bool> m_enabled;
/** number of pending stop() calls without resume() */
@ -162,9 +155,6 @@ public:
MY_ALIGNED(CACHE_LINE_SIZE)
ReadView view; /*!< The purge will not remove undo logs
which are >= this view (purge view) */
/** Number of not completed tasks. Accessed by srv_purge_coordinator
and srv_worker_thread by std::atomic. */
std::atomic<ulint> n_tasks;
/** Iterator to the undo log records of committed transactions */
struct iterator
@ -234,7 +224,7 @@ public:
uninitialised. Real initialisation happens in create().
*/
purge_sys_t() : event(NULL), m_enabled(false), n_tasks(0) {}
purge_sys_t():m_initialized(false),m_enabled(false) {}
/** Create the instance */

View file

@ -461,7 +461,6 @@ void lock_sys_t::create(ulint n_cells)
mutex_create(LATCH_ID_LOCK_SYS_WAIT, &wait_mutex);
timeout_event = os_event_create(0);
rec_hash = hash_create(n_cells);
prdt_hash = hash_create(n_cells);
@ -471,6 +470,7 @@ void lock_sys_t::create(ulint n_cells)
lock_latest_err_file = os_file_create_tmpfile();
ut_a(lock_latest_err_file);
}
timeout_timer_active = false;
}
/** Calculates the fold value of a lock: used in migrating the hash table.
@ -560,8 +560,6 @@ void lock_sys_t::close()
hash_table_free(prdt_hash);
hash_table_free(prdt_page_hash);
os_event_destroy(timeout_event);
mutex_destroy(&mutex);
mutex_destroy(&wait_mutex);
@ -6290,14 +6288,6 @@ lock_trx_lock_list_init(
UT_LIST_INIT(*lock_list, &lock_t::trx_locks);
}
/*******************************************************************//**
Set the lock system timeout event. */
void
lock_set_timeout_event()
/*====================*/
{
os_event_set(lock_sys.timeout_event);
}
#ifdef UNIV_DEBUG
/*******************************************************************//**

View file

@ -36,6 +36,7 @@ Created 25/5/2010 Sunny Bains
#include "row0mysql.h"
#include "srv0start.h"
#include "lock0priv.h"
#include "srv0srv.h"
/*********************************************************************//**
Print the contents of the lock_sys_t::waiting_threads array. */
@ -51,10 +52,9 @@ lock_wait_table_print(void)
for (ulint i = 0; i < srv_max_n_threads; i++, ++slot) {
fprintf(stderr,
"Slot %lu: thread type %lu,"
"Slot %lu:"
" in use %lu, susp %lu, timeout %lu, time %lu\n",
(ulong) i,
(ulong) slot->type,
(ulong) slot->in_use,
(ulong) slot->suspended,
slot->wait_timeout,
@ -164,7 +164,10 @@ lock_wait_table_reserve_slot(
ut_ad(lock_sys.last_slot
<= lock_sys.waiting_threads + srv_max_n_threads);
if (!lock_sys.timeout_timer_active) {
lock_sys.timeout_timer_active = true;
lock_sys.timeout_timer->set_time(1000, 0);
}
return(slot);
}
}
@ -211,7 +214,7 @@ wsrep_is_BF_lock_timeout(
srv_print_innodb_monitor = TRUE;
srv_print_innodb_lock_monitor = TRUE;
os_event_set(srv_monitor_event);
srv_monitor_timer_schedule_now();
return true;
}
return false;
@ -235,6 +238,7 @@ lock_wait_suspend_thread(
ibool was_declared_inside_innodb;
ulong lock_wait_timeout;
ut_a(lock_sys.timeout_timer.get());
trx = thr_get_trx(thr);
if (trx->mysql_thd != 0) {
@ -497,67 +501,33 @@ lock_wait_check_and_cancel(
}
}
/*********************************************************************//**
A thread which wakes up threads whose lock wait may have lasted too long.
@return a dummy parameter */
extern "C"
os_thread_ret_t
DECLARE_THREAD(lock_wait_timeout_thread)(void*)
/** Thread pool task that wakes up threads whose lock wait may have lasted too long */
void lock_wait_timeout_task(void*)
{
int64_t sig_count = 0;
os_event_t event = lock_sys.timeout_event;
lock_wait_mutex_enter();
ut_ad(!srv_read_only_mode);
/* Check all slots for user threads that are waiting
on locks, and if they have exceeded the time limit. */
bool any_slot_in_use = false;
for (srv_slot_t* slot = lock_sys.waiting_threads;
slot < lock_sys.last_slot;
++slot) {
#ifdef UNIV_PFS_THREAD
pfs_register_thread(srv_lock_timeout_thread_key);
#endif /* UNIV_PFS_THREAD */
/* We are doing a read without the lock mutex
and/or the trx mutex. This is OK because a slot
can't be freed or reserved without the lock wait
mutex. */
do {
srv_slot_t* slot;
/* When someone is waiting for a lock, we wake up every second
and check if a timeout has passed for a lock wait */
os_event_wait_time_low(event, 1000000, sig_count);
sig_count = os_event_reset(event);
if (srv_shutdown_state >= SRV_SHUTDOWN_CLEANUP) {
break;
if (slot->in_use) {
any_slot_in_use = true;
lock_wait_check_and_cancel(slot);
}
lock_wait_mutex_enter();
/* Check all slots for user threads that are waiting
on locks, and if they have exceeded the time limit. */
for (slot = lock_sys.waiting_threads;
slot < lock_sys.last_slot;
++slot) {
/* We are doing a read without the lock mutex
and/or the trx mutex. This is OK because a slot
can't be freed or reserved without the lock wait
mutex. */
if (slot->in_use) {
lock_wait_check_and_cancel(slot);
}
}
sig_count = os_event_reset(event);
lock_wait_mutex_exit();
} while (srv_shutdown_state < SRV_SHUTDOWN_CLEANUP);
lock_sys.timeout_thread_active = false;
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
if (any_slot_in_use) {
lock_sys.timeout_timer->set_time(1000, 0);
} else {
lock_sys.timeout_timer_active = false;
}
lock_wait_mutex_exit();
}

View file

@ -53,6 +53,7 @@ Created 12/9/1995 Heikki Tuuri
#include "trx0roll.h"
#include "srv0mon.h"
#include "sync0sync.h"
#include "buf0dump.h"
/*
General philosophy of InnoDB redo-logs:
@ -1495,6 +1496,7 @@ log_check_margins(void)
} while (check);
}
extern void buf_resize_shutdown();
/****************************************************************//**
Makes a checkpoint at the latest lsn and writes it to first page of each
data file in the database, so that we know that the file spaces contain
@ -1511,26 +1513,37 @@ logs_empty_and_mark_files_at_shutdown(void)
/* Wait until the master thread and all other operations are idle: our
algorithm only works if the server is idle at shutdown */
bool do_srv_shutdown = false;
if (srv_master_timer) {
do_srv_shutdown = srv_fast_shutdown < 2;
srv_master_timer.reset();
}
/* Wait for the end of the buffer resize task.*/
buf_resize_shutdown();
dict_stats_shutdown();
btr_defragment_shutdown();
srv_shutdown_state = SRV_SHUTDOWN_CLEANUP;
if (srv_buffer_pool_dump_at_shutdown &&
!srv_read_only_mode && srv_fast_shutdown < 2) {
buf_dump_start();
}
srv_error_monitor_timer.reset();
srv_monitor_timer.reset();
lock_sys.timeout_timer.reset();
if (do_srv_shutdown) {
srv_shutdown(srv_fast_shutdown == 0);
}
loop:
ut_ad(lock_sys.is_initialised() || !srv_was_started);
ut_ad(log_sys.is_initialised() || !srv_was_started);
ut_ad(fil_system.is_initialised() || !srv_was_started);
os_event_set(srv_buf_resize_event);
if (!srv_read_only_mode) {
os_event_set(srv_error_event);
os_event_set(srv_monitor_event);
os_event_set(srv_buf_dump_event);
if (lock_sys.timeout_thread_active) {
os_event_set(lock_sys.timeout_event);
}
if (dict_stats_event) {
os_event_set(dict_stats_event);
} else {
ut_ad(!srv_dict_stats_thread_active);
}
if (recv_sys.flush_start) {
/* This is in case recv_writer_thread was never
started, or buf_flush_page_cleaner_coordinator
@ -1570,23 +1583,7 @@ loop:
/* We need these threads to stop early in shutdown. */
const char* thread_name;
if (srv_error_monitor_active) {
thread_name = "srv_error_monitor_thread";
} else if (srv_monitor_active) {
thread_name = "srv_monitor_thread";
} else if (srv_buf_resize_thread_active) {
thread_name = "buf_resize_thread";
goto wait_suspend_loop;
} else if (srv_dict_stats_thread_active) {
thread_name = "dict_stats_thread";
} else if (lock_sys.timeout_thread_active) {
thread_name = "lock_wait_timeout_thread";
} else if (srv_buf_dump_thread_active) {
thread_name = "buf_dump_thread";
goto wait_suspend_loop;
} else if (btr_defragment_thread_active) {
thread_name = "btr_defragment_thread";
} else if (srv_fast_shutdown != 2 && trx_rollback_is_active) {
if (srv_fast_shutdown != 2 && trx_rollback_is_active) {
thread_name = "rollback of recovered transactions";
} else {
thread_name = NULL;
@ -1608,26 +1605,17 @@ wait_suspend_loop:
/* Check that the background threads are suspended */
switch (srv_get_active_thread_type()) {
case SRV_NONE:
if (!srv_n_fil_crypt_threads_started) {
srv_shutdown_state = SRV_SHUTDOWN_FLUSH_PHASE;
break;
}
ut_a(!srv_any_background_activity());
if (srv_n_fil_crypt_threads_started) {
os_event_set(fil_crypt_threads_event);
thread_name = "fil_crypt_thread";
goto wait_suspend_loop;
case SRV_PURGE:
case SRV_WORKER:
ut_ad(!"purge was not shut down");
srv_purge_wakeup();
thread_name = "purge thread";
goto wait_suspend_loop;
case SRV_MASTER:
thread_name = "master thread";
goto wait_suspend_loop;
}
buf_load_dump_end();
srv_shutdown_state = SRV_SHUTDOWN_FLUSH_PHASE;
/* At this point only page_cleaner should be active. We wait
here to let it complete the flushing of the buffer pools
before proceeding further. */
@ -1740,7 +1728,7 @@ wait_suspend_loop:
srv_shutdown_state = SRV_SHUTDOWN_LAST_PHASE;
/* Make some checks that the server really is quiet */
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"Free innodb buffer pool");
@ -1768,7 +1756,7 @@ wait_suspend_loop:
fil_close_all_files();
/* Make some checks that the server really is quiet */
ut_a(srv_get_active_thread_type() == SRV_NONE);
ut_a(!srv_any_background_activity());
ut_a(lsn == log_sys.lsn
|| srv_force_recovery == SRV_FORCE_NO_LOG_REDO);

File diff suppressed because it is too large Load diff

View file

@ -216,7 +216,6 @@ row_fts_psort_info_init(
common_info->trx = trx;
common_info->all_info = psort_info;
common_info->sort_event = os_event_create(0);
common_info->merge_event = os_event_create(0);
common_info->opt_doc_id_size = opt_doc_id_size;
if (log_tmp_is_encrypted()) {
@ -350,7 +349,6 @@ row_fts_psort_info_destroy(
}
os_event_destroy(merge_info[0].psort_common->sort_event);
os_event_destroy(merge_info[0].psort_common->merge_event);
ut_free(merge_info[0].psort_common->dup);
ut_free(merge_info[0].psort_common);
ut_free(psort_info);
@ -754,10 +752,9 @@ row_merge_fts_get_next_doc_item(
/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in memory sort of the parsed records.
@return OS_THREAD_DUMMY_RETURN */
*/
static
os_thread_ret_t
DECLARE_THREAD(fts_parallel_tokenization)(
void fts_parallel_tokenization(
/*======================*/
void* arg) /*!< in: psort_info for the thread */
{
@ -1065,10 +1062,6 @@ func_exit:
psort_info->child_status = FTS_CHILD_COMPLETE;
os_event_set(psort_info->psort_common->sort_event);
psort_info->child_status = FTS_CHILD_EXITING;
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/*********************************************************************//**
@ -1079,23 +1072,20 @@ row_fts_start_psort(
fts_psort_t* psort_info) /*!< parallel sort structure */
{
ulint i = 0;
os_thread_id_t thd_id;
for (i = 0; i < fts_sort_pll_degree; i++) {
psort_info[i].psort_id = i;
psort_info[i].thread_hdl =
os_thread_create(fts_parallel_tokenization,
(void*) &psort_info[i],
&thd_id);
psort_info[i].task =
new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]);
srv_thread_pool->submit_task(psort_info[i].task);
}
}
/*********************************************************************//**
Function performs the merge and insertion of the sorted records.
@return OS_THREAD_DUMMY_RETURN */
Function performs the merge and insertion of the sorted records. */
static
os_thread_ret_t
DECLARE_THREAD(fts_parallel_merge)(
void
fts_parallel_merge(
/*===============*/
void* arg) /*!< in: parallel merge info */
{
@ -1109,14 +1099,6 @@ DECLARE_THREAD(fts_parallel_merge)(
row_fts_merge_insert(psort_info->psort_common->dup->index,
psort_info->psort_common->new_table,
psort_info->psort_common->all_info, id);
psort_info->child_status = FTS_CHILD_COMPLETE;
os_event_set(psort_info->psort_common->merge_event);
psort_info->child_status = FTS_CHILD_EXITING;
os_thread_exit(false);
OS_THREAD_DUMMY_RETURN;
}
/*********************************************************************//**
@ -1128,15 +1110,15 @@ row_fts_start_parallel_merge(
{
ulint i = 0;
/* Kick off merge/insert threads */
/* Kick off merge/insert tasks */
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
merge_info[i].psort_id = i;
merge_info[i].child_status = 0;
merge_info[i].thread_hdl = os_thread_create(
merge_info[i].task = new tpool::waitable_task(
fts_parallel_merge,
(void*) &merge_info[i],
&merge_info[i].thread_hdl);
(void*) &merge_info[i]);
srv_thread_pool->submit_task(merge_info[i].task);
}
}

View file

@ -2771,10 +2771,6 @@ all_done:
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
#endif
if (fts_pll_sort) {
bool all_exit = false;
ulint trial_count = 0;
const ulint max_trial_count = 10000;
wait_again:
/* Check if error occurs in child thread */
for (ulint j = 0; j < fts_sort_pll_degree; j++) {
@ -2807,27 +2803,9 @@ wait_again:
}
}
/* Now all children should complete, wait a bit until
they all finish setting the event, before we free everything.
This has a 10 second timeout */
do {
all_exit = true;
for (ulint j = 0; j < fts_sort_pll_degree; j++) {
if (psort_info[j].child_status
!= FTS_CHILD_EXITING) {
all_exit = false;
os_thread_sleep(1000);
break;
}
}
trial_count++;
} while (!all_exit && trial_count < max_trial_count);
if (!all_exit) {
ib::fatal() << "Not all child sort threads exited"
" when creating FTS index '"
<< fts_sort_idx->name << "'";
for (ulint j = 0; j < fts_sort_pll_degree; j++) {
psort_info[j].task->wait();
delete psort_info[j].task;
}
}
@ -4109,7 +4087,7 @@ row_merge_file_create_low(
File f = create_temp_file(filename, path, "ib",
O_BINARY | O_SEQUENTIAL,
MYF(MY_WME | MY_TEMPORARY));
pfs_os_file_t fd = IF_WIN(my_get_osfhandle(f), f);
pfs_os_file_t fd = IF_WIN((os_file_t)my_get_osfhandle(f), f);
#ifdef UNIV_PFS_IO
register_pfs_file_open_end(locker, fd,
@ -4155,7 +4133,7 @@ row_merge_file_destroy_low(
const pfs_os_file_t& fd) /*!< in: merge file descriptor */
{
if (fd != OS_FILE_CLOSED) {
int res = mysql_file_close(IF_WIN(my_win_handle2File(fd), fd),
int res = mysql_file_close(IF_WIN(my_win_handle2File((os_file_t)fd), fd),
MYF(MY_WME));
ut_a(res != -1);
}
@ -4589,7 +4567,6 @@ row_merge_build_indexes(
dict_index_t* fts_sort_idx = NULL;
fts_psort_t* psort_info = NULL;
fts_psort_t* merge_info = NULL;
int64_t sig_count = 0;
bool fts_psort_initiated = false;
double total_static_cost = 0;
@ -4756,65 +4733,14 @@ row_merge_build_indexes(
}
if (indexes[i]->type & DICT_FTS) {
os_event_t fts_parallel_merge_event;
sort_idx = fts_sort_idx;
fts_parallel_merge_event
= merge_info[0].psort_common->merge_event;
if (FTS_PLL_MERGE) {
ulint trial_count = 0;
bool all_exit = false;
os_event_reset(fts_parallel_merge_event);
row_fts_start_parallel_merge(merge_info);
wait_again:
os_event_wait_time_low(
fts_parallel_merge_event, 1000000,
sig_count);
for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
if (merge_info[j].child_status
!= FTS_CHILD_COMPLETE
&& merge_info[j].child_status
!= FTS_CHILD_EXITING) {
sig_count = os_event_reset(
fts_parallel_merge_event);
goto wait_again;
}
}
/* Now all children should complete, wait
a bit until they all finish using event */
while (!all_exit && trial_count < 10000) {
all_exit = true;
for (j = 0; j < FTS_NUM_AUX_INDEX;
j++) {
if (merge_info[j].child_status
!= FTS_CHILD_EXITING) {
all_exit = false;
os_thread_sleep(1000);
break;
}
}
trial_count++;
}
if (!all_exit) {
ib::error() << "Not all child merge"
" threads exited when creating"
" FTS index '"
<< indexes[i]->name << "'";
} else {
for (j = 0; j < FTS_NUM_AUX_INDEX;
j++) {
os_thread_join(merge_info[j]
.thread_hdl);
}
merge_info[j].task->wait();
delete merge_info[j].task;
}
} else {
/* This cannot report duplicates; an

View file

@ -3435,7 +3435,7 @@ row_drop_table_for_mysql(
dict_stats_recalc_pool_del(table);
dict_stats_defrag_pool_del(table, NULL);
if (btr_defragment_thread_active) {
if (btr_defragment_active) {
/* During fts_drop_orphaned_tables() in
recv_recovery_rollback_active() the
btr_defragment_mutex has not yet been

View file

@ -1311,26 +1311,6 @@ row_purge_step(
node->start();
#ifdef UNIV_DEBUG
srv_slot_t *slot = thr->thread_slot;
ut_ad(slot);
rw_lock_x_lock(&slot->debug_sync_lock);
while (UT_LIST_GET_LEN(slot->debug_sync)) {
srv_slot_t::debug_sync_t *sync =
UT_LIST_GET_FIRST(slot->debug_sync);
const char* sync_str = reinterpret_cast<char*>(&sync[1]);
bool result = debug_sync_set_action(current_thd,
sync_str,
strlen(sync_str));
ut_a(!result);
UT_LIST_REMOVE(slot->debug_sync, sync);
ut_free(sync);
}
rw_lock_x_unlock(&slot->debug_sync_lock);
#endif
if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) {
trx_purge_rec_t*purge_rec;

File diff suppressed because it is too large Load diff

View file

@ -145,19 +145,18 @@ the initialisation step. */
enum srv_start_state_t {
/** No thread started */
SRV_START_STATE_NONE = 0, /*!< No thread started */
/** lock_wait_timeout_thread started */
SRV_START_STATE_LOCK_SYS = 1, /*!< Started lock-timeout
thread. */
/** lock_wait_timeout timer task started */
SRV_START_STATE_LOCK_SYS = 1,
/** buf_flush_page_cleaner_coordinator,
buf_flush_page_cleaner_worker started */
SRV_START_STATE_IO = 2,
/** srv_error_monitor_thread, srv_monitor_thread started */
/** srv_error_monitor_thread, srv_print_monitor_task started */
SRV_START_STATE_MONITOR = 4,
/** srv_master_thread started */
SRV_START_STATE_MASTER = 8,
/** srv_purge_coordinator_thread, srv_worker_thread started */
SRV_START_STATE_PURGE = 16,
/** fil_crypt_thread, btr_defragment_thread started
/** fil_crypt_thread,
(all background threads that can generate redo log but not undo log */
SRV_START_STATE_REDO = 32
};
@ -172,40 +171,16 @@ enum srv_shutdown_t srv_shutdown_state = SRV_SHUTDOWN_NONE;
/** Files comprising the system tablespace */
pfs_os_file_t files[1000];
/** io_handler_thread parameters for thread identification */
static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */
/** 6 is the ? */
#define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32];
/** Thread handles */
static os_thread_t thread_handles[SRV_MAX_N_IO_THREADS + 6 + 32];
static os_thread_t buf_dump_thread_handle;
static os_thread_t dict_stats_thread_handle;
/** Status variables, is thread started ?*/
static bool thread_started[SRV_MAX_N_IO_THREADS + 6 + 32] = {false};
/** Name of srv_monitor_file */
static char* srv_monitor_file_name;
std::unique_ptr<tpool::timer> srv_master_timer;
/** */
#define SRV_MAX_N_PENDING_SYNC_IOS 100
#ifdef UNIV_PFS_THREAD
/* Keys to register InnoDB threads with performance schema */
mysql_pfs_key_t buf_dump_thread_key;
mysql_pfs_key_t dict_stats_thread_key;
mysql_pfs_key_t io_handler_thread_key;
mysql_pfs_key_t io_ibuf_thread_key;
mysql_pfs_key_t io_log_thread_key;
mysql_pfs_key_t io_read_thread_key;
mysql_pfs_key_t io_write_thread_key;
mysql_pfs_key_t srv_error_monitor_thread_key;
mysql_pfs_key_t srv_lock_timeout_thread_key;
mysql_pfs_key_t srv_master_thread_key;
mysql_pfs_key_t srv_monitor_thread_key;
mysql_pfs_key_t srv_purge_thread_key;
mysql_pfs_key_t srv_worker_thread_key;
mysql_pfs_key_t thread_pool_thread_key;
#endif /* UNIV_PFS_THREAD */
#ifdef HAVE_PSI_STAGE_INTERFACE
@ -275,64 +250,6 @@ srv_file_check_mode(
return(true);
}
/********************************************************************//**
I/O-handler thread function.
Reads its AIO segment number from arg, registers the thread with the
performance schema under a key chosen from that segment number, and then
keeps calling fil_aio_wait() on the segment until the shutdown condition
in the loop below is satisfied.
@return OS_THREAD_DUMMY_RETURN */
extern "C"
os_thread_ret_t
DECLARE_THREAD(io_handler_thread)(
/*==============================*/
void* arg) /*!< in: pointer to the number of the segment in
the aio array */
{
ulint segment;
/* arg points to a ulint holding this thread's segment number. */
segment = *((ulint*) arg);
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "Io handler thread " << segment << " starts, id "
<< os_thread_pf(os_thread_get_curr_id());
#endif
/* For read only mode, we don't need ibuf and log I/O thread.
Please see srv_start() */
ulint start = (srv_read_only_mode) ? 0 : 2;
/* Choose the performance schema key by segment position:
segments below start are the ibuf (0) and log (1) threads; the next
srv_n_read_io_threads segments are read threads; the following
srv_n_write_io_threads segments are write threads; any other
segment is registered under the generic io_handler key. */
if (segment < start) {
if (segment == 0) {
pfs_register_thread(io_ibuf_thread_key);
} else {
ut_ad(segment == 1);
pfs_register_thread(io_log_thread_key);
}
} else if (segment >= start
&& segment < (start + srv_n_read_io_threads)) {
pfs_register_thread(io_read_thread_key);
} else if (segment >= (start + srv_n_read_io_threads)
&& segment < (start + srv_n_read_io_threads
+ srv_n_write_io_threads)) {
pfs_register_thread(io_write_thread_key);
} else {
pfs_register_thread(io_handler_thread_key);
}
/* Keep servicing the segment until all three hold: the shutdown state
is SRV_SHUTDOWN_EXIT_THREADS, the page cleaner is no longer active,
and every AIO slot is free. */
while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS
|| buf_page_cleaner_is_active
|| !os_aio_all_slots_free()) {
fil_aio_wait(segment);
}
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit.
The thread actually never comes here because it is exited in an
os_event_wait(). */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
}
/*********************************************************************//**
Creates a log file.
@ -1066,6 +983,13 @@ srv_shutdown_all_bg_threads()
ut_ad(!srv_undo_sources);
srv_shutdown_state = SRV_SHUTDOWN_EXIT_THREADS;
lock_sys.timeout_timer.reset();
srv_master_timer.reset();
if (purge_sys.enabled()) {
srv_purge_shutdown();
}
/* All threads end up waiting for certain events. Put those events
to the signaled state. Then the threads will exit themselves after
os_event_wait(). */
@ -1073,26 +997,10 @@ srv_shutdown_all_bg_threads()
/* NOTE: IF YOU CREATE THREADS IN INNODB, YOU MUST EXIT THEM
HERE OR EARLIER */
if (srv_start_state_is_set(SRV_START_STATE_LOCK_SYS)) {
/* a. Let the lock timeout thread exit */
os_event_set(lock_sys.timeout_event);
}
if (!srv_read_only_mode) {
/* b. srv error monitor thread exits automatically,
no need to do anything here */
if (srv_start_state_is_set(SRV_START_STATE_MASTER)) {
/* c. We wake the master thread so that
it exits */
srv_wake_master_thread();
}
if (srv_start_state_is_set(SRV_START_STATE_PURGE)) {
/* d. Wakeup purge threads. */
srv_purge_wakeup();
}
if (srv_n_fil_crypt_threads_started) {
os_event_set(fil_crypt_threads_event);
}
@ -1120,25 +1028,11 @@ srv_shutdown_all_bg_threads()
return;
}
switch (srv_operation) {
case SRV_OPERATION_BACKUP:
case SRV_OPERATION_RESTORE_DELTA:
break;
case SRV_OPERATION_NORMAL:
case SRV_OPERATION_RESTORE:
case SRV_OPERATION_RESTORE_EXPORT:
if (!buf_page_cleaner_is_active
&& os_aio_all_slots_free()) {
os_aio_wake_all_threads_at_shutdown();
}
}
os_thread_sleep(100000);
}
ib::warn() << os_thread_count << " threads created by InnoDB"
" had not exited at shutdown!";
ut_d(os_aio_print_pending_io(stderr));
ut_ad(0);
}
@ -1379,10 +1273,7 @@ dberr_t srv_start(bool create_new_db)
srv_max_n_threads = 1 /* io_ibuf_thread */
+ 1 /* io_log_thread */
+ 1 /* lock_wait_timeout_thread */
+ 1 /* srv_error_monitor_thread */
+ 1 /* srv_monitor_thread */
+ 1 /* srv_master_thread */
+ 1 /* srv_print_monitor_task */
+ 1 /* srv_purge_coordinator_thread */
+ 1 /* buf_dump_thread */
+ 1 /* dict_stats_thread */
@ -1535,15 +1426,6 @@ dberr_t srv_start(bool create_new_db)
recv_sys.create();
lock_sys.create(srv_lock_table_size);
/* Create i/o-handler threads: */
for (ulint t = 0; t < srv_n_file_io_threads; ++t) {
n[t] = t;
thread_handles[t] = os_thread_create(io_handler_thread, n + t, thread_ids + t);
thread_started[t] = true;
}
if (!srv_read_only_mode) {
buf_flush_page_cleaner_init();
@ -1803,7 +1685,7 @@ files_checked:
/* Initialize objects used by dict stats gathering thread, which
can also be used by recovery if it tries to drop some table */
if (!srv_read_only_mode) {
dict_stats_thread_init();
dict_stats_init();
}
trx_sys.create();
@ -2239,28 +2121,16 @@ files_checked:
srv_startup_is_before_trx_rollback_phase = false;
if (!srv_read_only_mode) {
/* Create the thread which watches the timeouts
/* timer task which watches the timeouts
for lock waits */
thread_handles[2 + SRV_MAX_N_IO_THREADS] = os_thread_create(
lock_wait_timeout_thread,
NULL, thread_ids + 2 + SRV_MAX_N_IO_THREADS);
thread_started[2 + SRV_MAX_N_IO_THREADS] = true;
lock_sys.timeout_thread_active = true;
lock_sys.timeout_timer.reset(srv_thread_pool->create_timer(
lock_wait_timeout_task));
DBUG_EXECUTE_IF("innodb_skip_monitors", goto skip_monitors;);
/* Create the thread which warns of long semaphore waits */
srv_error_monitor_active = true;
thread_handles[3 + SRV_MAX_N_IO_THREADS] = os_thread_create(
srv_error_monitor_thread,
NULL, thread_ids + 3 + SRV_MAX_N_IO_THREADS);
thread_started[3 + SRV_MAX_N_IO_THREADS] = true;
/* Create the task which warns of long semaphore waits */
srv_start_periodic_timer(srv_error_monitor_timer, srv_error_monitor_task, 1000);
srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task, 5000);
/* Create the thread which prints InnoDB monitor info */
srv_monitor_active = true;
thread_handles[4 + SRV_MAX_N_IO_THREADS] = os_thread_create(
srv_monitor_thread,
NULL, thread_ids + 4 + SRV_MAX_N_IO_THREADS);
thread_started[4 + SRV_MAX_N_IO_THREADS] = true;
srv_start_state |= SRV_START_STATE_LOCK_SYS
| SRV_START_STATE_MONITOR;
@ -2272,11 +2142,8 @@ skip_monitors:
if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
srv_undo_sources = true;
/* Create the dict stats gathering thread */
srv_dict_stats_thread_active = true;
dict_stats_thread_handle = os_thread_create(
dict_stats_thread, NULL, NULL);
/* Create the dict stats gathering task */
dict_stats_start();
/* Create the thread that will optimize the
FULLTEXT search index subsystem. */
fts_optimize_init();
@ -2316,42 +2183,16 @@ skip_monitors:
trx_temp_rseg_create();
if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
thread_handles[1 + SRV_MAX_N_IO_THREADS]
= os_thread_create(srv_master_thread, NULL,
(1 + SRV_MAX_N_IO_THREADS)
+ thread_ids);
thread_started[1 + SRV_MAX_N_IO_THREADS] = true;
srv_start_state_set(SRV_START_STATE_MASTER);
srv_start_periodic_timer(srv_master_timer, srv_master_callback, 1000);
}
}
if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL
&& srv_force_recovery < SRV_FORCE_NO_BACKGROUND) {
thread_handles[5 + SRV_MAX_N_IO_THREADS] = os_thread_create(
srv_purge_coordinator_thread,
NULL, thread_ids + 5 + SRV_MAX_N_IO_THREADS);
thread_started[5 + SRV_MAX_N_IO_THREADS] = true;
ut_a(UT_ARR_SIZE(thread_ids)
> 5 + srv_n_purge_threads + SRV_MAX_N_IO_THREADS);
/* We've already created the purge coordinator thread above. */
for (i = 1; i < srv_n_purge_threads; ++i) {
thread_handles[5 + i + SRV_MAX_N_IO_THREADS] = os_thread_create(
srv_worker_thread, NULL,
thread_ids + 5 + i + SRV_MAX_N_IO_THREADS);
thread_started[5 + i + SRV_MAX_N_IO_THREADS] = true;
}
while (srv_shutdown_state == SRV_SHUTDOWN_NONE
&& srv_force_recovery < SRV_FORCE_NO_BACKGROUND
&& !purge_sys.enabled()) {
ib::info() << "Waiting for purge to start";
os_thread_sleep(50000);
}
srv_init_purge_tasks(srv_n_purge_threads);
purge_sys.coordinator_startup();
srv_wake_purge_thread_if_not_active();
srv_start_state_set(SRV_START_STATE_PURGE);
}
@ -2396,10 +2237,8 @@ skip_monitors:
if (!get_wsrep_recovery()) {
#endif /* WITH_WSREP */
/* Create the buffer pool dump/load thread */
srv_buf_dump_thread_active = true;
buf_dump_thread_handle=
os_thread_create(buf_dump_thread, NULL, NULL);
/* Start buffer pool dump/load task */
buf_load_at_startup();
#ifdef WITH_WSREP
} else {
@ -2420,16 +2259,10 @@ skip_monitors:
/* Initialize online defragmentation. */
btr_defragment_init();
btr_defragment_thread_active = true;
os_thread_create(btr_defragment_thread, NULL, NULL);
srv_start_state |= SRV_START_STATE_REDO;
}
/* Create the buffer pool resize thread */
srv_buf_resize_thread_active = true;
os_thread_create(buf_resize_thread, NULL, NULL);
return(DB_SUCCESS);
}
@ -2448,12 +2281,38 @@ void srv_shutdown_bg_undo_sources()
}
}
/**
Perform pre-shutdown tasks.
Since purge tasks call into the server (some MDL acquisition,
and compute virtual functions), let them shut down right
after user connections go down, while the rest of the server
infrastructure is still intact.
Only the first call does any work; every later call returns
immediately.
*/
void innodb_preshutdown()
{
/* Run-once guard: remember that the pre-shutdown work was already done. */
static bool first_time = true;
if (!first_time)
return;
first_time = false;
/* Nothing to do in read-only mode. */
if (!srv_read_only_mode) {
/* Unless a fast shutdown was requested (srv_fast_shutdown != 0) or we are
not in normal operation, poll until no transaction is active any more. */
if (!srv_fast_shutdown && srv_operation == SRV_OPERATION_NORMAL) {
while (trx_sys.any_active_transactions()) {
os_thread_sleep(1000);
}
}
/* Stop the background undo sources first, then shut down purge. */
srv_shutdown_bg_undo_sources();
srv_purge_shutdown();
}
}
/** Shut down InnoDB. */
void innodb_shutdown()
{
ut_ad(!srv_running.load(std::memory_order_relaxed));
innodb_preshutdown();
ut_ad(!srv_undo_sources);
switch (srv_operation) {
case SRV_OPERATION_BACKUP:
case SRV_OPERATION_RESTORE:
@ -2489,7 +2348,6 @@ void innodb_shutdown()
srv_misc_tmpfile = 0;
}
ut_ad(dict_stats_event || !srv_was_started || srv_read_only_mode);
ut_ad(dict_sys.is_initialised() || !srv_was_started);
ut_ad(trx_sys.is_initialised() || !srv_was_started);
ut_ad(buf_dblwr || !srv_was_started || srv_read_only_mode
@ -2501,9 +2359,7 @@ void innodb_shutdown()
#endif /* BTR_CUR_HASH_ADAPT */
ut_ad(ibuf.index || !srv_was_started);
if (dict_stats_event) {
dict_stats_thread_deinit();
}
dict_stats_deinit();
if (srv_start_state_is_set(SRV_START_STATE_REDO)) {
ut_ad(!srv_read_only_mode);
@ -2565,7 +2421,7 @@ void innodb_shutdown()
<< srv_shutdown_lsn
<< "; transaction id " << trx_sys.get_max_trx_id();
}
srv_thread_pool_end();
srv_start_state = SRV_START_STATE_NONE;
srv_was_started = false;
srv_start_has_been_called = false;
@ -2605,3 +2461,5 @@ srv_get_meta_data_filename(
ut_free(path);
}

View file

@ -1096,9 +1096,7 @@ sync_array_print_long_waits(
srv_print_innodb_monitor = TRUE;
lock_set_timeout_event();
os_thread_sleep(30000000);
lock_wait_timeout_task(nullptr);
srv_print_innodb_monitor = static_cast<my_bool>(old_val);
fprintf(stderr,

View file

@ -161,9 +161,6 @@ void purge_sys_t::create()
{
ut_ad(this == &purge_sys);
ut_ad(!enabled());
ut_ad(!event);
event= os_event_create(0);
ut_ad(event);
m_paused= 0;
query= purge_graph_build();
next_stored= false;
@ -176,16 +173,18 @@ void purge_sys_t::create()
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
truncate.current= NULL;
truncate.last= NULL;
m_initialized = true;
}
/** Close the purge subsystem on shutdown. */
void purge_sys_t::close()
{
ut_ad(this == &purge_sys);
if (!event) return;
if (!m_initialized)
return;
ut_ad(this == &purge_sys);
ut_ad(!enabled());
ut_ad(n_tasks.load(std::memory_order_relaxed) == 0);
trx_t* trx = query->trx;
que_graph_free(query);
ut_ad(!trx->id);
@ -194,7 +193,7 @@ void purge_sys_t::close()
trx_free(trx);
rw_lock_free(&latch);
mutex_free(&pq_mutex);
os_event_destroy(event);
m_initialized = false;
}
/*================ UNDO LOG HISTORY LIST =============================*/
@ -1250,21 +1249,14 @@ trx_purge_dml_delay(void)
return(delay);
}
extern tpool::waitable_task purge_worker_task;
/** Wait for pending purge jobs to complete. */
static
void
trx_purge_wait_for_workers_to_complete()
{
/* Ensure that the work queue empties out. */
while (purge_sys.n_tasks.load(std::memory_order_acquire)) {
if (srv_get_task_queue_length() > 0) {
srv_release_threads(SRV_WORKER, 1);
}
os_thread_yield();
}
purge_worker_task.wait();
/* There should be no outstanding tasks as long
as the worker threads are active. */
ut_a(srv_get_task_queue_length() == 0);
@ -1279,10 +1271,6 @@ trx_purge(
ulint n_purge_threads, /*!< in: number of purge tasks
to submit to the work queue */
bool truncate /*!< in: truncate history if true */
#ifdef UNIV_DEBUG
, srv_slot_t *slot /*!< in/out: purge coordinator
thread slot */
#endif
)
{
que_thr_t* thr = NULL;
@ -1292,9 +1280,6 @@ trx_purge(
srv_dml_needed_delay = trx_purge_dml_delay();
/* All submitted tasks should be completed. */
ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
rw_lock_x_lock(&purge_sys.latch);
trx_sys.clone_oldest_view();
rw_lock_x_unlock(&purge_sys.latch);
@ -1307,24 +1292,21 @@ trx_purge(
/* Fetch the UNDO recs that need to be purged. */
n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
purge_sys.n_tasks.store(n_purge_threads - 1, std::memory_order_relaxed);
/* Submit tasks to workers queue if using multi-threaded purge. */
for (ulint i = n_purge_threads; --i; ) {
for (ulint i = 0; i < n_purge_threads-1; i++) {
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
ut_a(thr);
srv_que_task_enqueue_low(thr);
srv_thread_pool->submit_task(&purge_worker_task);
}
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
ut_d(thr->thread_slot = slot);
que_run_threads(thr);
trx_purge_wait_for_workers_to_complete();
ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
if (truncate) {
trx_purge_truncate_history();
}
@ -1335,6 +1317,8 @@ trx_purge(
return(n_pages_handled);
}
extern tpool::waitable_task purge_coordinator_task;
/** Stop purge during FLUSH TABLES FOR EXPORT */
void purge_sys_t::stop()
{
@ -1352,14 +1336,8 @@ void purge_sys_t::stop()
if (m_paused++ == 0)
{
/* We need to wakeup the purge thread in case it is suspended, so
that it can acknowledge the state change. */
const int64_t sig_count = os_event_reset(event);
rw_lock_x_unlock(&latch);
ib::info() << "Stopping purge";
srv_purge_wakeup();
/* Wait for purge coordinator to signal that it is suspended. */
os_event_wait_low(event, sig_count);
MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
return;
}
@ -1369,8 +1347,7 @@ void purge_sys_t::stop()
if (running())
{
ib::info() << "Waiting for purge to stop";
while (running())
os_thread_sleep(10000);
purge_coordinator_task.wait();
}
}
@ -1384,6 +1361,7 @@ void purge_sys_t::resume()
return;
}
rw_lock_x_lock(&latch);
int32_t paused= m_paused--;
ut_a(paused);
@ -1393,4 +1371,5 @@ void purge_sys_t::resume()
srv_purge_wakeup();
MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
}
rw_lock_x_unlock(&latch);
}